Introduction to the commonly used MQTT client library

Overview

MQTT is a lightweight publish-subscribe mode messaging protocol designed for IoT applications in low-bandwidth and unstable network environments. MQTT is based on the publish/subscribe paradigm and works on the TCP/IP protocol family. MQTT protocol is lightweight, simple, open and easy to implement, which makes it suitable for a wide range of applications.

  • Eclipse Paho Java Client
  • Eclipse Paho MQTT Go client
  • emqtt: Erlang MQTT client library provided by EMQ
  • MQTT.js Web & Node.js Platform MQTT Client
  • Eclipse Paho Python

Sample application introduction

The action of the MQTT client throughout its lifecycle can be summarized as: establishing a connection, subscribing to a topic, receiving and processing a message, publishing a message to a specified topic, unsubscribing, and disconnecting.

  • Specify the MQTT Broker basic information access address and port
  • Specify whether the transfer type is TCP or MQTT over WebSocket
  • If TLS is enabled, it is required to select the protocol version with the corresponding certificate.
  • If Broker has enabled authentication, the client needs to carry the corresponding MQTT Username Password information
  • Configure client parameters such as keepalive duration, clean session callback retention flag, MQTT protocol version, will message (LWT), etc.
  • Subscribe to the topic: After the connection is successfully established, the topic can be subscribed to when the topic information is specified.
  • Specify topic filter Topic, support for the use of the topic wildcards + and # during subscribing
  • Specify QoS, Qos 0 1 2 is optional according to the implementation of the client library and the broker. Note that some brokers and cloud service providers do not support some QoS levels. For example, AWS IoT, Alibaba Cloud IoT Suite, and Azure IoT Hub do not support QoS 2 Level message
  • Subscribing to topics may fail due to network issues or Broker ACL rule restrictions
  • Publishing a message: Publish a message to a specified topic
  • Specify the target topic. Note that the topic cannot contain the wildcard + or #, which may lead to message publishing failure and client disconnection (depending on the implementation of broker and Client Library)
  • Specify the message QoS level. There are also different QoS levels supported by different brokers and platforms. For example, if Azure IoT Hub publishes a message of QoS 2, it will disconnect the client
  • Specify the message body content, whose size cannot exceed the maximum message size set by the broker
  • Specify message Retain flag
  • Unsubscribe:
  • Specify the target topic
  • Disconnect:
  • Proactively disconnect with issuing a Will Message (LWT)

Eclipse Paho C and Eclipse Paho Embedded C

Both Eclipse Paho C and Eclipse Paho Embedded C are client libraries under the Eclipse Paho project, which are full-featured MQTT clients written in ANSI C. Eclipse Paho Embedded C can be used on desktop operating systems, but mainly for Embedded environments such as mbedArduino and FreeRTOS .

  • There is only one call block API-waitForCompletion in the asynchronous API, and the result is notified through the callback, which is more suitable for the environment of the non-main thread.
#include "stdio.h"
#include "stdlib.h"
#include "string.h"

#include "MQTTClient.h"

#define ADDRESS "tcp://broker.emqx.io:1883"
#define CLIENTID "emqx_test"
#define TOPIC "testtopic/1"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L

int main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;

MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);

// Connection parameters
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;

if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(-1);
}

// Publish message
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
printf("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);

// Disconnect
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}

Eclipse Paho Java Client

Eclipse Paho Java Client is an MQTT client library written in Java that can be used with JVM or other Java compatible platforms such as Android.

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
package io.emqx;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class App {
public static void main(String[] args) {
String subTopic = "testtopic/#";
String pubTopic = "testtopic/1";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = "emqx_test";
MemoryPersistence persistence = new MemoryPersistence();

try {
MqttClient client = new MqttClient(broker, clientId, persistence);

// Connection options
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_test");
connOpts.setPassword("emqx_test_password".toCharArray());
// Retain connection
connOpts.setCleanSession(true);

// Set callback
client.setCallback(new PushCallback());

// Setup connection
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);

System.out.println("Connected");
System.out.println("Publishing message: " + content);

// Publish
client.subscribe(subTopic);

// Required parameters for publishing message
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("Message published");

client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
package io.emqx;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// Reconnect after lost connection.
System.out.println("Connection lost, and re-connect here.");
}

public void messageArrived(String topic, MqttMessage message) throws Exception {
// Message handler after receiving message
System.out.println("Topic:" + topic);
System.out.println("QoS:" + message.getQos());
System.out.println("Payload:" + new String(message.getPayload()));
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}

Eclipse Paho MQTT Go client

Eclipse Paho MQTT Go Client is the Go language client library for the Eclipse Paho project, which can connect to the MQTT Broker to publish messages, subscribe to topics and receive published messages and support a completely asynchronous mode of operation.

go get github.com/eclipse/paho.mqtt.golang
package main

import (
"fmt"
"log"
"os"
"time"

"github.com/eclipse/paho.mqtt.golang"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}

func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")

opts.SetKeepAlive(60 * time.Second)
// Message callback handler
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}

