MQTT协议
MQTT协议
MQTT 服务使用的 emqx,在 https://www.emqx.com/zh/try?product=broker 下载开源版本即可
下载下来之后解压,进到里面的 bin 文件夹下面运行:emqx start
服务就启动了,访问浏览器 ip:18083 就可以来到 WEB 端的控制台,默认密码 admin/public
在监听器中可以看到对应的协议以及监听的端口
MQTT
默认情况下,连账号密码都不用,mqttx 直接填好地址和端口就行,其他的随意
去 WEB 端插件页面把 emqx_auth_mnesia 插件启动了,就需要账号密码认证了
然后找到下载的文件夹里的 etc 目录里的 emqx.conf 搜索 allow_anonymous 改为 false
去 etc/plugins 下面把想要使用的用户名去掉注释或者自己根据他的样子写一个
然后重启 emqx 的服务 emqx restart
然后就可以使用 MQTTX 去测试一下是否生效了,随便输个密码报错了
MQTTX 这个软件的输入框上面那个 Retain 是保留消息的意思,勾上这个之后发出去的消息当再次连接上就会收到,重复发送会覆盖,broker 为每个主题仅存储一条保留消息
除了保留消息还有遗嘱消息,遗嘱消息是针对客户端可能因为未知的原因掉线、连接不正常的,客户端指定一条消息作为遗嘱消息发送给 broker 后,当 broker 检测到客户端连接不正常断开的话会将它的遗嘱消息发送给所在主题的所有客户端,如果是正确的 disconnect 则 broker 会丢弃存储的遗嘱消息
在 MQTTX 里面遗嘱消息在建立连接时设置
还有一个 keep live 的报文,过一段时间双方确认一下是不是还在线,MQTTX 也是在连接时设置即可
接下来抓包分析一下,客户端向服务器(broker)建立连接的请求,除了图中标出来的,还有 keep alive 的时间
这是包含遗嘱消息建立连接的流量包
服务端返回的响应
主要是看这个 return code
返回码 | 返回码响应 |
---|---|
0 | 已接受连接 |
1 | 连接被拒绝,协议版本不可接受 |
2 | 连接被拒绝,标识符被拒绝 |
3 | 连接被拒绝,服务器不可用 |
4 | 连接被拒绝,用户名或密码错误 |
5 | 连接被拒绝,未授权 |
发布消息
这里的 Message Identifier 与 HiveMQ 这篇文章中的 packetId 应该是一个东西我觉着?用来唯一的标识一条消息
这里的 QoS 是服务质量,为 0 时最多发送一次,发送方不保证能发到,接收方也不确认收到消息,消息也不会被存储和重新发送
QoS 为 1 时保证一条消息至少一次发送给接收方,发送方会保存消息,直到从接收方收到了确认收到消息的 ACK 包,就像上面那个截图一样
QoS 为 2 时保证每条消息仅由预期接收者接收一次,双方之间有四次握手(这里面夹杂的那条从 broker 发回来的消息是在他们四次握手结束之前就发回来了?)
客户端订阅,这里面有订阅的主题和请求订阅的质量,服务端也会给一个 ACK
取消订阅也一样
在 MQTT 中,客户端发布或订阅之前不需要事先创建主题,服务端(broker)接受每个有效的主题而无需事先初始化
主题必须至少包含一个字符,区分大小写,一些主题的示例:
myhome/livingroom/temperature
5ff4a2ce-e485-40f4-826c-b1a5d81be9b6/status
BMW/Control/car/238234053/status
如果不想直接订阅某个精准的主题,MQTT 也有通配符,可以使用通配符同时订阅多个主题。通配符只能用于订阅主题,不能用于发布消息
加号(+)就代表单级通配符,这样用:BMW/Control/car/+/status,订阅产生的效果是:
BMW/Control/car/238234053/status √
BMW/Control/car/842661654/status √
BMW/Control/car/238233453/temp ×
BMW/Control/car/842691654/156498/status ×
井号(#)代表多级通配符,这样用:BMW/Control/car/#
BMW/Control/car/238234053/status √
BMW/Control/car/842661654/status √
BMW/Control/car/238233453/car √
BMW/Control/fly/238234053/status ×
MQTTS
根据 官方文档 中的自签名做就行
首先生成一个自签名的 CA 证书,生成一个长度为 2048 的密钥保存在 ca.key 中
openssl genrsa -out ca.key 2048
生成 emqx 根证书
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.pem
有了根证书就可以用来给其他实体签发实体证书
先给 emqx 生成一个私钥
openssl genrsa -out emqx.key 2048
建立一个 openssl.cnf 文件
[req]
default_bits = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = Zhejiang
localityName = Hangzhou
organizationName = EMQX
commonName = Server certificate
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = BROKER_ADDRESS #主要改这里
DNS.1 = BROKER_ADDRESS #主要改这里
以私钥和配置文件签发一个证书
openssl req -new -key ./emqx.key -config openssl.cnf -out emqx.csr
以根证书签发 emqx 实体证书
openssl x509 -req -in ./emqx.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out emqx.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf
将生成的 emqx.pem、emqx.key、ca.pem 放在 emqx 的 etc/certs 目录下,修改 etc 目录下的 emqx.conf
listener.ssl.external = 8883
...
listener.ssl.external.keyfile = etc/certs/emqx.key
...
listener.ssl.external.certfile = etc/certs/emqx.pem
...
listener.ssl.external.cacertfile = etc/certs/ca.pem
然后打开 MQTTX 配置好就可以连接了
这样再连接后就是加密的了
问题在于,我密钥证书啥的都有,我该怎么解密Orz。知道了,麻了,用了 ECDHE,即使有私钥也解不了
WS
MQTT-WebSoket 统一使用 /path 作为连接路径,连接时需指明,在 EMQX 上使用的路径为 /mqtt
WSS
需要用域名访问,直接改 hosts 文件就行,(把 MQTTX 的 SSL 安全给关掉)
用 Python 连接
使用的库:pip install paho-mqtt
里面用的 broker.emqx.io 是 emqx 提供的公共服务器,免费试用
发布:
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
订阅:
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
如果是像上面那样开启了认证的话,可以在 connect 之前加上
client.username_pw_set(username='admin', password='public')
比如我上面那个自己搭建的
如果使用 ws 的话,直接加上个 transport 改好端口即可
client = mqtt_client.Client(client_id,transport="websockets")
想要用 tls 的话,需要导入 ssl 库,端口需要改到对应的端口,这里面的 ssl.CERT_NONE 主要是取消一些验证
client.tls_set(ca_certs="./emqx/ca.pem",cert_reqs = ssl.CERT_NONE)
如果使用 wss 的话,就两个缝合一下
参考:
mqtt-essentials-wrap-up (HiveMQ 团队写的 MQTT 要点总结)