EMQ X rule engine series (6) store messages to DynamoDB database

EMQ Technologies
5 min readOct 14, 2019

--

Amazon DynamoDB Introduction

Amazon DynamoDB is a fully hosted NoSQL database service that supports key values and document data structures.

Amazon DynamoDB is provided by Amazon as part of the AWS cloud portfolio, delivering fast, predictable performance and seamless scaling.

Amazon DynamoDB Service Address:

https://aws.amazon.com/dynamodb/

Scenario Introduction

In this scenario, messages that meet the criteria under the EMQ X specified topic are required to be stored in the DynamoDB database. In order to facilitate subsequent analysis and retrieval, the message content needs to be split and stored.

In this scenario, the message reported by the device is as follows:

  • Reported topic: cmd/state/:id, the topic id represents the vehicle client ID
  • Message body:

When the reported data of engine speed value is greater than `8000', the current information is stored for subsequent analysis of the user’s vehicle usage.

Preparation

Define DynamoDB data table

According to the scenario requirements, define the data table use_statistics structure as follows:

use_statistics.json

Create DynamoDB data table

Create use_statistics data table with the aws cli command:

Confirm the existence of the data table through the aws cli command after the successful creation:

Configuration instructions

Create resource

Open EMQ X Dashboard, go to the Resources page on the left menu, click the New button, type DynamoDB server information for resource creation.

The network environment of the nodes in the EMQ X cluster may be different. After the resources are created successfully, click the Status button in the list to check the connection status of each node. If the resources on the node are unavailable, check whether the configuration is correct and the network connectivity is correct, and click the Reconnect button to manually reconnect.

Create rules

Go to the left menu of rules page, click new button to create rules. Select trigger event of message publishing to trigger this rule for data processing when the message is published.

After selecting the trigger event, we can see the optional field and sample SQL in the interface:

Screen the required fields

Rule engine uses SQL statements to process rule conditions. In this business, we need to select all the fields in payload separately, use the payload.fieldName format to select, and also need the topic context information oftopic, qos, id. The current SQL is as follows:

Establish screening conditions

Conditional screening is done by using the SQL statement WHERE clause, in which we need to define two conditions:

  • Only handle cmd/state/:id topic, use the topic wildcard =~ to screen topic: `topic =~ 'cmd/state/+'
  • Only process tachometer > 8000 messages, use the comparator to screen tachometer: payload.tachometer > 8000

Combine the previous step to get the SQL as follows:

Output test is done with the SQL test function

With the SQL test function, we can view the current SQL processed data output in real time. This function requires us to specify the simulated raw data such as payload.

The payload data is as follows, note to change the tachometer value to satisfy the SQL condition:

Click the SQL Test toggle button, change topic and payload to be the information in the scenario, and click the Test button to view the data output:

The test output data is:

The test output is as expected and we can proceed to the next step.

Add a response action and store the message to DynamoDB

After the input and output of SQL condition is correct, we continue to add the corresponding action, configure to write SQL statement, and store the screening result in DynamoDB.

Click the Add button in the response action, select the Save data to DynamoDB action, select the resource just selected, fill the DynamoDB table name, Hash Key, and Range Key.

Test

Expected result

We successfully created a rule that contains a processing action, and expected result of the action is as follows:

  1. When the device reports a message to the cmd/state/:id topic, it will hit SQL when the value of tachometer in the message exceeds 8000, and the number of hit in the rule list is increased by 1;
  2. A piece of data will be added to the ‘use_statistics’ table in DynamoDB with the same value as the current message.

Test with the Websocket tool in Dashboard

Switch to tools -> Websocket page, connect to EMQ X with any client, and send the following message to message card after successful connection:

  • Topic: cmd/state/NXP-058659730253–963945118132721–22
  • Message body:

Click the Send button. After sending successfully, the statistic of hit under current rule is 1.

View the data table records with the aws cli command to get the following data:

So far, we have implemented the business development of using the rules engine to store messages to DynamoDB.

Welcome to our open source project github.com/emqx/emqx. Please visit the documentation for details.

--

--

EMQ Technologies
EMQ Technologies

Written by EMQ Technologies

EMQ is an open-source IoT data infrastructure software provider, delivering the world’s leading open-source MQTT message broker and stream processing database.

No responses yet