// Subscription
if token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

// Publish message
token := c.Publish("testtopic/1", 0, false, "Hello World")
token.Wait()

time.Sleep(6 * time.Second)

// Cancel subscription
if token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

// Disconnect
c.Disconnect(250)
time.Sleep(1 * time.Second)
}

emqtt : Erlang MQTT client library provided by EMQ

emqtt is a client library officially provided by EMQ of the open source MQTT Broker EMQ X, which is applicable for the Erlang language.

ClientId = <<"test">>.
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]).
{ok, _Props} = emqtt:connect(ConnPid).
Topic = <<"guide/#">>.
QoS = 1.
{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {Topic, QoS}).
{ok, _PktId} = emqtt:publish(ConnPid, <<"guide/1">>, <<"Hello World!">>, QoS).
%% If the qos of publish packet is 0, `publish` function would not return packetid.
ok = emqtt:publish(ConnPid, <<"guide/2">>, <<"Hello World!">>, 0).

%% Recursively get messages from mail box.
Y = fun (Proc) -> ((fun (F) -> F(F) end)((fun(ProcGen) -> Proc(fun() -> (ProcGen(ProcGen))() end) end))) end.
Rec = fun(Receive) -> fun()-> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Receive(); _Other -> Receive() after 5 -> ok end end end.
(Y(Rec))().

%% If you don't like y combinator, you can also try named function to recursively get messages in erlang shell.
Receive = fun Rec() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Rec(); _Other -> Rec() after 5 -> ok end end.
Receive().

{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, <<"guide/#">>).

ok = emqtt:disconnect(ConnPid).

MQTT.js Web & Node.js platform MQTT client

MQTT.js is a module written in JavaScript that implements the MQTT protocol client functionality and can be used in Node.js or in a browser environment. When used in Node.js, the -g global installation can be done with a command line, and it can be integrated into the project for callback.

  • Node.js environment: MQTT, MQTT over WebSocket
npm i mqtt
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
// Initialize a global mqtt variable
console.log(mqtt)
</script>
// const mqtt = require('mqtt')
import mqtt from 'mqtt'

// Connection option
const options = {
clean: true, // Retain connection
connectTimeout: 4000, // Timeout
// Authtication
clientId: 'emqx_test',
username: 'emqx_test',
password: 'emqx_test',
}

// Connection string
// ws: unsecured WebSocket
// wss: secured WebSocket connection
// mqtt: unsecured TCP connection
// mqtts: secured TCP connection
const connectUrl = 'wss://broker.emqx.io:8084/mqtt'
const client = mqtt.connect(connectUrl, options)

client.on('reconnect', (error) => {
console.log('reconnect:', error)
})

client.on('reconnect', (error) => {
console.log('reconnect:', error)
})

client.on('message', (topic, message) => {
console.log('message:', topic, message.toString())
})

Eclipse Paho Python

Eclipse Paho Python is the Python language client library under the Eclipse Paho project, which can connect to the MQTT Broker to publish messages, subscribe to topics and receive Published message.

pip install paho-mqtt
import paho.mqtt.client as mqtt


# Successful Connection Callback
def on_connect(client, userdata, flags, rc):
print('Connected with result code '+str(rc))
client.subscribe('testtopic/#')

# Message delivery callback
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()

# Set callback handler
client.on_connect = on_connect
client.on_message = on_message

# Set up connection
client.connect('broker.emqx.io', 1883, 60)
# Publish message
client.publish('emqtt',payload='Hello World',qos=0)

client.loop_forever()

Summary

This is the introduction of the MQTT protocol, MQTT client library usage process and common MQTT clients. Readers are welcome to learn MQTT and develop projects through EMQ X.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
EMQ Technologies

EMQ is an open-source IoT data infrastructure software provider, delivering the world’s leading open-source MQTT message broker and stream processing database.