Verifying Kuiper stream processing with MQTT X

Introduction and installation

Kuiper

# Get a Docker mirroring
$ docker pull emqx/kuiper:1.0.2

# Enabling a Docker container
$ docker run -p 9081:9081 -d --name kuiper emqx/kuiper:1.0.2

Kuiper-manager

# Get a Docker mirroring
$ docker pull emqx/kuiper-manager:1.0.2

# Enabling a Docker container
$ docker run -p 9082:9082 -d emqx/kuiper-manager:1.0.2

EMQ X Edge

# Get a Docker mirroring
$ docker pull emqx/emqx-edge:4.2.4

# Enabling a Docker container
$ docker run -d --name emqx -p 1883:1883 emqx/emqx-edge:4.2.4

MQTT X

Tutorials

Use of Kuiper-manager

Nodes

Streams

  1. Enter a stream name to identify it, here we enter the stream name as demo;
  2. Enter the structure definition, for example, we can define in advance which field types are in the stream data that this stream need to receive. To add, simply enter the name of the field and select the type to add, which includes bigint, float, string, boolean, array, struct etc. Structure definitions are optional and can be cancelled or switched on by checking whether stream with structure above the structure list; when the structure definition is cancelled, data of any structure type will be received. In this article we have specified the data structures to be processed, so we add two separate fields: temperature and humidity, both of type bigint.
  3. Enter Data source, we will use MQTT as the message source in this article, so this configuration allows you to enter the Topic for receiving messages, here we enter /kuiper/stream.
  4. Select Stream Type, which will be chosen here as MQTT.
  5. Select Configuration Group, and the configuration group is the configuration information defined under the stream type, for example, the default MQTT configuration group servers information is ['tcp://127.0.0.1:1883']. You can customize this configuration information by clicking on the Source Configuration button above to go to the page to configure, or you can go to the etc directory to modify the configuration file. Here we select the reconfigured demo_conf configuration group.
CREATE STREAM demo (
temperature bigint,
humidity bigint,
) WITH (DATASOURCE="/kuiper/stream", FORMAT="json", CONF_KEY="demo_conf", TYPE="mqtt");

Rules

  1. Enter the rule ID to mark the rule, here we enter demoRule.
SELECT temperature, humidity FROM demo WHERE temperature > 30
{
"id": "demoRule",
"sql": "SELECT temperature, humidity FROM demo WHERE temperature > 30",
"actions": [
{
"mqtt": {
"server": "tcp://172.17.0.2:1883",
"topic": "/kuiper/rule"
}
}
]
}

The use of MQTT X

/**
* Simulated temperature and humidity reporting
* @return Return a simulated temperature and humidity JSON data - { "temperature": 23, "humidity": 40 }
* @param value, MQTT Payload - {}
*/

function random(min, max) {
return Math.round(Math.random() * (max - min)) + min
}

function handlePayload(value) {
let _value = value
if (typeof value === 'string') {
_value = JSON.parse(value)
}
_value.temperature = random(10, 40)
_value.humidity = random(20, 40)
return JSON.stringify(_value, null, 2)
}

execute(handlePayload)

Verify Results

Summary

--

--

--

EMQ is an open-source IoT data infrastructure software provider, delivering the world’s leading open-source MQTT message broker and stream processing database.

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Speed up Docker for Mac with Mutagen

Static Variables, Methods, Block, and Classes in Java

Google OR Tools — A Guide

A middleware network that empower multiple ecosystems to interoperate and interconnect in web3.

Modelling tabular data with CatBoost and NODE

Initiate Your Development Skills with Crio’s Linux & GIT Micro-experiences

4 ‘Don’t be Stupid’ tips for Aspiring Product Engineers.

Non-paid Software is Not “Free”

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
EMQ Technologies

EMQ Technologies

EMQ is an open-source IoT data infrastructure software provider, delivering the world’s leading open-source MQTT message broker and stream processing database.

More from Medium

Github Personal Access Tokens

Helidon 2 CLI

Fixing some problems starting Minikube with VirtualBox on Mac

Install PostgreSQL on MacOS