Bridge MQTT Data from EMQX Cloud to Confluent Cloud on GCP

Where to go after Google IoT Core?

Say you’re a Google IoT Core user and have your own resources on GCP, such as Kafka / Confluent, Cloud SQL, MongoDB, or InfluxDB. In this case, EMQX Cloud — a fully managed MQTT service available on the Google Cloud Platform that, most importantly, supports VPC peering on GCP — is a perfect option.

In this article, we will show how to bridge MQTT data from EMQX Cloud to Confluent Cloud on GCP. EMQX Cloud can connect to Google Cloud SQL and also supports connecting to Confluent Cloud running on GCP. EMQX Cloud, Confluent Cloud, and user applications can all communicate with each other over VPC peering connections. Because this traffic stays on the same private network, it is more secure and reliable than going over the public internet. This cloud-to-cloud native connection lets users enjoy the benefits of GCP infrastructure while avoiding vendor lock-in.

To make it more concrete, we will simulate temperature and humidity data, report it to EMQX Cloud via the MQTT protocol, and then use EMQX Cloud Data Integrations to bridge the data into Confluent Cloud on the GCP platform.

EMQX Cloud

EMQX Cloud provides one-stop O&M and a unique isolated environment for MQTT services. In the era of the Internet of Everything, EMQX Cloud can help you quickly build industry applications and easily realize the connection, movement, processing, and analytics of IoT data.

With the infrastructure provided by cloud providers, EMQX Cloud is available in dozens of countries and regions around the world, providing low-cost, secure, and reliable cloud services to build and grow your business-critical IoT applications. For more information, please visit the EMQX Cloud website or see the EMQX Cloud documentation.

Confluent Cloud

Confluent Cloud is a fully managed, cloud-native Apache Kafka service from Confluent, available on GCP, which makes it a natural target for bridging MQTT data from EMQX Cloud.

Create Deployments

Create EMQX Cluster

When the status is Running, the creation of the deployment is complete.

Create Kafka Cluster

Note that EMQX Cloud does not accept CIDR blocks in the range of 10.11.1.0/24 to 10.64.255.0/24.

Next, from a server on GCP that reaches Confluent Cloud over the peering connection, we install the Confluent CLI and create a topic named emqx via the private network.

# Install Confluent CLI
sudo apt-get update -y && sudo apt-get dist-upgrade -y && sudo apt autoremove -y
curl -sL --http1.1 https://cnfl.io/cli | sh -s -- -b /usr/local/bin
confluent update
# Environment preparation
confluent login --save
confluent environment use env-xxx
confluent kafka cluster use lkc-xxx
# Create an API key and secret for kafka authentication
confluent api-key create --resource lkc-xxx
confluent api-key use <API Key> --resource lkc-xxx
# Create a topic named emqx
confluent kafka topic create emqx
confluent kafka topic list
# Consume the emqx topic from the beginning to verify the setup
confluent kafka topic consume -b emqx

VPC Peering Setup

  1. Go to the Networking section of the Cluster settings page in Confluent Cloud and click the Add Peering button.
  2. Fill in the VPC information, which you can find in the VPC Peering section of the EMQX Cloud deployment console.

Then log in to the EMQX Cloud console, go to the deployment details page, click the + VPC Peering Connection button, and fill in the following information:

Project ID: GCP Project ID of your peering VPC

VPC ID: Network Name of your peering VPC

The above information is available in your Confluent Cloud console.

When the VPC status turns to running, the VPC peering connection has been created successfully.
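If you also want to confirm the peering from the GCP side, you can list the peerings on your own VPC network with the gcloud CLI. This is an optional check; the network name and project below are placeholders you would replace with your own.

# Optional: list VPC peerings on your own GCP network (placeholders, not real values)
gcloud compute networks peerings list --network=<your-vpc-network> --project=<your-gcp-project>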

Data Integrations

1. Create Kafka resources

Go to the Data Integrations page and click Kafka to create a Kafka resource.

Fill in the Kafka connection details and click Test. If the test fails, check the Kafka service and the network connectivity. Once the test passes, click the New button, and you will see the Create Resource successfully message.
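As a rough illustration (the exact form fields may differ in the EMQX Cloud console), the Kafka resource typically points at the Confluent Cloud bootstrap endpoint and authenticates with the API key and secret created earlier; the endpoint below is a made-up placeholder.

# Illustrative Kafka connection details (placeholders, not real values)
Kafka server:  pkc-xxxxx.us-central1.gcp.confluent.cloud:9092   # Confluent Cloud bootstrap endpoint
Username:      <API Key>      # from `confluent api-key create`
Password:      <API Secret>
Authentication: SASL/PLAIN over TLS, as required by Confluent Cloud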

2. Create a new rule

Put the following SQL statement in the SQL input field. The rule SQL retrieves the message timestamp (up_timestamp), client ID, and message body (payload) from the temp_hum/emqx topic, and reads the device’s ambient temperature and humidity from the message body.

SELECT
  timestamp as up_timestamp,
  clientid as client_id,
  payload.temp as temp,
  payload.hum as hum
FROM
  "temp_hum/emqx"

To see if the rule SQL fulfills our requirements, click SQL test and fill in the test payload, topic, and client information.
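For example, a test along these lines can be used; the field names match what the rule SQL reads, while the client ID and values are illustrative.

# test topic
temp_hum/emqx
# test client id (any value works for the test)
temp_hum_client_1
# test payload
{"temp": 25.6, "hum": 78.5}

If the rule matches, the SQL test output should contain the up_timestamp, client_id, temp, and hum fields.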

3. Add Action to Rule

Once the SQL test succeeds, click Next to add a Kafka forwarding action to the rule. To demonstrate how to bridge the data reported by the device to Kafka, we’ll use the following Kafka topic and message template.

# kafka topic
emqx
# kafka message template
{"up_timestamp": ${up_timestamp}, "client_id":
${client_id}, "temp": ${temp}, "hum": ${hum}}

Verification

  1. Use MQTTX to simulate temperature and humidity data reporting

You need to replace broker.emqx.io with the connection address of the deployment you created, and add client authentication credentials in the EMQX Cloud console.
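If you prefer the command line over the MQTTX desktop client, a single report can be simulated with the MQTTX CLI roughly like this; the host, username, and password are placeholders for your own deployment’s connection address and the credentials you added.

# Publish one simulated temperature/humidity reading (placeholders for host and credentials)
mqttx pub -h <deployment-connection-address> -p 1883 \
  -u <username> -P <password> \
  -t temp_hum/emqx \
  -m '{"temp": 25.6, "hum": 78.5}'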

2. View data bridging results

confluent kafka topic consume -b emqx
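Given the message template above, each record consumed from the emqx topic should look roughly like this (values are illustrative):

{"up_timestamp": 1660000000000, "client_id": "temp_hum_client_1", "temp": 25.6, "hum": 78.5}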

So far, we have used EMQX Cloud Data Integrations on the GCP platform to walk through the entire process of bridging device data to Confluent Cloud.

Originally published at https://www.emqx.com.
