EMQ X plug-in persistence series (3): EMQ X Redis data persistence

Introduction to EMQ X Data Persistence

Persistence design

One-to-one message storage

One-to-many message storage

Redis Data persistence

Configure EMQ X server

## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379

## Load the plugin (run in a shell, not part of the configuration file)
emqx_ctl plugins load emqx_backend_redis

Client online status storage

Configuration item

## Online
backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Offline
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
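
Each hook entry has the same shape: a key of the form backend.redis.hook.{hook}.{n} and a JSON value naming the action function and the connection pool. As a rough illustration, the Python sketch below splits one such line into its parts. The helper name parse_hook_line is hypothetical, and the sketch assumes the value is always valid JSON, as it is in the entries shown here. It is not how the plugin itself reads its configuration.

```python
import json


def parse_hook_line(line: str) -> dict:
    """Split one `backend.redis.hook.<hook>.<n> = <json>` line into its parts.

    Hypothetical helper for illustration only.
    """
    key, _, value = line.partition("=")
    key = key.strip()
    prefix = "backend.redis.hook."
    if not key.startswith(prefix):
        raise ValueError(f"not a hook entry: {key!r}")
    # Everything between the prefix and the trailing index is the hook name.
    hook, _, index = key[len(prefix):].rpartition(".")
    spec = json.loads(value)
    return {
        "hook": hook,
        "index": int(index),
        "function": spec["action"]["function"],
        "pool": spec["pool"],
        "topic": spec.get("topic"),  # only present on topic-scoped hooks
    }


line = (
    'backend.redis.hook.client.disconnected.1 = '
    '{"action": {"function": "on_client_disconnected"}, "pool": "pool1"}'
)
entry = parse_hook_line(line)
print(entry["hook"], entry["function"], entry["pool"])
```

Reading the entries this way makes it easy to check that every hook points at a pool that is actually configured, such as pool1 above.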

Use example

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"

Connection list

## redis key is mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836
## Online device information under the node
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # client id
2) "1542272836" # timestamp when the client came online
3) "sub_client"
4) "1542272836"
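
HGETALL returns the hash as a flat list of alternating fields and values, which is why client ids and timestamps appear on consecutive lines. A minimal Python sketch of turning such a reply into a mapping from client id to connection time follows. The reply is hard-coded from the output above, the helper name pairs_to_dict is hypothetical, and the timestamps are assumed to be Unix seconds.

```python
from datetime import datetime, timezone


def pairs_to_dict(flat_reply):
    """Turn ["field1", "value1", "field2", "value2", ...] into a dict."""
    if len(flat_reply) % 2:
        raise ValueError("expected an even number of items")
    return dict(zip(flat_reply[0::2], flat_reply[1::2]))


# Reply copied from the HGETALL output above.
reply = ["sub_client1", "1542272836", "sub_client", "1542272836"]

# Assumption: the stored value is a Unix timestamp in seconds.
online = {
    client_id: datetime.fromtimestamp(int(ts), tz=timezone.utc)
    for client_id, ts in pairs_to_dict(reply).items()
}

for client_id, since in online.items():
    print(client_id, since.isoformat())
```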

Connection details

## redis key is mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854
## Client online status
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 = offline, 1 = online
3) "online_at"
4) "1542272854" # timestamp when the client came online
5) "offline_at"
6) "undefined" # timestamp when the client went offline
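
The status hash stores everything as strings, including the literal "undefined" for a timestamp that has not been set. One possible way to decode it on the consumer side is sketched below in Python. The ClientStatus type and the decode_status helper are hypothetical names, and treating "undefined" as a missing value is an assumption based on the output above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClientStatus:
    online: bool
    online_at: Optional[int]
    offline_at: Optional[int]


def _timestamp(value):
    # Assumption: "undefined" means the timestamp was never recorded.
    return None if value in (None, "undefined") else int(value)


def decode_status(fields: dict) -> ClientStatus:
    """Convert the string fields of mqtt:client:{client_id} into typed values."""
    return ClientStatus(
        online=fields.get("state") == "1",  # 0 = offline, 1 = online
        online_at=_timestamp(fields.get("online_at")),
        offline_at=_timestamp(fields.get("offline_at")),
    )


status = decode_status(
    {"state": "0", "online_at": "1542272854", "offline_at": "undefined"}
)
print(status)
```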

Client proxy subscription

Configuration item

## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

Use example

## redis key is mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0
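
Since each proxy subscription is one hash field (topic) with one value (QoS), the commands for a client can be generated from a simple mapping. The Python sketch below only builds the command arguments and does not talk to Redis. The function name subscription_commands is hypothetical.

```python
def subscription_commands(client_id: str, topics: dict) -> list:
    """Build one HSET command per topic for the key mqtt:sub:{client_id}."""
    commands = []
    for topic, qos in topics.items():
        if qos not in (0, 1, 2):
            raise ValueError(f"invalid QoS {qos!r} for topic {topic!r}")
        commands.append(["HSET", f"mqtt:sub:{client_id}", topic, str(qos)])
    return commands


commands = subscription_commands(
    "sub_client",
    {"sub_client/upstream": 1, "sub_client/downlink": 1},
)
for command in commands:
    print(" ".join(command))
```

When these commands are run, HSET replies with (integer) 1 if the field is new and (integer) 0 if an existing field was overwritten, so the (integer) 0 replies above suggest both topics had already been written for this client.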

Published message persistence

Configuration item

## hook: message.publish
## action/function: on_message_publish

backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Use example

Message list

## Obtain all message ids in the upstream_topic collection
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>

Message details

## Obtain the message details with message id of 2VFt0kuNrOktefX6m4nP
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
1) "id"
2) "2VFt0kuNrOktefX6m4nP" ## message id
3) "from"
4) "sub_client" ## client id
5) "qos"
6) "2"
7) "topic"
8) "up/upstream_topic"
9) "payload"
10) "{ \"cmd\": \"reboot\" }"
11) "ts"
12) "1542338754" ## publish timestamp
13) "retain"
14) "false"
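
As with the other hashes, every field of a stored message comes back as a string. A small Python sketch of decoding the record above into typed values is shown here. StoredMessage and decode_message are hypothetical names, and parsing the payload as JSON is only valid because this particular payload happens to be JSON.

```python
import json
from dataclasses import dataclass


@dataclass
class StoredMessage:
    id: str
    sender: str  # the "from" field: publishing client id
    qos: int
    topic: str
    payload: str
    ts: int  # publish timestamp
    retain: bool


def decode_message(fields: dict) -> StoredMessage:
    """Convert the string fields of mqtt:msg:{message_id} into typed values."""
    return StoredMessage(
        id=fields["id"],
        sender=fields["from"],
        qos=int(fields["qos"]),
        topic=fields["topic"],
        payload=fields["payload"],
        ts=int(fields["ts"]),
        retain=fields["retain"] == "true",
    )


fields = {
    "id": "2VFt0kuNrOktefX6m4nP",
    "from": "sub_client",
    "qos": "2",
    "topic": "up/upstream_topic",
    "payload": '{ "cmd": "reboot" }',
    "ts": "1542338754",
    "retain": "false",
}
msg = decode_message(fields)
command = json.loads(msg.payload)["cmd"]  # assumes a JSON payload
print(msg.id, msg.qos, command)
```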

Get offline messages

Configuration item

## hook: session.subscribed
## action/function: on_message_fetch_for_queue, on_message_fetch_for_pubsub

## One-to-one offline message
backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## One-to-many offline message
backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
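
The "topic" value in each entry is an MQTT topic filter, so queue/# and pubsub/# decide which fetch function applies to a subscription. The Python sketch below implements standard MQTT wildcard matching (+ for one level, # for the remaining levels) to show which function would be selected for a given topic. It is a simplified illustration: it ignores the special handling of topics starting with $, and how the plugin itself performs the match is an assumption. The names topic_matches and handlers_for are hypothetical.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level itself and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


# Filters and function names copied from the configuration above.
HOOKS = [
    ("queue/#", "on_message_fetch_for_queue"),
    ("pubsub/#", "on_message_fetch_for_pubsub"),
]


def handlers_for(topic: str) -> list:
    return [function for flt, function in HOOKS if topic_matches(flt, topic)]


print(handlers_for("queue/device1"))
print(handlers_for("pubsub/news/today"))
print(handlers_for("other/topic"))
```

With these two filters, a subscription outside queue/ and pubsub/ selects neither function, so no offline messages would be fetched for it unless the filters are widened.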

Use example

Retain message persistence

Configuration item

## hook: message.publish
## action/function: on_message_retain, on_retain_delete

backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
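
Both entries hook message.publish on every topic, so the two functions presumably split the work by message content. In MQTT, a retained message with an empty payload clears the retained message for its topic. The Python sketch below shows one plausible division of labour based on that rule. Which function handles which case is an assumption inferred from the function names, not something confirmed by the configuration, and retain_action is a hypothetical helper.

```python
from typing import Optional


def retain_action(retain: bool, payload: bytes) -> Optional[str]:
    """Pick the retain-related function for a published message.

    Assumption: on_message_retain stores a retained message and
    on_retain_delete removes it when the retained payload is empty.
    """
    if not retain:
        return None  # not a retained message, nothing to store or delete
    return "on_message_retain" if payload else "on_retain_delete"


print(retain_action(True, b'{ "cmd": "reboot" }'))
print(retain_action(True, b""))
print(retain_action(False, b"hello"))
```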

Message list

## Obtain all message ids in the upstream_topic collection
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>

Message details

## Obtain the message details with message id of 2VFt0kuNrOktefX6m4nP
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
1) "id"
2) "2VFt0kuNrOktefX6m4nP" ## message id
3) "from"
4) "sub_client" ## client id
5) "qos"
6) "2"
7) "topic"
8) "up/upstream_topic"
9) "payload"
10) "{ \"cmd\": \"reboot\" }"
11) "ts"
12) "1542338754" ## publish timestamp
13) "retain"
14) "false"

Summary

EMQ Technologies

EMQ is an open-source IoT data infrastructure software provider, delivering the world’s leading open-source MQTT message broker and stream processing database.