EMQ X plug-in persistence series (3)-EMQ X Redis data persistence

Introduction to EMQ X Data Persistence

Data persistence usage scenarios include recording the client’s online and offline status, subscribing to topic information, message content, operations of sending message receipt after message arrives to various databases such as Redis, MySQL, PostgreSQL, directing, Cassandra, AWS DynamoDB for quick query of external service or retaining the current running status in the service outage/client abnormal offline period , and restoreing the previous state when the connection is restored. Persistence can also be used for client proxy subscription. When the device client goes online, the persistence module directly loads the preset topic from the database and completes the proxy subscription, reducing the complexity of system design and reducing the communication cost of client subscription.

Users can also implement similar functions by subscribing to related topics, but these persistent support built into the enterprise version are more efficient and reliable, which greatly reduces the developer’s workload and improves system stability.

Persistent design

The principle of persistence is to call the processing function (action) when the configuration event hook is triggered. After the processing function obtains the corresponding data, it processes according to the configured instructions to realize the addition, deletion, modification and checking of the data. The same event hooks have the same parameters available in different databases, but the processing function (action) vary depending on the database characteristics. The overall persistence mode and process are as follows:

One-to-one message storage

  1. Publish a message;
  2. Backend records the message in the database;
  3. Subscribe to the topic;
  4. Backend gets the message of the topic from the database;
  5. Send a message to the Subscriber;
  6. After the Subscriber confirms, Backend removes the message from the database;

One-to-many message storage

  1. Publish a message;
  2. Backend records the message in the database;
  3. subscribe to topics by SUB1 and SUB2 ;
  4. Backend gets the message of the topic from the database;
  5. Send a message to SUB1 and SUB2;
  6. Backend records the location of the SUB1 and SUB2 read messages, as the next time the message is retrieved from there.

Redis Data persistence

This article uses practical examples to illustrate how Redis can store relevant information.

Redis is a high-performance key-value database that is fully open source for free and comply with the BSD protocol.

Redis has the following characteristics compared to other key-value cache products:

  • Redis has extremely high performance and supports 100,000-level read and write speeds in a single machine.
  • Redis supports data persistence. It can save the data in memory to disk and load it again when it is restarted.
  • Redis not only supports simple key-value data, but also provides storage for data structures such as list, set, zset, and hash.
  • Redis supports backup of data, i.e. data backup in master-slave mode data backup.

Readers can refer to Redis’ official Quick Start to install Redis (at the time of writing, Redis version is 5.0), and start the Redis server with the redis-server command.

Configure EMQ X server

In terms of EMQ X installed via RPM, the Redis-related configuration files are located in /etc/emqx/plugins/emqx_backend_redis.conf. If only Redis persistence function is test, most configurations do not need to be changed. The only place to be changed is the address of the Redis server: If the Redis installed by the reader is not on the same server as EMQ X, specify the address and port of the correct Redis server, as shown in the follow:

## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379

Leave the rest of the configuration file unchanged and start the plugin:

emqx_ctl plugins load emqx_backend_redis

Client online status storage

When the client is online or offline, update the online status, online or offline time, and node client list to the redis database.

Although EMQ x itself provides the device online status API, it is more efficient to obtain the record directly from the database than to call the EMQ x API in the scenarios where the client online status and online / offline time need to be obtained frequently.

Configuration item

Open the configuration file and configure the Backend rules:

## Online
backend.redis.hook.client.connected.1 = { "action": { "function": "on_client_connected" }, "pool": "pool1"}

## Offline
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

Use example

Opens the http://127.0.0.1:18083 of EMQ X management console from the browser, and create a new client connection in Tools -> Websocket, specifying clientid as sub_client:

Open the redis-cli command line window and execute the command keys *. The result is as shown below. The reader can see that two keys are stored in Redis:

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"

Connection list

The plugin records the client list and connection timestamp information under the node in the format of mqtt:node:{node_name}. The equivalent operation is as follows:

## redis key is mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

Field description:

