跳转至

MQTT协议

MQTT协议

MQTT 服务使用的 emqx,在 https://www.emqx.com/zh/try?product=broker 下载开源版本即可

下载下来之后解压,进到里面的 bin 文件夹下面运行:emqx start服务就启动了,访问浏览器 ip:18083 就可以来到 WEB 端的控制台,默认密码 admin/public

在监听器中可以看到对应的协议以及监听的端口

1654089614768-40fb442d-43e3-4bc3-be9e-062ac7028d91.png

MQTT

默认情况下,连账号密码都不用,mqttx 直接填好地址和端口就行,其他的随意

1654089631130-5a3b9aae-f76b-4fd8-8e82-64fb354e4180.png

去 WEB 端插件页面把 emqx_auth_mnesia 插件启动了,就需要账号密码认证了

1653969028123-06888308-33f0-4982-b715-dd78d826a83e.png

然后找到下载的文件夹里的 etc 目录里的 emqx.conf 搜索 allow_anonymous 改为 false

1653968679951-9fff5023-01ec-43fb-9760-bced4166e98d.png

去 etc/plugins 下面把想要使用的用户名去掉注释或者自己根据他的样子写一个

1653968750380-2cd72d7b-a73d-43ef-8380-b5dee0a906f0.png

然后重启 emqx 的服务 emqx restart

然后就可以使用 MQTTX 去测试一下是否生效了,随便输个密码报错了

1653968851905-e9074489-5743-457b-8028-64c0c51adecf.png

MQTTX 这个软件的输入框上面那个 Retain 是保留消息的意思,勾上这个之后发出去的消息当再次连接上就会收到,重复发送会覆盖,broker 为每个主题仅存储一条保留消息

1653988276486-1fcbb7ac-7e44-4216-aa94-6846dbec48dd.png

除了保留消息还有遗嘱消息,遗嘱消息是针对客户端可能因为未知的原因掉线、连接不正常的,客户端指定一条消息作为遗嘱消息发送给 broker 后,当 broker 检测到客户端连接不正常断开的话会将它的遗嘱消息发送给所在主题的所有客户端,如果是正确的 disconnect 则 broker 会丢弃存储的遗嘱消息

在 MQTTX 里面遗嘱消息在建立连接时设置

1654156035529-f6ce5913-6bd0-4d0b-a610-31a259629b70.png

还有一个 keep live 的报文,过一段时间双方确认一下是不是还在线,MQTTX 也是在连接时设置即可

1654156519249-dac6b2eb-ccd8-4bf9-a171-d346f67e5799.png

接下来抓包分析一下,客户端向服务器(broker)建立连接的请求,除了图中标出来的,还有 keep alive 的时间

1654130762561-73a83aa7-ea9b-4470-868d-ae7ce7d9eeeb.png

这是包含遗嘱消息建立连接的流量包

1654156732464-1e6f920b-086e-458b-b515-f56e7ba69368.png

服务端返回的响应

1654130904444-a20d23a5-6bef-4bd4-bd74-abd2b1e64923.png

主要是看这个 return code

返回码 返回码响应
0 已接受连接
1 连接被拒绝,协议版本不可接受
2 连接被拒绝,标识符被拒绝
3 连接被拒绝,服务器不可用
4 连接被拒绝,用户名或密码错误
5 连接被拒绝,未授权

发布消息

1654132374526-ef36cf4d-0ca8-413b-ac4e-29f451a3213f.png

这里的 Message Identifier 与 HiveMQ 这篇文章中的 packetId 应该是一个东西我觉着?用来唯一的标识一条消息

这里的 QoS 是服务质量,为 0 时最多发送一次,发送方不保证能发到,接收方也不确认收到消息,消息也不会被存储和重新发送

QoS 为 1 时保证一条消息至少一次发送给接收方,发送方会保存消息,直到从接收方收到了确认收到消息的 ACK 包,就像上面那个截图一样

QoS 为 2 时保证每条消息仅由预期接收者接收一次,双方之间有四次握手(这里面夹杂的那条从 broker 发回来的消息是在他们四次握手结束之前就发回来了?)

1654153945802-b9ff1330-742d-46fe-a3d8-2dd85d70e9cc.png

客户端订阅,这里面有订阅的主题和请求订阅的质量,服务端也会给一个 ACK

1654132763918-14850306-d020-4450-8922-9cce07c37644.png

取消订阅也一样

1654148225405-0981554e-4eb9-4f10-a074-50454d4edb41.png

在 MQTT 中,客户端发布或订阅之前不需要事先创建主题,服务端(broker)接受每个有效的主题而无需事先初始化

