How to use MQTT in Flask

Project Initialization

$ python3 --version
Python 3.8.2
$ pip3 install flask-mqtt

Use Flask-MQTT

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • WebSocket Port: 8083

Import Flask-MQTT

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt
app = Flask(__name__)

Configure Flask-MQTT Extension

app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = '' # Set only if the broker requires a username and password
app.config['MQTT_PASSWORD'] = '' # Set only if the broker requires a username and password
app.config['MQTT_KEEPALIVE'] = 5 # Keepalive interval in seconds
app.config['MQTT_TLS_ENABLED'] = False # Set to True if your broker supports TLS
topic = '/flask/mqtt'
mqtt_client = Mqtt(app)
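Hardcoding broker settings is fine for a demo, but credentials usually belong outside the source file. The following is a minimal sketch, assuming the settings are supplied through environment variables named after the config keys (that naming is an assumption for illustration, not something Flask-MQTT requires). The defaults mirror the values used above.

```python
import os


def mqtt_config_from_env(env=None):
    """Build a dict of Flask-MQTT settings from environment variables.

    The variable names are hypothetical; they simply reuse the config keys.
    """
    if env is None:
        env = os.environ
    return {
        'MQTT_BROKER_URL': env.get('MQTT_BROKER_URL', 'broker.emqx.io'),
        'MQTT_BROKER_PORT': int(env.get('MQTT_BROKER_PORT', '1883')),
        'MQTT_USERNAME': env.get('MQTT_USERNAME', ''),
        'MQTT_PASSWORD': env.get('MQTT_PASSWORD', ''),
        'MQTT_KEEPALIVE': int(env.get('MQTT_KEEPALIVE', '5')),
        # Only the literal string "true" (any case) enables TLS
        'MQTT_TLS_ENABLED': env.get('MQTT_TLS_ENABLED', 'false').lower() == 'true',
    }
```

One way to use it would be to call `app.config.update(mqtt_config_from_env())` before creating `Mqtt(app)`.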

Write Connect Callback Function

@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
    if rc == 0:
        print('Connected successfully')
        mqtt_client.subscribe(topic) # subscribe topic
    else:
        print('Bad connection. Code:', rc)
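The connect callback prints the raw return code, which is not very readable when a connection fails. As a small sketch, the CONNACK return codes defined by MQTT 3.1.1 can be mapped to text. The helper name `describe_rc` is hypothetical and is not part of Flask-MQTT.

```python
# CONNACK return codes as defined by the MQTT 3.1.1 specification
CONNACK_MESSAGES = {
    0: 'Connection accepted',
    1: 'Connection refused: unacceptable protocol version',
    2: 'Connection refused: identifier rejected',
    3: 'Connection refused: server unavailable',
    4: 'Connection refused: bad user name or password',
    5: 'Connection refused: not authorized',
}


def describe_rc(rc):
    """Return a readable description for a CONNACK return code."""
    return CONNACK_MESSAGES.get(rc, 'Unknown return code: {}'.format(rc))
```

Inside the `else` branch of the callback, `print('Bad connection:', describe_rc(rc))` would then give a more useful log line.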

Write Message Callback Function

@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
    data = dict(
        topic=message.topic,
        payload=message.payload.decode()
    )
    print('Received message on topic: {topic} with payload: {payload}'.format(**data))
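The callback above treats every payload as plain text. If publishers send JSON, the payload can be parsed before use. This is a minimal sketch, assuming UTF-8 payloads; `parse_payload` is a hypothetical helper name. It falls back to the decoded string when the payload is not valid JSON.

```python
import json


def parse_payload(raw):
    """Decode an MQTT payload (bytes) and parse it as JSON when possible."""
    text = raw.decode('utf-8', errors='replace')
    try:
        return json.loads(text)
    except ValueError:
        # Not JSON: hand back the decoded text unchanged
        return text
```

In the message callback this could replace `message.payload.decode()` with `parse_payload(message.payload)`.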

Create Message Publish API

@app.route('/publish', methods=['POST'])
def publish_message():
    request_data = request.get_json()
    publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
    return jsonify({'code': publish_result[0]})
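The route above raises a `KeyError` if the request body lacks `topic` or `msg`. A hedged sketch of input validation follows; `validate_publish_request` is a hypothetical helper, and the rule against wildcards reflects the MQTT requirement that a publish topic name must not contain `+` or `#`.

```python
def validate_publish_request(data):
    """Return (topic, msg) from a parsed JSON body, or raise ValueError."""
    if not isinstance(data, dict):
        raise ValueError('request body must be a JSON object')
    topic = data.get('topic')
    msg = data.get('msg')
    if not isinstance(topic, str) or not topic:
        raise ValueError("'topic' must be a non-empty string")
    if '#' in topic or '+' in topic:
        raise ValueError('wildcards are not allowed in a publish topic')
    if msg is None:
        raise ValueError("'msg' is required")
    return topic, msg
```

The route could call this on the result of `request.get_json(silent=True)` and return a 400 response when `ValueError` is raised.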

Run Flask Application

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000)

Test

Receive message

  1. Create a connection in MQTT X and connect to the MQTT server.

Publish Message

  1. Subscribe to the /flask/mqtt topic in MQTT X.
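The `/publish` endpoint can also be exercised from a script rather than a GUI tool. The sketch below uses only the standard library and assumes the app is running at `http://127.0.0.1:5000` as configured above. The function names are hypothetical. Building the request is kept separate from sending it so the request can be inspected first.

```python
import json
import urllib.request


def build_publish_request(topic, msg, base_url='http://127.0.0.1:5000'):
    """Build a POST request for the /publish endpoint with a JSON body."""
    body = json.dumps({'topic': topic, 'msg': msg}).encode('utf-8')
    return urllib.request.Request(
        base_url + '/publish',
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def send_publish_request(req):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode('utf-8'))
```

With the Flask app running, `send_publish_request(build_publish_request('/flask/mqtt', 'hello'))` should return the JSON object produced by the route.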

Complete Code

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt

app = Flask(__name__)

app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = '' # Set only if the broker requires a username and password
app.config['MQTT_PASSWORD'] = '' # Set only if the broker requires a username and password
app.config['MQTT_KEEPALIVE'] = 5 # Keepalive interval in seconds
app.config['MQTT_TLS_ENABLED'] = False # Set to True if your broker supports TLS
topic = '/flask/mqtt'

mqtt_client = Mqtt(app)


@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
    if rc == 0:
        print('Connected successfully')
        mqtt_client.subscribe(topic) # subscribe topic
    else:
        print('Bad connection. Code:', rc)


@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
    data = dict(
        topic=message.topic,
        payload=message.payload.decode()
    )
    print('Received message on topic: {topic} with payload: {payload}'.format(**data))


@app.route('/publish', methods=['POST'])
def publish_message():
    request_data = request.get_json()
    publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
    return jsonify({'code': publish_result[0]})


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000)

Limitations

Summary
