EMQ X rule engine series (6) store messages to DynamoDB database
Amazon DynamoDB Introduction
Amazon DynamoDB is a fully hosted NoSQL database service that supports key values and document data structures.
Amazon DynamoDB is provided by Amazon as part of the AWS cloud portfolio, delivering fast, predictable performance and seamless scaling.
Amazon DynamoDB Service Address:
https://aws.amazon.com/dynamodb/
Scenario Introduction
In this scenario, messages that meet the criteria under the EMQ X specified topic are required to be stored in the DynamoDB database. In order to facilitate subsequent analysis and retrieval, the message content needs to be split and stored.
In this scenario, the message reported by the device is as follows:
- Reported topic: cmd/state/:id, the topic id represents the vehicle client ID
- Message body:
When the reported data of engine speed value is greater than `8000', the current information is stored for subsequent analysis of the user’s vehicle usage.
Preparation
Define DynamoDB data table
According to the scenario requirements, define the data table use_statistics
structure as follows:
use_statistics.json
Create DynamoDB data table
Create use_statistics
data table with the aws cli command:
Confirm the existence of the data table through the aws cli command after the successful creation:
Configuration instructions
Create resource
Open EMQ X Dashboard, go to the Resources page on the left menu, click the New button, type DynamoDB server information for resource creation.
The network environment of the nodes in the EMQ X cluster may be different. After the resources are created successfully, click the Status button in the list to check the connection status of each node. If the resources on the node are unavailable, check whether the configuration is correct and the network connectivity is correct, and click the Reconnect button to manually reconnect.
Create rules
Go to the left menu of rules page, click new button to create rules. Select trigger event of message publishing to trigger this rule for data processing when the message is published.
After selecting the trigger event, we can see the optional field and sample SQL in the interface:
Screen the required fields
Rule engine uses SQL statements to process rule conditions. In this business, we need to select all the fields in payload
separately, use the payload.fieldName
format to select, and also need the topic context information oftopic
, qos
, id
. The current SQL is as follows:
Establish screening conditions
Conditional screening is done by using the SQL statement WHERE clause, in which we need to define two conditions:
- Only handle
cmd/state/:id
topic, use the topic wildcard=~
to screentopic
: `topic =~ 'cmd/state/+' - Only process
tachometer > 8000
messages, use the comparator to screentachometer
:payload.tachometer > 8000
Combine the previous step to get the SQL as follows:
Output test is done with the SQL test function
With the SQL test function, we can view the current SQL processed data output in real time. This function requires us to specify the simulated raw data such as payload.
The payload data is as follows, note to change the tachometer
value to satisfy the SQL condition:
Click the SQL Test toggle button, change topic
and payload
to be the information in the scenario, and click the Test button to view the data output:
The test output data is:
The test output is as expected and we can proceed to the next step.
Add a response action and store the message to DynamoDB
After the input and output of SQL condition is correct, we continue to add the corresponding action, configure to write SQL statement, and store the screening result in DynamoDB.
Click the Add button in the response action, select the Save data to DynamoDB action, select the resource just selected, fill the DynamoDB table name, Hash Key, and Range Key.
Test
Expected result
We successfully created a rule that contains a processing action, and expected result of the action is as follows:
- When the device reports a message to the
cmd/state/:id
topic, it will hit SQL when the value oftachometer
in the message exceeds 8000, and the number of hit in the rule list is increased by 1; - A piece of data will be added to the ‘use_statistics’ table in DynamoDB with the same value as the current message.
Test with the Websocket tool in Dashboard
Switch to tools -> Websocket page, connect to EMQ X with any client, and send the following message to message card after successful connection:
- Topic: cmd/state/NXP-058659730253–963945118132721–22
- Message body:
Click the Send button. After sending successfully, the statistic of hit under current rule is 1.
View the data table records with the aws cli command to get the following data:
So far, we have implemented the business development of using the rules engine to store messages to DynamoDB.
Welcome to our open source project github.com/emqx/emqx. Please visit the documentation for details.