EMQ X Rule Engine Series -Bridge Messages to Pulsar
Introduction to the Pulsar messaging system
Apache Pulsar is an enterprise-level pub-sub messaging system. Pulsar aims to replace Apache Kafka’s dominance for many years. Pulsar provides faster throughput and lower latency than Kafka in many scenarios, and provides developers with a set of compatible APIs.
Pulsar combines high-performance streams and flexible traditional queues into a unified message model and API to synchronize flow processing with queue processing.
For the installation and use of Pulsar, please refer to Pulsar’s website. For more information about Pulsar’s introduction and comparison of bridging solutions, please see:compared with Kafka, what’s good about Pulsar as a Big Data Analysis Rookie
Scenario introduction
In this scenario, it is required to bridge messages under the EMQ X specified topic that meet the criteria to the Pulsar . To facilitate subsequent analysis and retrieval, the message content needs to be split.
In this scenario, the information reported by the device is as follows:
- reported topic: cmd/state/:id,Id in the topic represents the vehicle client identifier
- Message body:
When the reported engine speed value is greater than 8000
, the current information is stored for subsequent analysis of vehicle usage.
Preparation
Create Pulsar topic
Create emqx_rule_engine_output
topic:
./bin/pulsar-admin topics create-partitioned-topic -p 5 emqx_rule_engine_output
Configuration instructions
Create resources
Open EMQ X Dashboard, enter the Resources page on the left menu, click the New button, and enter the Pulsar server information for resource creation.
The network environment of the nodes in the EMQ X cluster may be different from each other. After the resources are successfully created, click the Status button in the list to view the resource connection status of each node. If the resources on the nodes are not available, please check whether the configuration is correct and the network connectivity is OK, and click the Reconnect button to reconnect manually.
Create rules
Enter the Rules page on the left menu and click the New button to create a rule. Here we choose Message publish as the trigger event , and trigger the rule for data processing when the message is published.
After the trigger event is selected, we can see optional fields and sample SQL on the interface:
Filter the required field
The rule engine uses SQL statements to process rule conditions. In this business, we need to select all the fields in the payload
individually, use the payload.fieldName
format to select, and also need the message context information of topic,qos,andid
. the current SQL is as follows:
SELECT
payload.id as client_id, payload.speed as speed,
payload.tachometer as tachometer,
payload.ts as ts, id
FROM
"message.publish"
WHERE
topic =~ 't/#'
Establish filtering conditions
Conditional filtering is done by using the WHERE clause of the SQL statement. In this business we need to define two conditions:
- Only process
cmd/state/:id
topics, use topic wildcard=~
to filtertopic
:topic= ~'cmd/state/+'
- Only process messages with
tachometer>8000
, use the comparator to filtertachometer
:payload.tachometer>8000
Combine the previous step to get the SQL as follows:
SELECT
payload.id as client_id, payload.speed as speed,
payload.tachometer as tachometer,
payload.ts as ts,
id
FROM
"message.publish"
WHERE
topic =~ 'cmd/state/+'
AND payload.tachometer > 8000
Output test is done by using SQL test function
With the SQL test feature, we can view the data output of the current SQL processing in real time, which requires us to specify payloads to simulate the raw data.
The payload data is as follows, pay attention to changing the value of tachometer
to meet the SQL conditions:
{
"id": "NXP-058659730253-963945118132721-22",
"speed": 32.12,
"direction": 198.33212,
"tachometer": 9001,
"dynamical": 8.93,
"location": {
"lng": 116.296011,
"lat": 40.005091
},
"ts": 1563268202
}
Click the SQL Test toggle button, change the topic
and payload
to be the information in the scenario, and click the Test button to see the data output:
The test output data is:
{
"client_id": "NXP-058659730253-963945118132721-22",
"id": "589A429E9572FB44B0000057C0001",
"speed": 32.12,
"tachometer": 9001,
"ts": 1563268202
}
The test output is as expected and we can proceed to the next step.
Add response action, and bridge message to Pulsar
After the SQL condition input and output are correct, we continue to add response actions, configure to write SQL statements, and bridge the filtering results to Pulsar.
Click the Add button in the response action, select the action of Bridge data to Pulsar, select the resource just selected, and fill in the Pulsar topic with the emqx_rule_engine_output
created above
Test
Expected result
We have successfully created a rule that contains a processing action. The expected result of the action is as follows:
- When the device reports a message to the topic
cmd/state/:id
, and the value oftachometer
in the message exceeds 8000, it will hit SQL, and the number of hits in the rule list will increase by 1; - Pulsar’s
emqx_rule_engine_output
topic will add a piece of message with the same value as the current message.
Test with Websocket tools in Dashboard
Switch to the Tools -> Websocket page, use any information client to connect to EMQ X. After the connection is successful, send the following information in the Message card:
- Topic: cmd/state/NXP-058659730253–963945118132721–22
- Message body:
Click the Send button to see that the hit statistics for current rule is 1.
Then, use the Pulsar command to see if the message was successfully produced:
./bin/pulsar-client consume emqx_rule_engine_output -s "sub-name" -n 1000
----- got message -----
{"client_id":"NXP-058659730253-963945118132721-22","id":"58DEEDE7CF3D4F440000019CA0003","speed":32.12,"tachometer":8081,"ts":1563268202}
So far, we have implement business development of using the rules engine to bridge messages to Pulsar.
Welcome to our open source project github.com/emqx/emqx. Please visit the documentation for details.