主题必须至少包含一个字符,区分大小写,一些主题的示例:

myhome/livingroom/temperature
5ff4a2ce-e485-40f4-826c-b1a5d81be9b6/status
BMW/Control/car/238234053/status

如果不想直接订阅某个精准的主题,MQTT 也有通配符,可以使用通配符同时订阅多个主题。通配符只能用于订阅主题,不能用于发布消息

加号(+)就代表单级通配符,这样用:BMW/Control/car/+/status,订阅产生的效果是:

BMW/Control/car/238234053/status        √
BMW/Control/car/842661654/status        √
BMW/Control/car/238233453/temp          ×
BMW/Control/car/842691654/156498/status ×

井号(#)代表多级通配符,这样用:BMW/Control/car/#

BMW/Control/car/238234053/status        √
BMW/Control/car/842661654/status        √
BMW/Control/car/238233453/car           √
BMW/Control/fly/238234053/status        ×

MQTTS

根据 官方文档 中的自签名做就行

首先生成一个自签名的 CA 证书,生成一个长度为 2048 的密钥保存在 ca.key 中

openssl genrsa -out ca.key 2048

生成 emqx 根证书

openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.pem

有了根证书就可以用来给其他实体签发实体证书

先给 emqx 生成一个私钥

openssl genrsa -out emqx.key 2048

建立一个 openssl.cnf 文件

[req]
default_bits  = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = Zhejiang
localityName = Hangzhou
organizationName = EMQX
commonName = Server certificate
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = BROKER_ADDRESS   #主要改这里
DNS.1 = BROKER_ADDRESS  #主要改这里

以私钥和配置文件签发一个证书

openssl req -new -key ./emqx.key -config openssl.cnf -out emqx.csr

以根证书签发 emqx 实体证书

openssl x509 -req -in ./emqx.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out emqx.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf

将生成的 emqx.pem、emqx.key、ca.pem 放在 emqx 的 etc/certs 目录下,修改 etc 目录下的 emqx.conf

listener.ssl.external = 8883
...
listener.ssl.external.keyfile = etc/certs/emqx.key
...
listener.ssl.external.certfile = etc/certs/emqx.pem
...
listener.ssl.external.cacertfile = etc/certs/ca.pem

然后打开 MQTTX 配置好就可以连接了

1654083472492-42699252-e08b-4e60-b7e4-314ae6198640.png

这样再连接后就是加密的了

1654083628102-4df5ace0-717a-4942-a475-7dac90443320.png

问题在于,我密钥证书啥的都有,我该怎么解密Orz。知道了,麻了,用了 ECDHE,即使有私钥也解不了

1654085327049-58489a5c-f14c-4a7e-a637-38bda4cfb0eb.png

1654086368564-2bf50a0c-c7a8-4215-9f20-be89ec74810c.png

WS

MQTT-WebSoket 统一使用 /path 作为连接路径,连接时需指明,在 EMQX 上使用的路径为 /mqtt

1654089110308-4ce68ee8-83c3-4fa3-b477-f1b91e136872.png

WSS

需要用域名访问,直接改 hosts 文件就行,(把 MQTTX 的 SSL 安全给关掉)

1654089394789-dfb7c416-744b-4d8f-a9cd-3357516031f0.png

用 Python 连接

使用的库:pip install paho-mqtt

里面用的 broker.emqx.io 是 emqx 提供的公共服务器,免费试用

发布:

import random
import time
from paho.mqtt import client as mqtt_client

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
            msg_count += 1

def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)
if __name__ == '__main__':
    run()

订阅:

import random
from paho.mqtt import client as mqtt_client

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'

def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
    client.subscribe(topic)
    client.on_message = on_message

def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()

if __name__ == '__main__':
    run()

1653977135315-a4cf7711-7a28-4e00-8cc1-78d19923db6a.png

如果是像上面那样开启了认证的话,可以在 connect 之前加上

client.username_pw_set(username='admin', password='public')

比如我上面那个自己搭建的

1653977439399-2647aa70-86a8-4e5c-9f81-939830377503.png

如果使用 ws 的话,直接加上个 transport 改好端口即可

client = mqtt_client.Client(client_id,transport="websockets")

想要用 tls 的话,需要导入 ssl 库,端口需要改到对应的端口,这里面的 ssl.CERT_NONE 主要是取消一些验证

client.tls_set(ca_certs="./emqx/ca.pem",cert_reqs = ssl.CERT_NONE)

如果使用 wss 的话,就两个缝合一下

参考:

mqtt-essentials-wrap-up (HiveMQ 团队写的 MQTT 要点总结)

原文: https://www.yuque.com/hxfqg9/iot/pqfymw