## Online device information under the node
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # Time stamp of online time
3) "sub_client"
4) "1542272836"

Connection details

The plug-in records the online status and online time of the client in the format of ‘mqtt: client: {client {ID}’. Equivalent operation is as follows:

## redis key is mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854

Field description:

## Client online status
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 offline 1 online
3) "online_at"
4) "1542272854" # online time stamp
5) "offline_at"
6) "undefined" # offline time stamp

Client proxy subscription

When the client goes online, the storage module reads the preset subscription list directly from the database, and the proxy loads the subscription topic. In the scenario where the client needs to communicate through a preset topic (receiving message), the application can set/change the proxy subscription list from the data layer.

Configuration item

Open the configuration file and configure the Backend rules:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

Use example

When the sub_client device goes online, it needs to subscribe to the two QoS 1 topics of sub_client/upstream and sub_client/downlink:

  1. The plugin initializes the proxy subscription Hash in Redis in the format mqtt:sub:{client_id}:
## redis key 为 mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0

2. On the websocket page of the EMQ x management console, create a new client connection with the clientid sub_client and switch to the subscription page. It can be seen that the current client automatically subscribes to the two QoS 1 topics of sub client / upstream and sub client / downlink:

3. Switch back to the management Console WebSocket page to publish a message to the sub_client/downlink topic, and the published message can be received in the message subscription list.

Published message persistence

Configuration item

Open the configuration file, configure the backend rule, and support message filtering with the topic parameter. Use the wildcard of # here to store any topic message:

## hook: message.publish
## action/function: on_message_publish

backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Use example

In the WebSocket page of EMQ X Management Console, use clientid sub_client to establish a connection and publish multiple messages to the topic upstream_topic. For each message, EMQ X will persist the two records of the message list and the message details.

Message list

EMQ X persists the message list as the message id to the mqtt:msg:{topic} Redis collection:

## Obtain all message ids in the upstream_topic collection
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>

Message details

Each message detail will be stored in Redis Hash as the key in mqtt:msg:{message_id} format:

## Obtain the message details with message id of 2VFt0kuNrOktefX6m4nP
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
1) "id"
2) "2VFt0kuNrOktefX6m4nP" ## message id
3) "from"
4) "sub_client" ## client id
5) "qos"
6) "2"
7) "topic"
8) "up/upstream_topic"
9) "payload"
10) "{ \"cmd\": \"reboot\" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"

Get offline messages

Configuration item

Open the configuration file and configure the Backend rules:

## hook: session.subscribed
## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub

## One-to-one offline message
backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## One-to-many offline message
backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

Use example

The MQTT offline message is required to meet the following conditions:

  1. Connect with clean_session = false
  2. Subscribe to QoS > 0
  3. Publish QoS > 0

Establish a connection in the EMQ X management console with the following configuration:

Retain message persistence

Configuration item

Open the configuration file and configure the Backend rules:

## hook: message.publish
## action/function: on_client_connected、on_message_retain

backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

Message list

EMQ X persists the message list as the message id to mqtt:retain:{topic} Redis Hash:

## Obtain all message ids in the upstream_topic collection
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>

Message details

Each message detail will be stored in Redis Hash as the key in mqtt:msg:{message_id} format:

## Obtain the message details with message id of 2VFt0kuNrOktefX6m4nP
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
1) "id"
2) "2VFt0kuNrOktefX6m4nP" ## message id
3) "from"
4) "sub_client" ## client id
5) "qos"
6) "2"
7) "topic"
8) "up/upstream_topic"
9) "payload"
10) "{ \"cmd\": \"reboot\" }"
11) "ts"
12) "1542338754" ## pub time stamp
13) "retain"
14) "false"

Summary

After understanding the data structure stored in Redis, various Redis client can be used to read related information.

Welcome to our open source project github.com/emqx/emqx. Please visit the documentation for details.

EMQ is an open-source IoT data infrastructure software provider, delivering the world’s leading open-source MQTT message broker and stream processing database.