EMQ X(简称 EMQ), 是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,同时也支持 CoAP/LwM2M 一站式 IoT 协议接入。
EMQX简单入门
最近项目上使用了mqtt协议来传输数据,之前没了解过,故简单学习下,本文作为学习记录以便之后复习使用。
1.什么是MQTT
MQTT是一种基于发布/订阅模式的轻量级消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用而设计,可以用极少的代码为联网设备提供实时可靠的消息服务。MQTT 协议广泛应用于物联网、移动互联网、智能硬件、车联网、智慧城市、远程医疗、电力、石油与能源等领域。按照MQTT协议作者的说法,MQTT协议必须具备这几点。
- 简单容易实现
- 支持 QoS(设备网络环境复杂)
- 轻量且省带宽(因为那时候带宽很贵)
- 数据无关(不关心 Payload 数据格式)
- 有持续地会话感知能力(时刻知道设备是否在线
而MQTT协议凭借着轻量高效、可靠的消息传递、海量连接支持、安全的双向通信等优点已成为物联网行业的首选协议。
2.MQTT工作原理
2.1 基础概念
MQTT客户端:任何运行 MQTT客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT测试工具也是客户端。
MQTT Broker: MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。(而EMQX最为优秀的mqtt broker是首选的学习对象。本文也是基于emqx来进行学习介绍的。)
发布订阅模式:发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。
主题:MQTT 协议根据主题来转发消息。主题通过/来区分层级,MQTT 主题支持以下两种通配符:+和#。1. +:表示单层通配符,例如a/+匹配a/x或a/y。2. #:表示多层通配符,例如a/#匹配a/x、a/b/c/d。【注意:通配符主题只能用于订阅,不能用于发布。】
QoS(Quality of Service):
MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。
- QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
- QoS 1:消息至少传送一次。
- QoS 2:消息只传送一次。
2.2 工作流程
- 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
- 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
- MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

3.MQTT简单使用
EMQX优势的地方在于提供了Web形式的客户端(MQTTX)以及在线试用的Broker,这对于学习MQTT的朋友提供了极大的便利。
本文使用docker形式部署本地的emqx。部署步骤如下:
- docker pull emqx/emqx:5.1.6
- docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.1.6
- 浏览器输入你的IP:18083,默认账户密码为admin/public,首次登录需改密码。
页面打开的是EMQX 提供了一个内置的管理控制台,即 EMQX Dashboard。方便用户通过 Web 页面就能轻松管理和监控 EMQX 集群,并配置和使用所需的各项功能。这也是EMQX的优势。
在EMQX Dashboard中进入客户端认证页面,添加基于password_based方式认证机制,添加时也可以选择存储认证信息的数据源,使用非常方便。【可选】
下面已Python客户端(paho-mqtt)为例演示数据接收与发送,Broker使用本地部署的,当然也可已使用EMQX提供的试用Broker。直接上代码。
import randomimport timefrom paho.mqtt import client as mqtt_clientbroker = 'IP'port = 8083topic = "/python/mqtt"client_id = f'python-mqtt-{random.randint(0, 1000)}'username = '账户'password = '密码'def connect_mqtt() -> mqtt_client:def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %dn", rc)client = mqtt_client.Client(client_id, transport="websockets") # 使用websockets方式连接,默认端口8083client.username_pw_set(username=username, password=password)client.on_connect = on_connectclient.connect(host=broker, port=port)return clientdef publish(client: mqtt_client):msg_count = 0while True:time.sleep(1)msg = f"messages: {msg_count}"result = client.publish(topic, msg)status = result[0]if status == 0:print(f"Send {msg} to topic {topic}")else:print(f"Failed to send message to topic {topic}")msg_count += 1def run():client = connect_mqtt()client.loop_start()publish(client)
import randomfrom paho.mqtt import client as mqtt_clientbroker = 'IP'port = 8083topic = "/python/mqtt"client_id = f'python-mqtt-{random.randint(0, 1000)}'username = '账户'password = '密码'def connect_mqtt() -> mqtt_client:def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %dn", rc)client = mqtt_client.Client(client_id, transport="websockets")client.username_pw_set(username=username, password=password)client.on_connect = on_connectclient.connect(host=broker, port=port)return clientdef subscribe(client: mqtt_client):def on_message(client, userdata, msg):print(msg)print(f"Received {msg.payload.decode()} from {msg.topic} topic")client.subscribe(topic)client.on_message = on_messagedef run():client = connect_mqtt()subscribe(client)client.loop_forever()


运行脚本后可以在控制台看到实时的看到Broker的使用情况,简直太方便了。

4.MQTT调试
EMQX控制台中概览可以看到当前数据流入大小,可以按时间段统计主题接收消费数据量,但如果要针对某个主题进行数据流入流出调试时可以使用主题监控这个功能,在主题监控下可以针对某个主题查看消息流入、流出、丢失的数值。

在发送消息时,客户端loop_start()后,使用publish返回值进行消息发送成功失败不是很准确。要保证消息成功发送必须在publish返回值后在调用wait_for_publish()f方法,保证数据百分百发送。
本篇文章来源于微信公众号: 运维进阶部落
微信扫描下方的二维码阅读本文


Comments NOTHING