Bridge MQTT Data from EMQX Cloud to Confluent Cloud on GCP

EMQ Technologies
7 min readSep 9, 2022

--

Where to go after Google IoT Core?

Google’s sudden announcement of shutting down its Cloud IoT Core service leaves customers very tight schedule to start exploring alternative IoT technologies and migrate their IoT applications and endpoints. Given that Google IoT Core is based on MQTT, the best alternative options for seamless migration undoubtedly are MQTT-based IoT messaging platforms or services.

Say you’re a Google IoT Core user and have your own resources on GCP, such as Kafka / Confluent, Cloud SQL, MongoDB, or InfluxDB. EMQX Cloud, a fully-managed MQTT service available on the Google Cloud Platform, and most importantly, supports VPC peering on GCP, will be your perfect option in this case.

In this article, we will share how to bridge MQTT data from EMQX Cloud to Confluent Cloud on GCP. EMQX Cloud can connect to Google Cloud SQL and also supports connecting with Confluent Cloud based on GCP’s instances. From EMQX Cloud to Confluent Cloud to user applications, they can all communicate with each other by establishing a peer-to-peer connection. This communication is under the same network, making it more secure and reliable as it’s a private network connection. This Cloud-to-Cloud native connection allows users to enjoy the benefits of GCP infrastructure while completely avoiding vendor lock-in.

To make it more vivid, we will simulate temperature and humidity data and report these data to EMQX Cloud via the MQTT protocol and then use the EMQX Cloud Data Integrations to bridge the data into Confluent Cloud based on the GCP platform.

EMQX Cloud

EMQX Cloud is a fully managed MQTT service for IoT that enables users to connect IoT devices to any cloud without the burden of maintaining infrastructure. It’s the world’s first fully managed MQTT 5.0 public cloud service, as well as the first and only one that supports all three major public cloud platforms, including GCP, AWS, and Azure.

EMQX Cloud provides a one-stop O&M colocation and a unique isolated environment for MQTT services. In the era of IoT (Internet of Everything), EMQX Cloud can help you quickly build industry applications and easily realize the connection, movement, processing, and analytics of IoT data.

With the infrastructure provided by cloud providers, EMQX Cloud is available in dozens of countries and regions around the world, providing low-cost, secure, and reliable cloud services to build and grow your business-critical IoT applications. For more information, please visit the EMQX Cloud website or see the EMQX Cloud documentation.

Confluent Cloud

Confluent Cloud is a fully managed Apache Kafka service, a Software-as-a-Service solution that allows developers to deploy Kafka-based solutions on all three major clouds: AWS, Azure, and GCP. As a SaaS, it offers the latest stable version of Kafka (using the same set of open source APIs that developers are familiar with), high performance, multiple availability zones, and support for Java, Python, C, C++,.NET, and Go clients.

Create Deployments

Create EMQX Cluster

Create a GCP deployment of EMQX Cloud, other options default.

When the status is Running, the creation of the deployment is complete.

Create Kafka Cluster

Create a dedicated version of Confluent Cloud that can support vpc peer-to-peer connections. If you are creating a Confluent Cloud instance for the first time, you can refer to the help documentation.

Points to note are that EMQX Cloud does not accept CIDR in the range of 10.11.1.0/24 ~ 10.64.255.0/24.

Next, we create Confluent peer-to-peer connection between a server on GCP and Confluent Cloud, and create a topic named emqx via the private network.

# Install Confluent CLI
sudo apt-get update -y && sudo apt-get dist-upgrade -y && sudo apt autoremove -y
curl -sL --http1.1 https://cnfl.io/cli | sh -s -- -b /usr/local/bin
confluent update
# Environment preparation
confluent login --save
confluent environment use env-xxx
confluent kafka cluster use lkc-xxx
# Create an API key and secret for kafka authentication
confluent api-key create --resource lkc-xxx
confluent api-key use <API Key> --resource lkc-xxx
# Create a topic named emqx
confluent kafka topic create emqx
confluent kafka topic list
# Go to the confluent instance and view the emqx topic
confluent kafka topic consume -b emqx

VPC Peering Setup

After the Confluent Cloud cluster has been created, we could add peering by the following steps:

  1. Go to the Networking section of the Cluster settings page and click on the Add Peering button.
  2. Fill in the vpc information, you could get the information from VPC Peering section of the deployment console.

Log in to EMQX Cloud console, go to the deployment details page, click the + VPC Peering Connection button, fill in the information.

Project ID: GCP Project ID of your peering VPC

VPC ID: Network Name of your peering VPC

The above information is available in your Confluent Cloud console.

When the vpc status turns to running, you successfully create the vpc peering connection.

Data Integrations

1.Create Kafka resources

Go to the Data Integrations page. On the data integration page, click Kafka resources.

Fill in the Kafka connection details, and then click test. Please check the Kafka service if the test fails. Click the New button after the test is passed and you will see the Create Resource successfully message.

2. Create a new rule

Put the following SQL statement in the SQL input field. The device reporting message time (up timestamp), client ID, and message body (payload) will be retrieved from the temp hum/emqx subject in the SQL rule, and the device ambient temperature and humidity will be read from the message body.

SELECT 
timestamp as up_timestamp,
clientid as client_id,
payload.temp as temp,
payload.hum as hum
FROM
"temp_hum/emqx"

To see if the rule SQL fulfills our requirements, click SQL test and fill in the test payload, topic, and client information.

3.Add Action to Rule

Click Next to add a Kafka forwarding action to the rule once the SQL test succeeds. To demonstrate how to bridge the data reported by the device to Kafka, we’ll utilize the following Kafka topic and message template.

# kafka topic
emqx
# kafka message template
{"up_timestamp": ${up_timestamp}, "client_id":
${client_id}, "temp": ${temp}, "hum": ${hum}}

Verification

  1. Use MQTTX to simulate temperature and humidity data reporting

You need to replace broker.emqx.io with the created deployment connection address, and add client authentication information to the EMQX Cloud console.

2. View data bridging results

confluent kafka topic consume -b emqx

So far, we have used EMQX Cloud data integration based on the GCP platform to bridge the entire process of data to the Confluent Cloud.

Originally published at https://www.emqx.com.

--

--

EMQ Technologies

EMQ is an open-source IoT data infrastructure software provider, delivering the world’s leading open-source MQTT message broker and stream processing database.