Schema Registry Tutorial

IoT devices come in many types, and vendors use different encoding formats. An IoT platform that accepts data from all of them therefore needs a unified data format, so that the devices connected to it can be managed by applications on the platform.

EMQ X Enterprise Edition 3.4.0 introduces the Schema Registry, which provides encoding and decoding capabilities. The Schema Registry manages the schemas used for encoding and decoding, processes encoding and decoding requests, and returns the results. Combined with the rule engine, it adapts device access and rule design to a wide range of scenarios.

Data Format

[Figure 1: Using the Schema Registry to encode and decode device data]

Binary format support

Architecture design

The Schema Registry can both encode and decode data. A Schema ID must be specified for every encoding or decoding call.

[Figure 2: Schema Registry schematic]

Example of an encoding call (the first parameter is the Schema ID):

schema_encode(SchemaID, Data) -> RawData

Example of a decoding call:

schema_decode(SchemaID, RawData) -> Data
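To make the two call shapes concrete, here is a minimal Python sketch of a registry that maps Schema IDs to encoder/decoder pairs. The class, its methods, and the "json_demo:1.0" ID are hypothetical illustrations of the idea, not the EMQ X implementation; a JSON codec stands in for Protobuf or Avro so that the sketch runs with the standard library alone.

```python
import json
from typing import Any, Callable, Dict, Tuple

# Hypothetical codec pair: (encoder, decoder). Not EMQ X code.
Codec = Tuple[Callable[[Any], bytes], Callable[[bytes], Any]]


class SchemaRegistry:
    """Toy registry: Schema ID -> codec, mirroring schema_encode/schema_decode."""

    def __init__(self) -> None:
        self._codecs: Dict[str, Codec] = {}

    def register(self, schema_id: str, encoder: Callable[[Any], bytes],
                 decoder: Callable[[bytes], Any]) -> None:
        self._codecs[schema_id] = (encoder, decoder)

    def schema_encode(self, schema_id: str, data: Any) -> bytes:
        encoder, _ = self._lookup(schema_id)
        return encoder(data)

    def schema_decode(self, schema_id: str, raw: bytes) -> Any:
        _, decoder = self._lookup(schema_id)
        return decoder(raw)

    def _lookup(self, schema_id: str) -> Codec:
        try:
            return self._codecs[schema_id]
        except KeyError:
            raise KeyError(f"unknown schema id: {schema_id}") from None


registry = SchemaRegistry()
# Hypothetical ID in the "name:version" form used later in this article.
registry.register(
    "json_demo:1.0",
    lambda data: json.dumps(data, separators=(",", ":")).encode("utf-8"),
    lambda raw: json.loads(raw.decode("utf-8")),
)

raw = registry.schema_encode("json_demo:1.0", {"id": 1, "name": "Steve"})
print(raw)  # b'{"id":1,"name":"Steve"}'
print(registry.schema_decode("json_demo:1.0", raw))  # {'id': 1, 'name': 'Steve'}
```

Keeping both directions behind one Schema ID is what lets a rule refer to a schema by name alone and have the registry pick the matching codec.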

A common use case is to use the rule engine to call the encoding and decoding interfaces provided by the Schema Registry, and then use the encoded or decoded data as input for subsequent actions.

Encoding and Decoding + Rule Engine

EMQ X’s PUB/SUB system routes messages to the specified topics. The rule engine lets you flexibly configure business rules for the data, match messages against those rules, and specify the corresponding actions. Data format conversion happens before rule matching: the data is first converted into a Map that can take part in rule matching, and only then matched.

[Figure 3: Messaging, Rule Engine and Schema Registry]

Rule Engine internal data format (Map)

A Map is a key-value data structure of the form #{key => value}. For example, user = #{id => 1, name => "Steve"} defines a user map whose id is 1 and whose name is "Steve".

The rule SQL supports nested extraction and addition of Map fields with the “.” operator. The following statement uses it to extract a field from the user map above:

SELECT user.id AS my_id

The result selected by this statement is #{my_id => 1}.
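The “.” extraction can be mimicked in Python, with dicts standing in for Maps. This is a hypothetical sketch of the semantics, not the rule engine's code; in particular, returning None for a missing field is an assumption.

```python
from typing import Any, Dict


def get_path(data: Dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as "user.id" through nested dicts."""
    value: Any = data
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None  # assumption: a missing field yields None
        value = value[key]
    return value


def select(data: Dict[str, Any], fields: Dict[str, str]) -> Dict[str, Any]:
    """Emulate SELECT <path> AS <alias>, ... over one message map."""
    return {alias: get_path(data, path) for alias, path in fields.items()}


message = {"user": {"id": 1, "name": "Steve"}}
print(select(message, {"my_id": "user.id"}))  # {'my_id': 1}
```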

JSON Encoding and Decoding

SELECT json_decode(payload) AS p FROM "message.publish" WHERE p.x = p.y, topic ~= "t/#"

The SQL statement above matches an MQTT message whose payload is the JSON string {"x": 1, "y": 1} and whose topic is t/a.

json_decode(payload) AS p decodes the JSON string into the Map #{p => #{x => 1, y => 1}}, so that the fields p.x and p.y can be used in the WHERE clause.

Note: the AS clause is required. The decoded data must be assigned to a key before it can be manipulated further.
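The matching logic of the statement above can be approximated in Python as below. The helper names are hypothetical, the comma between the two WHERE conditions is treated as a logical AND (an assumption), and the topic check follows the MQTT wildcard rules for + and # while ignoring the special handling of topics that begin with $.

```python
import json
from typing import Any, Dict, Optional


def topic_matches(topic: str, topic_filter: str) -> bool:
    """MQTT-style filter match: '+' matches one level, '#' the rest (including the parent)."""
    t_levels = topic.split("/")
    f_levels = topic_filter.split("/")
    for i, level in enumerate(f_levels):
        if level == "#":
            return True
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(t_levels) == len(f_levels)


def rule(topic: str, payload: bytes) -> Optional[Dict[str, Any]]:
    """Hypothetical emulation of:
    SELECT json_decode(payload) AS p ... WHERE p.x = p.y, topic ~= "t/#"
    """
    selected = {"p": json.loads(payload)}  # AS p binds the decoded map to a key
    p = selected["p"]
    if "x" in p and "y" in p and p["x"] == p["y"] and topic_matches(topic, "t/#"):
        return selected
    return None


print(rule("t/a", b'{"x": 1, "y": 1}'))  # {'p': {'x': 1, 'y': 1}}
print(rule("t/a", b'{"x": 1, "y": 2}'))  # None
```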

Encoding and Decoding Practice

Protobuf data parsing example

The device publishes a Protobuf-encoded binary message. After the rule engine matches it, the message must be republished to a topic derived from its “name” field, in the format “person/${name}”.

For example, a message whose “name” is “Shawn” is republished to the topic “person/Shawn”.

Create Schema

In the EMQ X Dashboard, create a Protobuf schema with the following parameters:

  1. Name: protobuf_person
  2. Codec type: protobuf
  3. Schema: a Protobuf schema that defines a Person message.

After the schema is created, EMQ X assigns it a Schema ID and a version. If “protobuf_person” is being created for the first time, its Schema ID is “protobuf_person:1.0”.

Create rules

The key point here is schema_decode('protobuf_person:1.0', payload, 'Person'):

  • the schema_decode function decodes the contents of the payload field according to the schema 'protobuf_person:1.0';
  • as person saves the decoded value to the variable "person";
  • the last parameter, Person, indicates that the payload contains a message of type 'Person' as defined in the protobuf schema.
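As a rough illustration of what decoding against the Person type involves, the sketch below parses the Protobuf wire format by hand with only the standard library. The field layout (name = 1 as string, id = 2 as int32, email = 3 as string) is an assumption, because the schema itself is not reproduced here; in practice EMQ X decodes with the registered schema, not with code like this.

```python
from typing import Dict, Tuple, Union

# Assumed Person layout (hypothetical): field number -> (field name, type)
PERSON_FIELDS = {1: ("name", "string"), 2: ("id", "int32"), 3: ("email", "string")}


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read a base-128 varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_person(buf: bytes) -> Dict[str, Union[str, int]]:
    """Decode a Person message into a dict, skipping unknown field numbers."""
    person: Dict[str, Union[str, int]] = {}
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_no, wire_type = key >> 3, key & 0x07
        if wire_type == 0:  # varint (the int32 id)
            value, pos = read_varint(buf, pos)
            if value >= 1 << 63:  # negative int32 is sent as 64-bit two's complement
                value -= 1 << 64
        elif wire_type == 2:  # length-delimited (the strings)
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
        if field_no in PERSON_FIELDS:
            name, kind = PERSON_FIELDS[field_no]
            person[name] = value.decode("utf-8") if kind == "string" else value
    return person


print(decode_person(b"\x0a\x05Shawn\x10\x01\x1a\x0dliuxy@emqx.io"))
# {'name': 'Shawn', 'id': 1, 'email': 'liuxy@emqx.io'}
```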

Then, add an action with the following parameters:

  • Action type: message republish
  • Target topic: person/${person.name}
  • Message content template: ${person}

This action sends the decoded “person” in JSON format to the topic person/${person.name}, where ${person.name} is a variable placeholder that is replaced at runtime with the value of the "name" field in the message content.
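The placeholder substitution can be sketched in Python as follows. The regular expression and the choice to render Maps as compact JSON with sorted keys are assumptions made to reproduce the message shown at the end of this example; they are not the rule engine's documented behavior.

```python
import json
import re
from typing import Any, Dict

PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")  # matches ${a} or ${a.b}


def lookup(data: Dict[str, Any], path: str) -> Any:
    """Resolve a dotted path such as "person.name" in the selected data."""
    value: Any = data
    for key in path.split("."):
        value = value[key]
    return value


def render(template: str, data: Dict[str, Any]) -> str:
    """Replace each ${...} placeholder with the value it refers to."""
    def substitute(match) -> str:
        value = lookup(data, match.group(1))
        if isinstance(value, (dict, list)):
            # assumption: maps become compact JSON with sorted keys
            return json.dumps(value, separators=(",", ":"), sort_keys=True)
        return str(value)
    return PLACEHOLDER.sub(substitute, template)


selected = {"person": {"name": "Shawn", "id": 1, "email": "liuxy@emqx.io"}}
print(render("person/${person.name}", selected))  # person/Shawn
print(render("${person}", selected))  # {"email":"liuxy@emqx.io","id":1,"name":"Shawn"}
```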

Device-side code

The device-side code, written in Python, populates a Person message, encodes it into binary data, and sends it to the “t/1” topic. See the complete code for details.
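Since the device code itself is not reproduced here, the following standard-library sketch shows what encoding a Person message into Protobuf binary involves. It is a stand-in for generated Protobuf classes, assumes the field numbers name = 1, id = 2, email = 3 (hypothetical, since the schema is not shown), and leaves out the MQTT publish to “t/1”.

```python
from typing import List


def encode_varint(value: int) -> bytes:
    """Base-128 varint; negative int32/int64 values use 64-bit two's complement."""
    if value < 0:
        value += 1 << 64
    out: List[int] = []
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(field_no: int, text: str) -> bytes:
    """Length-delimited field (wire type 2)."""
    data = text.encode("utf-8")
    return encode_varint((field_no << 3) | 2) + encode_varint(len(data)) + data


def encode_int32(field_no: int, value: int) -> bytes:
    """Varint field (wire type 0)."""
    return encode_varint((field_no << 3) | 0) + encode_varint(value)


def encode_person(name: str, person_id: int, email: str) -> bytes:
    # Assumed field numbers: name = 1, id = 2, email = 3 (hypothetical layout)
    return encode_string(1, name) + encode_int32(2, person_id) + encode_string(3, email)


payload = encode_person("Shawn", 1, "liuxy@emqx.io")
print(payload)
```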

Check rule execution result

2) Install the Python dependencies and run the device-side code:

3) Check that the Websocket client receives a message on the topic person/Shawn:

{"email":"liuxy@emqx.io","id":1,"name":"Shawn"}

Avro data parsing example

Rule requirement

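The device publishes an Avro-encoded binary message that, after the rule engine matches it, must be republished to a topic derived from its “name” field, in the format “avro_user/${name}”. For example, a message whose “name” is “Shawn” is republished to the topic “avro_user/Shawn”.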

Create Schema

  1. Name: avro_user
  2. Codec type: avro
  3. Schema:

After the schema is created, EMQ X assigns it a Schema ID and a version. If “avro_user” is being created for the first time, its Schema ID is “avro_user:1.0”.

Create rule

The key point here is schema_decode('avro_user:1.0', payload):

  • the schema_decode function decodes the contents of the payload field according to the schema 'avro_user:1.0';
  • as avro_user saves the decoded value to the variable "avro_user";

Then, add an action with the following parameters:

  • Action type: message republish
  • Target topic: avro_user/${avro_user.name}
  • Message content template: ${avro_user}

This action sends the decoded “avro_user” in JSON format to the topic avro_user/${avro_user.name}, where ${avro_user.name} is a variable placeholder that is replaced at runtime with the value of the "name" field in the message content.

Device-side code

The device-side code, written in Python, populates a User message, encodes it into binary data, and sends it to the “t/1” topic. See the complete code for details.
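The Avro binary encoding of a User record can be sketched with the standard library as well. The field order and types (name: string, favorite_number: int, favorite_color: string) are assumptions based on the JSON output shown at the end of this example; if the real schema declares nullable unions such as ["int", "null"], each value would additionally be prefixed with its union branch index.

```python
from typing import Any, Dict, Tuple


def zigzag_varint(n: int) -> bytes:
    """Avro int/long: zig-zag (0,-1,1,-2 -> 0,1,2,3), then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def read_zigzag(buf: bytes, pos: int) -> Tuple[int, int]:
    """Inverse of zigzag_varint; return (value, next_pos)."""
    z, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return (z >> 1) ^ -(z & 1), pos
        shift += 7


def encode_string(text: str) -> bytes:
    """Avro string: length as a long, then UTF-8 bytes."""
    data = text.encode("utf-8")
    return zigzag_varint(len(data)) + data


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    length, pos = read_zigzag(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def encode_user(user: Dict[str, Any]) -> bytes:
    # Record fields are written in schema order (assumed order, see above).
    return (encode_string(user["name"])
            + zigzag_varint(user["favorite_number"])
            + encode_string(user["favorite_color"]))


def decode_user(buf: bytes) -> Dict[str, Any]:
    name, pos = read_string(buf, 0)
    number, pos = read_zigzag(buf, pos)
    color, pos = read_string(buf, pos)
    return {"name": name, "favorite_number": number, "favorite_color": color}


user = {"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}
encoded_user = encode_user(user)
print(decode_user(encoded_user) == user)  # True
```

Avro writes no field tags, only values in schema order, which is why the decoder needs the exact schema the encoder used.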

Check rule execution result

2) Install the Python dependencies and run the device-side code:

3) Check that the Websocket client receives a message on the topic avro_user/Shawn:

{"favorite_color":"red","favorite_number":666,"name":"Shawn"}

Custom codec example

The device publishes an arbitrary message to verify that a self-deployed codec service works.

Create Schema

In the EMQ X Dashboard, create a 3rd-Party schema with the following parameters:

  1. Name: my_parser
  2. Codec type: 3rd-party
  3. Third-party type: HTTP
  4. URL: http://127.0.0.1:9003/parser
  5. Codec configuration: xor

Leave the other settings at their defaults. EMQ X assigns the Schema ID “my_parser”. Custom codecs have no version management.

The fifth parameter, the codec configuration, is an optional string whose meaning is defined by the business logic of the codec service.

Create rules

The rule's SQL statement first encodes the data and then decodes it, to verify that the encoding and decoding round trip is correct:

  • the schema_encode function encodes the contents of the payload field according to the schema 'my_parser' and saves the result in the variable encoded_data;
  • the schema_decode function decodes encoded_data according to the schema 'my_parser' and saves the result in the variable decoded_data.

The SQL statement therefore selects two variables, encoded_data and decoded_data.

Then, the action is added with the following parameters:

  • Action type: check (debug)

This action prints the data selected by the SQL statement to the EMQ X console (Erlang shell).

If EMQ X is started with emqx console, the output is printed directly in the console; if it is started with emqx start, the output is written to an erlang.log.N file in the log directory, where N is an integer, for example erlang.log.1 or erlang.log.2.
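When EMQ X runs in the background, a small helper can locate the log file to inspect. This is a hypothetical convenience sketch: the script name and log directory are placeholders for your installation, and choosing by modification time rather than by N assumes the numbering may wrap around after rotation.

```python
import re
import sys
from pathlib import Path
from typing import Optional

# Matches erlang.log.1, erlang.log.2, ...
LOG_NAME = re.compile(r"^erlang\.log\.\d+$")


def latest_erlang_log(log_dir: str) -> Optional[Path]:
    """Return the most recently modified erlang.log.N file in log_dir, or None."""
    candidates = [
        p for p in Path(log_dir).iterdir()
        if p.is_file() and LOG_NAME.match(p.name)
    ]
    if not candidates:
        return None
    # Compare modification times instead of N: numbering may wrap around
    # after rotation, so the largest N is not guaranteed to be the newest file.
    return max(candidates, key=lambda p: p.stat().st_mtime)


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage (placeholder path): python latest_log.py /path/to/emqx/log
    print(latest_erlang_log(sys.argv[1]))
```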

Codec server code

The codec service is an HTTP service implemented in Python. For simplicity, it offers two basic encoding and decoding (encryption and decryption) methods. See the complete code for details.

  • Bitwise XOR
  • Character replacement
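The HTTP request and response format exchanged between EMQ X and the codec service is not described here, so the sketch below covers only the codec logic: the two methods listed above plus a dispatcher keyed by the codec configuration string. The XOR key, the character-replacement table, and the "replace" configuration value are hypothetical; the encode-then-decode round trip mirrors what the rule above checks.

```python
from typing import Callable, Dict

# Hypothetical XOR key; the service's actual key is not given.
XOR_KEY = 0x5A


def xor_codec(data: bytes) -> bytes:
    """XOR every byte with XOR_KEY. XOR is its own inverse, so this both encodes and decodes."""
    return bytes(b ^ XOR_KEY for b in data)


# Hypothetical replacement table: shift each lowercase letter by 3 (a Caesar cipher).
PLAIN = b"abcdefghijklmnopqrstuvwxyz"
SHIFTED = PLAIN[3:] + PLAIN[:3]
ENCODE_TABLE = bytes.maketrans(PLAIN, SHIFTED)
DECODE_TABLE = bytes.maketrans(SHIFTED, PLAIN)


def replace_encode(data: bytes) -> bytes:
    return data.translate(ENCODE_TABLE)


def replace_decode(data: bytes) -> bytes:
    return data.translate(DECODE_TABLE)


# The codec configuration string (for example "xor") selects the algorithm;
# "replace" is a hypothetical second configuration value.
CODECS: Dict[str, Dict[str, Callable[[bytes], bytes]]] = {
    "xor": {"encode": xor_codec, "decode": xor_codec},
    "replace": {"encode": replace_encode, "decode": replace_decode},
}


def handle(config: str, operation: str, data: bytes) -> bytes:
    """Dispatch one request; operation is "encode" or "decode"."""
    return CODECS[config][operation](data)


encoded = handle("xor", "encode", b"hello")
decoded = handle("xor", "decode", encoded)
print(decoded)  # b'hello'
```

Wiring handle() into an HTTP server is left out because the request fields EMQ X sends are not specified in this article.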

Run this service:

Check rule execution result

1) In the Dashboard’s Websocket tool, connect an MQTT client and publish a message with the content “hello” to “t/1”.

2) Check the output printed in the EMQ X console (Erlang shell):

Selected Data is the data selected by the SQL statement, Envs are the environment variables available inside the rule engine, and Action Init Params are the initialization parameters of the action. All three are in Map format.

The two fields encoded_data and decoded_data in Selected Data correspond to the two AS clauses in the SELECT statement. Because decoded_data is the result of encoding and then decoding, it is restored to the content we sent, "hello", which shows that the codec service works properly.

Visit our open-source project at github.com/emqx/emqx, and see the documentation for more details.

EMQ Technologies

EMQ is an open-source IoT data infrastructure software provider, delivering the world’s leading open-source MQTT message broker and stream processing database.