小议 MQTT 物联网传输协议

物联网IoTInternet of Things)通过各种网络以及传感器技术,按照约定的协议将无处不在设备联结起来,以进行信息的传输与控制交互,并实现智能化的信息感知与管理,进而构建出万物相联的智能化环境,将网络连接能力渗透进现实世界的方方面面。而伴随近年 5G 无线网的迅速商用,海量物联网设备的接入给传统 Web 通信协议带来了挑战,必须有针对性的采取一系列全新特性的通信协议,从而解决网络环境不可靠终端设备系统资源有限等关键问题。

MQTT(消息队列遥测传输,Message Queuing Telemetry Transport)协议最早是由 Andy Stanford ClarkArlen Nipper 于 1999 年创建,起初主要是解决卫星与原油管道监测数据的传输问题,拥有最低的电池损耗与最小的带宽占用。后来在 OASIS 标准化组织的推动下于 2014 年 10 月公布了 V3.1.1 版本规范,并于 2019 年 3 月发布了最新的 V5.0 版本规范,目前 MQTT 已经成为物联网通信系统当中较为常用的一种传输协议。

发布/订阅模式

发布/订阅模式Publish/Subscribe)提供了传统客户/服务端模式Client/Server的一种替代方案,在客户/服务端模型当中,客户端直接与服务端进行通信;而发布/订阅则是将发送消息的发布者(Publisher)与接收消息的订阅者(Subscribers)分离。两者并不直接进行通信,甚至彼此之前都不知道对方的存在。发布者订阅者之间的连接是由代理者(Broker)进行处理,代理者的工作是过滤所有发布者传入的消息,并且将这些消息正确的分发给订阅者

发布/订阅模式的重要特性就是将消息的发布者订阅者解耦,这种解耦主要体现在如下三个方面:

  1. 空间解耦发布者订阅者不需要相互了解(例如无需交换 IP 地址与服务端口);
  2. 时间解耦发布者订阅者的操作不需要同时运行;
  3. 同步解耦:在发布或者订阅消息的过程当中,双方的操作都不会被中断;

发布/订阅模式优于传统客户/服务端模式的另一方面在于,代理者上的操作可以在缓存之后,以事件驱动的方式并行处理。显然代理者(Broker)在发布/订阅过程当中起到了举足轻重的作用,通过如下几种消息过滤方式,可以使得每个订阅者都只接收自己订阅的消息:

  1. 基于话题(Subject)进行过滤:代理者基于每条消息的话题或者主题进行过滤;
  2. 基于内容(Content)进行过滤:代理者根据特定的内容过滤消息;
  3. 基于类型(Type)进行过滤:代理者通过事件的类型进行过滤;

注意发布者订阅者的解耦是发布/订阅模式的关键所在。

MQTT 几乎体现了上面所提到发布/订阅模式的所有方面:

  • MQTT 在空间上分离了发布者订阅者,两者只需要知道代理者的 IP 地址与端口即可;
  • MQTT 可以在时间上对发布者订阅者进行解耦,即代理者可以为不在线的订阅者缓存发布者的消息;
  • MQTT 的发布者订阅者可以异步进行工作,在订阅或者发布消息时,任务不会被阻塞;

注意:除此之外,MQTT 还采用了基于主题(Topic)的消息过滤机制,并且提供有 3 种服务质量(QoS)等级。

MQTT 和传统消息队列(Message Queues)之间的区别主要体现在如下 3 个方面:

  1. 消息队列会存储消息,直至消息被消费:消息队列会将接收到的每一条消息都存储在队列当中,直至被客户端(消费者)接收;如果没有消费者接收到消息,那么消息将会一直保持在队列当中等待被消费;
  2. 1 条消息只被 1 个客户端消费:传统的消息队列当中,一条消息只能被一个消费者处理;而在 MQTT 当中,订阅了主题的每一位订阅者都会接收到消息;
  3. 队列创建时必须显式进行命名:使用消息队列之前,必须显式创建并且命名队列;而 MQTT 的主题则更加灵活,可以随时创建随时订阅;

发布者/代理者/订阅者

MQTT 采用的发布/订阅模式将发送消息的发布者(Publisher)与接收消息的订阅者(Subscribers)分离,然后代理者(Broker)通过消息的主题(Topic)来确定消息应当发送给哪个订阅者。总体来看,MQTT 协议当中的角色可以划分为如下 2 种类型:

  1. MQTT 客户端:通过 MQTT 协议连接至代理者服务的任意设备;发布者订阅者都属于 MQTT 的客户端,MQTT 的客户端可以由 C/C++、Go、Java、JavaScript 等多种编程语言实现,具体请参考MQTT 软件推荐
  2. MQTT 代理者:负责接收发布者的消息,并且根据主题路由这些消息至相应的订阅者,是所有发布/订阅模式协议的核心;

MQTT 连接

MQTT 协议构建在 TCP/IP 协议之上,所有客户端(包括发布者订阅者)以及代理者都需要基于 TCP/IP 协议栈进行工作。

MQTT 连接始终位于客户端代理者之间,客户端之间不会相互直接进行连接。当客户端发起连接时,客户端会向代理者发送 CONNECT 消息,而代理者则使用 CONNACK 消息进行响应。连接建立之后,代理者将会保持连接的打开状态,直至客户端发送断开连接命令或者是当前连接被中断。

客户端 CONNECT 发起连接

客户端代理者发送 CONNECT 消息请求连接,其中主要包含有如下内容:

  • 客户端标识符 clientId:用于标识连接到代理者的每个客户端,该标识符的取值对于代理者客户端而言必须唯一;
  • 清理会话 cleanSession:用于告知代理者,当前客户端是否需要建立持久会话;当 CleanSession = false 时,代理者将存储客户端的所有订阅,以及客户端以服务质量(QoS)级别 1 或者 2 所订阅的全部遗漏消息;而当 CleanSession = true 时,代理者不但不会为客户端保存任何消息,还会清除掉来自于之前持久会话的所有消息;
  • 用户名/密码 username/password:用于对客户端进行认证与授权,默认为明文传输,实际应用当中应当进行加密处理;
  • 临终遗嘱 lastWill*:属于 MQTT 临终遗嘱(LWT,Last Will and Testament)特性的一部分,当客户端非正常断开连接时,用于发送遗嘱消息通知其它的客户端
  • 保持连接 keepAlive:用于指定客户端在连接建立时,与代理者通信的时间间隔(以为单位),即代理者客户端在不发送消息的情况下,可以保持连接的最长时间;

代理者 CONNACK 进行响应

代理者接收到 CONNECT 消息之后,就会发送 CONNACK 消息进行响应,该消息包含有会话出现标识 sessionPresent连接返回码 returnCode 两项内容:

  • 会话出现标识 sessionPresent:用于告知客户端,当前的代理者是否已经拥有了一个会话;当客户端连接消息的 cleanSession = true 时,由于当前没有可用的会话,所以该项总是为 false;而当客户端连接消息的 cleanSession = false 时,此时就会存在两种可能性;如果会话消息对于 clientId 可用,并且代理者服务已经缓存了这些会话信息,那么 sessionPresenttrue;反之,如果代理者没有任何这个 clientId 的会话信息,那么该项就为 false
  • 连接返回码 returnCode:用于通知客户端当前连接是否成功,具体取值请参考下面表格:
返回代码 返回码响应
0 已接受连接;
1 连接被拒绝,不可接受的协议版本;
2 连接被拒绝,标识符被拒绝;
3 连接被拒绝,服务器不可用;
4 连接被拒绝,用户名或密码错误;
5 连接被拒绝,未获得授权;

发布/订阅/退订

发布(PUBLISH)

客户端连接到一个代理者之后,就可以立刻发布消息。代理者通过主题(Topic)对消息进行过滤和路由,所以每条消息都必须包含有一个主题。除此之外,每条消息还必须拥有一个负载(Payload),里面包含了需要以字节格式进行传输的数据,通常由客户端当中的发布者来决定负载的类型(二进制、文本、XML、JSON)。通常情况下,每条 PUBLISH 消息都会包含有如下属性:

  • 主题名称 topicName:是一段以斜杠/作为分隔符,具有层次结构的简单字符串,例如 home/bedroom/moisture
  • 服务质量 qos:服务质量(QoS,Quality of Service)等级有 012 三个等级,用于确定消息到达接收者(发布者订阅者代理者)所需的保障类型;
  • 保留标记 retainFlag:用于定义代理者是否缓存指定主题的最后一个正确值,当一个新的客户端订阅该主题时,就会接收到保留在该主题上的最后一条消息;
  • 负载 payload:指定消息所要传输的实际内容,可以是图像文本二进制等数据类型;
  • 数据包标识符 packetId:在客户端代理者进行消息传输时,为每个数据包添加的唯一标识,并且只与大于 0 的 QoS 级别有关,通常由客户端或者代理者自动进行设置;
  • DUP 标志 dupFlag:用于标识这条消息是重复的,由于预期接收者(客户端代理者)没有确认原始消息而被重发,该场景仅与 QoS 大于 0 场景有关;

注意:当发布者代理者发送 PUBLISH 消息时,代理者会根据 QoS 级别处理消息,并通过订阅者订阅的主题转发这些消息。发布者只关心将 PUBLISH 消息发送给代理者,而并不会获得其它任何反馈。

订阅(SUBSCRIBE)

如果客户端想要订阅指定的主题,那么就需要向代理者发送一个 SUBSCRIBE 消息,该消息非常简单,只包含一个唯一的数据包标识符 packetId,以及一个主题/服务质量列表

  • 数据包标识符 packetId订阅者代理者订阅消息时,会自动为每个数据包添加唯一的标识;
  • 主题/服务质量列表:一条 SUBSCRIBE 消息可以包含多个订阅,每个订阅由一个 Topic 主题和一个 QoS 级别组成;SUBSCRIBE 消息中的主题可以包含通配符,以便订阅指定主题模式(而非特定的主题);如果当前客户端具有重复的订阅,那么代理者将会响应该主题所具有的最高 QoS 级别消息;

订阅确认(SUBACK)

为了确认每条订阅,代理者会向订阅者发送一条 SUBACK 确认消息,该消息包含有原始订阅消息的数据包标识符 packetId,以及一个返回码 returnCode 列表。

  • 数据包标识符 packetId:数据包的唯一标识符,与 SUBSCRIBE 订阅消息当中的 packetId 保持一致;
  • 返回码 returnCode代理者会为在 SUBSCRIBE 消息当中接收到的每个 topic/qos 响应一个返回码,例如订阅者发送的 SUBSCRIBE 消息拥有 5 个订阅,那么代理者响应的 SUBACK 退订消息就会包含 5 个返回码,这些返回码用于确认每个主题是否都已经被正确订阅,并且展示代理者所授予的 QoS 级别;如果代理者由于权限或者主题不正确等原因拒绝了本次订阅,则 SUBACK 消息将会包含该主题的失败返回码;
返回代码 返回码响应
0 成功,最大 QoS 为 0
1 成功,最大 QoS 为 1
2 成功,最大 QoS 为 2
128 失败

客户端里的订阅者成功发送 SUBSCRIBE 消息,并且接收到 SUBACK 消息响应之后,就会获得与 SUBSCRIBE 消息包含主题相匹配的发布者已经发布的消息。

退订(UNSUBSCRIBE)

订阅者发送的 SUBSCRIBE 消息相对应的是 UNSUBSCRIBE 消息,代理者接收到该消息之后,就会删除订阅者的指定订阅。类似于 SUBSCRIBE 订阅消息,UNSUBSCRIBE 退订消息同样包含有一个数据包标识符 packetId 和一个主题列表

  • 数据包标识符 packetId:当消息在订阅者代理者之间流动时,用于唯一的标识消息,由订阅者或者代理者自动进行设置;
  • 主题/服务质量列表:包含多条订阅者想要取消订阅的主题,只需要发送主题 Topic,而不需要发送 QoS代理者接收到之后会取消这些主题的订阅,而不关心其最初订阅时的 QoS 级别;

退订确认(UNSUBACK)

代理者接收到订阅者UNSUBSCRIBE 退订消息,并且代理者将指定的主题退订操作完成之后,就会向订阅者发送一条用于确认退订消息UNSUBACK 消息,该消息仅包含有原始 UNSUBSCRIBE 的数据包标识符。

  • 数据包标识符 packetId:消息的唯一标识符,同样与 UNSUBSCRIBE 消息当中提供的 packetId 相同;

当订阅者从代理者接收到 UNSUBACK 消息之后,客户端就可以认为 UNSUBSCRIBE 消息当中的订阅主题已经被代理者删除。

主题 Topic

MQTT 协议的主题(Topic)是指代理者用于为客户端过滤与路由消息的 UTF-8 编码字符串,一个主题是由一个或者多个主题级别(Topic Level)构成,每个主题级别由正斜线 / 进行分隔。每个主题必须至少包含 1 个字符(一个单独的斜杆 / 也是一个有效的主题),并且主题字符串允许存在空格。除此之外,主题区分大小写home/temperatureHome/Temperature 是 2 个不同的主题

相比于传统的消息队列(Message Queue),MQTT 的主题非常轻量级,客户端发布或者订阅主题之前,并不一定要提前创建这个主题代理者可以在不进行任何初始化的条件下,接受任意有效的主题。

通配符

当客户端的订阅者订阅某个主题时,既可以订阅已经发布的确切主题,也可以采用通配符同时订阅多个相关的主题模式。这里的通配符只能用于订阅主题,而不能用于发布消息。MQTT 拥有单级多级 2 种不同类型的通配符。

单级通配符 +:只能用于匹配一个主题级别

如果主题包含有除通配符之外的任意字符串,那么任意主题都可以与具备单级通配符的主题进行匹配,例如订阅 myhome/groundfloor/+/temperature 可以匹配如下一系列的主题:

多级通配符 #:可以匹配多个主题级别,必须作为主题的最后一个字符放置,并且以斜杠 / 开头。当订阅者订阅了携带有多级通配符主题时,无论该主题有多冗长,该订阅者都会接收到/#模式之前主题的全部消息:

系统保留主题 $:以$符号作为前缀的主题由 MQTT 系统内部保留,通常用于代理者内部统计信息,各种代理者(Broker)对其的实现与定义都会有所不同,下面是一些系统保留主题的示例:

1
2
3
4
5
$SYS/broker/clients/disconnected
$SYS/broker/clients/connected
$SYS/broker/messages/sent
$SYS/broker/clients/total
$SYS/broker/uptime

最佳实践

  • 切勿在主题开头使用正斜杠 /,避免引入一个零字符作为不必要的主题级别;
  • 永远不要在主题中使用空格(包括 UTF-8 当中不同类型的空白),从而避免为阅读和调试带来不必要的困扰;
  • 尽量保持主题简短,对于资源有限的物联网设备,每个字节占用的存储空间都非常重要;
  • 仅使用 ASCII 字符,避免使用一些不可打印的字符;
  • 将唯一标识符或者客户端 ID 嵌入主题,从而方便的识别消息的发送者;
  • 不要轻易直接订阅 #,即不要直接使用多级通配符订阅代理者上发布的全部消息,避免给订阅者带来过大的数据吞吐量;
  • 优化主题层次结构,保持主题命名的长期扩展性;

服务质量 QoS

服务质量QoS, Quality of Service)级别是消息发送方与接收方之间的一种约定,主要是通过控制消息的传输次数来确保其能够以指定的方式传递交付数据。作为 MQTT 协议的一个关键特性,QoS 可以使客户端能够选择与其网络可靠性应用程序逻辑相匹配的服务级别,MQTT 主要包含如下三个 QoS 级别:

  • 0最多传送一次
  • 1至少传送一次
  • 2只有一次

发布者将消息发送到代理者时定义消息的 QoS 级别,而代理者使用每个订阅者在订阅过程中定义的 QoS 级别,然后将消息传输给订阅者。如果订阅者定义的 QoS 级别低于发布者定义的 QoS 级别,则代理者将会以相对较低的那个 QoS 级别传输消息。接下来,讨论一下 MQTT 协议当中的每个 QoS 级别的工作原理。

QoS 0 最多一次

最小的 QoS 级别为 0,这种服务质量可以确保最佳的交付效果。但是并不保证交付结果,即接收方不会确认收到消息,发送方也不会存储和重新传输消息。

QoS 1 至少一次

QoS 级别 1 可以保证消息至少传递一次给接收者,发送方会存储消息,直至从接收方处获得确认接收到消息的 PUBACK 数据包为止,在这里消息可以被发送多次。

发送方采用数据包标识符 packetIdPUBLISH 数据包与相应的发布确认 PUBACK 数据包进行匹配,如果发送方在一定时间之内未收到 PUBACK 数据包,则发送方将会重新发送 PUBLISH 数据包;当接收端收到该 QoS 级别为 1 的消息以后,就可以立刻开始进行处理;例如现在接收方是一个代理者,则代理者会把消息发送给所有订阅者,然后回复一个 PUBACK 数据包:

接下来,如果发布者再次发布消息,就会被设置一个重复标记(DUP),该标记在 QoS 级别 1 里仅用于内部目的,无需代理者或者客户端来进行处理。

QoS 2 仅有一次

QoS 为 2 是 MQTT 最高级别的 QoS 级别,同时也是最为安全缓慢的服务质量级别,该级别能够保证每一条消息只会被收件人接收一次,这种保证由发送方接收方之间的至少两个请求/响应流(包含 4 次握手)来提供。发送方接收方都会使用原始 PUBLISH 消息的数据包标识符来协调消息的传递流程。

接收方发送方获得一个 QoS 级别为 2PUBLISH 数据包时,接收方在处理该消息的同时,会通过一个用于确认 PUBLISHPUBREC 数据包回复发送方。如果发送方未能从接收方那里接收到 PUBREC 数据包,那么发送方就会再次发送带有重复标志(DUP,Duplicate)的 PUBLISH 包,直至接收到确认的 PUBREC 数据包。

一旦发送方接收方那里接收到一个 PUBREC 数据包,发送方就可以安全的丢弃最初的 PUBLISH 数据包,并且存储来自于接收方PUBREC 数据包,同时使用 PUBREL 数据包进行响应。

接收方获得 PUBREL 数据包之后,就可以丢弃之前存储的全部状态,并使用一个 PUBCOMP 数据包进行响应,而发送方接收到 PUBCOMP 之后同样会进行类似的处理。在接收方完成处理并将 PUBCOMP 数据包响应给发送方之前,接收方将会缓存初始 PUBLISH数据包标识符 packetId 引用,该步骤对于避免第 2 次处理消息极为重要。发送方接收到 PUBCOMP 数据包之后,已发布消息的数据包标识符 packetId 就可以得到重用。

这样在整个 QoS 级别 2 的流程全部执行完成之后,收发双方就可以确定消息已经被正确的传递。如果数据包发生了丢失,那么发送方将有责任在恰当的时间之内重新传输消息,而接收方同样有责任去逐一响应每一条消息。

QoS 的降级

正如本节内容开始时所提到的,发送方接收方 QoS 的定义与级别是完全不同的,比如发布者代理者发送 PUBLISH 消息时定义了 QoS,而当代理者将消息传递给订阅者时,则代理者会使用订阅者在订阅时所定义的 QoS 级别。

例如客户端 A 是消息的发送者,而客户端 B 是消息的接收者,如果客户端 B 以 QoS 级别 1 向代理者发送订阅,而客户端 A 则向代理者发布 QoS 级别 2 消息。最后代理者会以 QoS 级别 1 将消息传输给客户端 B,这种情况下消息会多次传递给客户端 B,因为 QoS 级别 1 会确保消息至少传递一次,但是并不会阻止同一条消息被多次传递。

packetId 的唯一性

MQTT 用于 QoS 级别 12packetId 在数据传输时,其值在客户端代理者之间是唯一的(但是在所有客户端之间可能并非唯一)。每次交互流程结束之后,数据包标识符 packetId 就可以获得重用,而这正是 packetId 长度不需要超过 65535 的原因所在,因为客户端在没有完成交互的情况下,发送超过这个数量的消息是不现实的。

QoS 1/2 的消息排队

当客户端具有持久会话能力时,如果客户端意外掉线,则采用 QoS 级别 1 或者 2 发送的所有消息会排队等待该离线客户端重新上线。

最佳实践

当选择使用 QoS 级别 0 的时候:

  • 确保发送方接收方之间拥有一个基本稳定的连接;
  • 如果数据重要性不高,或者可以在很短时间内重新进行发送,那么就可以容忍小部分数据的丢失;
  • 可以无需使用消息队列,因为只有当断开连接的客户端具备 QoS 级别 1 或者 2,并且具有持久会话能力时,消息才会开始排队;

当选择使用 QoS 级别 1 的时候:

  • 如果需要获得每条消息,并且能够处理重复的消息,那么可以选择 QoS 级别 1;因为它可以确保消息至少被送达一次,并且被允许多次传递,但是前提是程序必须能够妥善的处理这些重复的消息
  • 如果设备和网络不能承受 QoS 级别 2 带来的性能开销,那么就可以考虑采用 QoS 级别 1

当选择使用 QoS 级别 2 的时候:

  • 如果确实需要准确的接收每一条消息,那么就可以选择 QoS 级别 2,但是需要注意由此带来的额外性能开销;

清理会话 cleanSession

客户端要从 MQTT 代理者接收消息,就必须连接到这个代理者,并且发布消息订阅主题。如果客户端代理者之间的连接突然中断,这些发布的消息订阅的主题就将会丢失,客户端需要在重新连接之后再次发布消息订阅主题。每次连接中断之后都要重新发布与订阅,这对于系统资源受限的客户端而言是一个不必要的负担。避免这个问题,需要客户端在连接到代理者服务时发起持久会话请求。持久会话(Persistent Session)会在客户端代理者需要建立保持连接时,由客户端提供自身的 clientId代理者,以用于鉴别每次会话请求。

代理者行为

在持久会话当中,为了便于客户端在脱机之后进行重新连接,代理者通常会持有如下信息:

  • 客户端会话之前是否存在;
  • 客户端的所有订阅信息;
  • 在服务质量 QoS 级别 12 的工作流程当中,客户端尚未确认的消息;
  • 客户端脱机时错过的所有QoS 级别 12 的消息;
  • 客户端接收到,但是尚未完全确认的 QoS 级别 2 的消息;

客户端使用一个清理会话 cleanSession 标志来通知代理者(Broker)是否使用持久会话:

  • cleanSession 标志设置为 true 时,表示当前客户端不需要持久会话,此时如果客户端断开连接,那么其所有信息都会丢失;
  • cleanSession 标志设置为 false 时,表示当前客户端需要建立一个持久会话,此时即使客户端断开连接,其所有信息也依然会被保留;

注意:MQTT 从 3.1.1 版本开始,代理者发送给客户端CONNACK 消息将会包含 cleanSession 清理会话标识。

客户端行为

客户端请求代理者保存会话数据的时候,客户端自身也将保存如下信息:

  • QoS 级别 12 流程当中,尚未由代理者确认的消息;
  • 代理者接收到,但是尚未完全确认的 QoS 级别 2 消息;

最佳实践

下面总结了持久会话cleanSession = true)与清理会话cleanSession = flase)两种场景下的一些最佳实践:

持久会话

  • 为了确保客户端可以获得来自某个订阅主题的所有消息,需要代理者客户端维持一个消息队列,以便客户端重新联机之后恢复这些消息;
  • 由于客户端的硬件资源有限,所以需要由代理者服务来存储客户端的订阅信息,并且恢复中断的通信;
  • 客户端需要在重新连接后恢复所有 QoS 级别 12 的发布消息;

清理会话

  • 客户端不需要获取离线时错过的消息;
  • 客户端只需要将消息发布到主题,而无需订阅该主题;
  • 不希望代理者存储会话信息,或者重试 QoS 级别 12 消息的传输;

注意:通常代理者服务会存储会话,直至客户端重新连接;如果客户端长时间没有恢复连接,则消息存储的时间长度要由具体的 MQTT 实现来决定。

保留消息 retainFlag

MQTT 当中发布者只能确保消息安全的传递给代理者,而无法保证消息一定会被订阅者接收到。而订阅者同样会面临相似的问题,即无法保证发布者何时(可能是几秒钟、几分钟甚至几个小时)会在其订阅的主题上发布消息。这种情况,正是 MQTT 消息保留机制的用武之地。

保留消息是在发布者发布主题的时候,将 retainFlag 设置为 true 的普通 MQTT 消息,代理者会存储该主题(Topic)下的最后一条保留消息以及相应的 QoS 级别。当订阅者在订阅具有保留消息主题时,会在订阅之后立刻接收到保留消息。换而言之,保留消息可以帮助客户端在订阅一个主题之后,立刻就可以获得状态更新,消除了等待发布者下一次更新的时间空白。

注意代理者只会为每个主题保留 1 条消息。保留消息不一定就是最后一条消息,但必须是最后一条把 retainFlag 设置为 true 的消息。

如果需要删除保留消息,则可以向其对应的主题发送仅有 0 Byte 有效载荷的保留消息,这样就可以达到让代理者删除之前保留消息的目的。

注意:如果 MQTT 没有保留消息机制,那么新的订阅者在发布间隔将会一直处于信息空白,使用保留消息有助于立刻为新连接的订阅者提供最近的有效信息。

临终遗嘱 lastWill*

当某个客户端未能正常断开与代理者之间的连接,那么就可以采用 MQTT 的临终遗嘱(LWT,Last Will and Testament)机制来通知其它客户端,连接至该代理者的每个客户端都将会接收到一条临终遗嘱消息。这条临终遗嘱消息依然是带有 topicretainFlagqospayload 的普通 MQTT 消息。

代理者服务将会存储这些消息,直至检测到客户端非正常断开连接,从而将临终遗嘱消息广播给所有订阅客户端。而如果客户端是通过正常的 DISCONNECT 消息正确断开连接,那么代理者将不会广播终遗嘱消息。

客户端可以在 CONNECT 消息当中指定好临终遗嘱消息,然后再初始化客户端代理者之间的连接。

广播时机

根据 MQTT 的 3.1.1 版本规范,在如下的场景当中,代理者就会广播某个客户端的临终遗嘱消息:

  • 代理者检测到 I/O 错误或者网络故障;
  • 客户端在定义的保持连接周期内通信失败;
  • 客户端在关闭网络连接之前,没有发送 DISCONNECT 数据包;
  • 由于协议发生错误,代理者关闭网络连接;

最佳实践

生产场景当中,临终遗嘱经常与保留消息结合起来使用,以保存特定主题上的客户端状态:

  1. 首先 client1代理者发送一条带有 lastWillMessage 参数的 CONNECT 消息,该消息有效负载 payload = "Offline",同时设置临终遗嘱属性为 lastWillRetain = truelastWillTopic = client1/status
  2. 接着客户端在相同的 client1/status 主题上发布有效载荷 payload = "Online" 以及保留标志 retainFlag = truePUBLISH 消息;
  3. 只要 client1 保持连接,新订阅到 client1/status 主题的客户端就会接收到 "Online" 保留消息;
  4. 如果 client1 不幸意外的断开连接,则代理者将使用有效负载 payload = "Offline" 作为新的保留消息发布临终遗嘱
  5. client1 脱机时,订阅到该主题的客户端将会从代理者那里,接收到作为保留消息临终遗嘱 "Offline",从而使得其它客户端更新至 client1 在这个主题上的最新状态;

保持连接 keepAlive

MQTT 构建在传输控制协议(TCP,Transmission Control Protocol)基础之上,该协议能够确保数据包可靠有序,并经过错误检查之后在网络上传输。但是有时候通信双方之间的传输会不同步(例如其中一方崩溃或发生传输错误),这种不完整的连接状态在 TCP 当中被称为半开连接(Half-open Connection),进而造成通信的一方明明已经传输失败,而另一方仍然在尝试发送消息并且等待这些消息被确认。

MQTT 通过保持连接(Keep Alive)来解决半开连接造成的上述问题,该机制可以确保代理者客户端之间的连接仍然是打开的,并且双方知晓已经建立连接,当客户端代理者建立连接时,双方通信的间隔时间(以秒为单位)定义了代理者客户端不能相互通信的最大时间长度。

换而言之,保持连接(Keep Alive)规定了客户端,从完成一个控制包传输到开始下一个控制包发送之间,所允许的最大时间间隔。客户端有责任确保控制报文的发送间隔不会超过 Keep Alive 的设定值,即在没有发送其它任何控制数据包的情况下,客户端必须发送一个 PINGREQ 作为心跳请求数据包。

也就是说只要消息经常交换,并且没有超过 Keep Alive 的设定值,就无需发送额外的消息来确定当前连接是否仍然打开。如果客户端Keep Alive 期间没有发送消息,那么其必须向代理者发送一个 PINGREQ 数据包,以确认当前代理者仍然可用;而代理者必须断开未发送消息或者 PINGREQ 数据包的客户端连接,断开连接时间是 Keep Alive 时间间隔的 1.5 倍。同样,如果客户端在合理的时间内没有接收到代理者的响应,也将会自动关闭连接。

MQTT 的保持连接机制(Keep Alive)主要使用到了心跳请求 PINGREQ心跳响应 PINGRESP 两个数据包。

PINGREQ

心跳请求 PINGREQ客户端代理者发送,报文当中不包含有效负载,用于向代理者表明当前客户端仍然处于活动状态。如果客户端不发送 PUBLISH 或者 SUBSCRIBE 等任何其它类型的包,则客户端必须向代理者发送 PINGREQ 心跳请求包。

PINGRESP

心跳响应 PINGRESP代理者客户端发送,同样也不包含有效负载,当代理者接收到一个 PINGREQ 心跳请求包时,代理者必须使用一个 PINGRESP 心跳响应包来进行回复,从而向客户端表明当前代理者仍然处于可用状态。

注意事项

  • 如果代理者没有从客户端接收到 PINGREQ 或者其它数据包,则代理者将会自动关闭连接,并且发送临终遗嘱消息;
  • 客户端发起 CONNECT 连接时,要设置适当的 keepAlive 属性,例如客户端可以根据信号强度来调整该属性值;
  • keepAlive 属性可以设置的最大保持时间为 18 小时 12 分 15 秒
  • 如果保持连接的时间间隔为 0,那么保持连接机制就将会失效;

客户端接管

通常情况下,断开连接的客户端会尝试重新进行连接。但是有时代理者仍然会存在有一个半开的客户端连接。在 MQTT 当中,如果代理者检测到半开连接,就会执行客户端接管操作。即代理者根据客户端标识符 clientId 关闭与该客户端的前一个连接,然后与该客户端建立起一个新的连接,这个行为可以确保发生半开连接问题时,断开的客户端能够建立起新的有效连接。

基于 WebSocket 的 MQTT

WebSocket 与 MQTT 一样也是基于 TCP/IP 的一种通信协议,因而 MQTT 在支持 WebSocket 的 Web 浏览器当中,其工作方式与原生 MQTT 基本相同,所需要的只是一个类似 Eclipse Paho JavaScript Client 这样的浏览器客户端。

Paho 一款使用 JavaScript 编写,并基于 Web 浏览器的 MQTT 客户端库,其底层通过 WebSockets 协议连接至 MQTT 代理者(Broker)。使用如下 cdnjs 链接,可以方便的将 Paho 引入到 Web 前端工程当中:

1
2
3
4
5
6
7
8
<script
src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js"
type="text/javascript"
></script>
<script
src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js"
type="text/javascript"
></script>

下面的代码是一个基于 Paho 的基本示例,它通过 WebSockets 连接到 MQTT 代理者服务,并且订阅主题 World。订阅成功之后就会向 World 主题发布的 Hello 消息,并将任意订阅该主题的消息打印至 Web 浏览器控制台。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* 建立一个客户端实例 */
client = new Paho.MQTT.Client(
location.hostname,
Number(location.port),
"clientId"
);

/* 设置回调函数 */
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;

/* 连接至客户端 */
client.connect({ onSuccess: onConnect });

/*当客户端连接的时候调用 */
function onConnect() {
/* 连接建立之后,进行订阅并且发送消息 */
console.log("onConnect");
client.subscribe("World");
message = new Paho.MQTT.Message("Hello");
message.destinationName = "World";
client.send(message);
}

/* 当客户端丢失连接的时候调用 */
function onConnectionLost(responseObject) {
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:" + responseObject.errorMessage);
}
}

/* 当消息到达的时候调用 */
function onMessageArrived(message) {
console.log("onMessageArrived:" + message.payloadString);
}

注意:鉴于 JavaScript 版本的的 Paho 已经年久失修,实际开发当中推荐采用 MQTT.js 来代替使用。

MQTT 5 新特性

虽然 MQTT 协议发明于 1999 年,但是其广泛应用则是在数年之后。MQTT 3.1.1 作为 OASIS 与 ISO 标准发布第 5 年之后。2019 年 3 月,MQTT 5 作为新一代的协议标准正式生效。新规范的一个关键目标是增强可扩展性,并且更为适用于大规模系统,MQTT 的 V3.1.1 版本证明了 MQTT 是一种可扩展有状态的物联网协议,而 V5 版本的设计旨在使 MQTT 的代理者服务更加容易的支持海量并发。

MQTT 的 V5 基本保持与 V3.1.1 版本的兼容,只是对临终遗嘱等少部分特性细节进行了调整,并且添加了 TTL共享订阅 等流行新特性,此外新版本协议在线路传输上也略有变化,并且增加了新的控制报文 AUTH

自定义 MQTT 协议头

MQTT 5 最令人兴奋的新特性之一,是可以在协议头部当中添加自定义的 key-value 键值对属性。类似于 HTTP 协议,MQTT 的客户端代理者可以通过添加任意数量的自定义协议头来携带元数据,这些元数据可以应用于特定的业务场景。

注意自定义 MQTT 协议头被大量运用于实现 MQTT 5 当中的各种新特性。

原因码 Reason Codes

原因码(Reason Codes)用于标识一些预定义的协议错误,通常放置在数据包当中,从而允许客户端代理者去解释错误条件。原因码有时也被称为消极确认(Negative Acknowledgements),诸如 CONNACKPUBACKPUBRECPUBRELPUBCOMPSUBACKUNSUBACKAUTHDISCONNECT 等 MQTT 包都可以携带原因码。

注意:原因码的消极确认范围位于 Quota ExceededProtocol Error 之间,由客户端代理者负责解释这些新增的原因码。

CONNACK 不支持特性返回码

伴随着 MQTT 应用的普及,许多企业与开源组织创建并且提供了诸多的 MQTT 实现,但是并非所有这些实现都与 MQTT 规范完全兼容。因此新版本的 MQTT 5 为不完整的 MQTT 实现提供了一种用于表达代理者不支持某个特性的方法,而客户端可以基于此来确保不会使用到这些不受支持的特性。

代理者通过 CONNACK 数据包(客户端发送 CONNECT 数据包之后由代理者发送 CONNACK 数据包)当中的预定义协议头来标识某个不支持的协议特性,这些协议头同样也可以用来通知客户端是否具有使用某个协议特性的权限。MQTT 5 规范使用如下的预定义协议头来标识代理者未实现的特性,或者客户端不允许使用的协议特性:

预定义协议头 数据类型 功能描述
Retain Available Boolean 标识保留消息是否可用;
Maximum QoS Number 允许客户端用于发布消息、订阅主题的最大 QoS 级别;
Wildcard available Boolean 通配符是否可以用于主题订阅;
Subscription identifiers available Boolean 订阅标识符对于 MQTT 客户端是否可用;
Shared Subscriptions available Boolean 共享订阅对于 MQTT 客户端是否可用;
Maximum Message Size Number MQTT 客户端能够使用的最大消息尺寸;
Server Keep Alive Number 代理者为某个客户端所能够支持的保持连接时间间隔;

注意返回码功能的缺点在于 MQTT 客户端需要解释这些状态码,并且确保程序当中不会使用到代理者不支持的特性,或者客户端没有权限使用的特性。

Clean Session 变更为 Clean Start

MQTT 3.1.1 当中一个非常重要的功能是清理会话 cleanSession,如果客户端代理者发送携带有 cleanSession = true 标志的 CONNECT 数据包,那么一旦底层网络连接中断,代理者就会立即丢弃客户端的所有信息。

而在 MQTT 5 当中,客户端可以选择使用携带有清理启动标志 cleanStart = trueCONNECT 消息,在这里代理者会丢弃之前的所有会话数据,使得客户端以新的会话开始连接。当客户端服务器之间的 TCP 连接关闭之后,会话并不会被自动清除,而是会在客户端断开连接之后才会触发删除操作,并且此时必须将 Session Expiry Interval 协议头设置为 0

注意:新的清理启动标志 cleanStart 简化了会话处理过程,相比于 cleanSession 具有更好的灵活性,并且更加易于实现。

额外的 MQTT 数据包

MQTT 5 引入了一个崭新的 AUTH 数据包,主要应用于于实现实现一些特定的身份验证机制。该数据包可以在建立连接之后,让代理者客户端使用较为复杂的权限验证,甚至还可以在不关闭连接的情况下重新对客户端进行验证。

UTF-8 字符串对

自定义协议头的出现,还同时引入了新的数据类型 UTF-8 编码的字符串对,该数据类型本质上是一个包含有 keyvalue键值对,该数据类型目前仅限用于自定义协议头。算上这个新引入的数据类型,MQTT 5 协议总共支持 7 种不同的数据类型:

  • (Bit)
  • 两字节整型(Two Byte Integer)
  • 四字节整型(Four Byte Integer)
  • UTF-8 编码字符串(UTF-8 Encoded String)
  • 可变字节整型(Variable Byte Integer)
  • 二进制型(Binary Data)
  • UTF-8 字符串对(UTF-8 String Pair)

注意:MQTT 大部分应用场景使用的都是二进制型(Binary Data)和UTF-8 编码字符串(UTF-8 Encoded String),当然也可以使用这里提到的 UTF-8 编码字符串(UTF-8 Encoded String),而其它所有数据类型对于用户都会是隐藏的,仅由 MQTT 客户端代理者生成 MQTT 数据包时使用。

双向断开 DISCONNECT 数据包

MQTT 3.1.1 当中,客户端会在关闭底层 TCP 连接之前,发送 DISCONNECT 数据包来通知代理者服务正常断开连接。但是并不能在代理者出现问题,需要关闭 TCP 连接的时候通知客户端

而在 MQTT 5 当中,可以允许代理者在关闭 TCP 连接之前,发送 DISCONNECT 数据包,这样客户端就可以方便的了解到 TCP 连接断开的原因,并且采取相应的应对策略。虽然代理者无需解释连接断开的确切原因,但是至少可以让客户端开发人员,了解到连接断开的大致原因。

注意:由于 DISCONNECT 数据包同样可以携带原因码(Reason Codes),所以可以很容易的标识出连接断开的具体原因(例如没有权限)。

QoS 级别 1 与 2 消息不再重试

MQTT 客户端使用 TCP 双向连接作为底层传输协议,确保数据包总是能够正确的进行传输。如果 TCP 连接发生中断,则 QoS 级别 12 会发起多次 TCP 连接保证消息被正确的传递。

MQTT 3.1.1 允许在 TCP 连接正常时重新传递消息,实践证明这是一个糟糕的主意,因为超出负载能力的客户端可能会因此变得更加超载。例如客户端代理者接收到一条消息,并且需要耗费 11 秒来进行处理,并且在处理之后确认数据包。假设此时代理者会在超时 10 秒之后重新传输消息,这种方法就会浪费宝贵的带宽,并且加重客户端的负担。

注意MQTT 5 规范不允许代理者客户端在正常的 TCP 连接上重新传递消息,如果遇到 TCP 连接关闭的情况,则代理者客户端必须重新发送未经确认的数据包,因此采用 QoS 级别 12 就显得非常必要。

无用户名使用密码

MQTT 3.1.1 要求客户端使用 CONNECT 数据包中的 password 字段时,需要同时使用 username 字段,这对于某些没有用户名的场景极为不便(例如 OAuth 使用一个 JSON 令牌作为身份验证与鉴权信息)。

MQTT 5 虽然提供了更为优雅的方式(通过 AUTH 数据包)来携带令牌信息,但是仍然可以利用 CONNECT 数据包里的 password 字段,并且可以只使用 password 字段,而无需再行填写 username 字段。

三端综合运用实例

本示例通过 Web 页面来控制 Arduino 开发板上连接的一枚 LED 发光二极管,需要使用基于 Nodejs 上的 Aedes 来发布同时支持 TCP 和 Websocket 的代理者(Broker)服务,然后通过 PubSubClient 在 Arduino 开发板上发布 inEsp32TopicoutEsp32Topic 主题,最后使用 MQTT.js 在 Web 网页端订阅这些主题,从而实现网页开发板服务器三端的 MQTT 状态联动。

Aedes

常用的 MQTT 代理者(Broker)服务有 EMQ XHiveMQMosca 等等,其中 Mosca 基于 NodeJS 实现,不过目前 Github 上该项目已经停止维护,原作者创建了一款支持数据持久化与集群,并且性能更为优越的 Aedes,其完整的实现了 MQTT V3.1.1 协议并且支持 Websocket,通过下面的命令可以安装 Aedes 以及相关依赖到 Web 服务端工程:

1
npm install aedes aedes-server-factory --save-dev

下面示例代码通过 aedes 以及 aedes-server-factory 建立了同时支持 TCPWebSocket 协议的 MQTT 代理者服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
const Aedes = require("aedes")();
const { createServer } = require("aedes-server-factory");
const WebsocketPort = 8080;
const TcpPort = 1883;

/* 使用 Websocket 协议传输的 Broker */
createServer(Aedes, { ws: true }).listen(WebsocketPort, function () {
console.info("Aedes MQTT Websocket server listening on port ", WebsocketPort);
});
/* 使用 TCP 协议传输的 Broker */
createServer(Aedes).listen(TcpPort, function () {
console.info("Aedes MQTT TCP server started and listening on port ", TcpPort);
});

PubSubClient

PubSubClient 是一款运行于 Arduino 上的 MQTT 消息订阅与发布客户端,下面的示例代码基于 ESP32 开发板执行了如下任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <WiFi.h>
#include <PubSubClient.h>

/* 配置网络 SSID 以及 Broker 服务 */
const char* PASSWORD = "*****";
const char* SSID = "*************";
const char* MQTT_SERVER = "***************";
const int MQTT_PORT = 1883;
const int LED = 33;

WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;

/* Wifi 配置函数 */
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Connecting to ");
Serial.println(SSID);

WiFi.mode(WIFI_STA);
WiFi.begin(SSID, PASSWORD);

while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}

randomSeed(micros());

Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
}

/* 客户端回调函数 */
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic);
Serial.print("] ");
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
/* 如果接收到的第一个字符是 1,那么就点亮 LED 发光二极管 */
if ((char)payload[0] == '1') {
digitalWrite(LED, HIGH); // 输出高电平,打开 LED
Serial.println("LED OPEN");
}
else if((char)payload[0] == '0') {
digitalWrite(LED, LOW); // 输出低电平,关闭 LED
Serial.println("LED COLOSE");
}
}

/* 重新连接 */
void reconnect() {
/* 循环直至重新建立连接 */
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
/* 创建一个随机的客户端 ID */
String clientId = "ESP32Client-";
clientId += String(random(0xffff), HEX);
/* 开始尝试连接 */
if (client.connect(clientId.c_str())) {
Serial.println("connected");
client.publish("outEsp32Topic", "Hello ESP32!"); // 连接建立之后发布一个 outEsp32Topic 主题
client.subscribe("inEsp32Topic"); // 然后重新订阅 inEsp32Topic 主题
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000); // 等待 5 秒再试
}
}
}

void setup() {
pinMode(LED, OUTPUT); // 将 LED 引脚初始化为输出模式
Serial.begin(115200); // 设置串口波特率
setup_wifi(); // 调用 Wifi 配置函数
client.setServer(MQTT_SERVER, MQTT_PORT);
client.setCallback(callback);
}

void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();

unsigned long now = millis();
if (now - lastMsg > 2000) {
lastMsg = now;
++value;
snprintf (msg, MSG_BUFFER_SIZE, "%ld", digitalRead(LED));
Serial.print("Publish message: ");
Serial.println(msg);
client.publish("outEsp32Topic", msg);
}
}

MQTT.js

MQTT.js 是一款可以同时运行于 NodeJS 和 Web 浏览器的 MQTT 协议客户端,可以通过下面的命令直接将相关的依赖安装至 Webpack 前端工程:

1
npm install mqtt --save

如下示例代码通过一个开关控件,实时切换页面当中一个圆形图案的背景颜色,并且向主题 inEsp32Topic 发布 LED 的亮灭状态,从而控制与 Arduino 相连接的发光二极管:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
<template>
<main id="test">
<header>MQTT Test</header>
<form>
<button v-bind:style="styleObject" type="button">{{ led }}</button>
<el-switch v-model="led" active-text="打开" active-value="OPEN" inactive-text="关闭" inactive-value="CLOSE" active-color="#ff4949" @change="onChange">
</el-switch>
</form>
</main>
</template>
<script>
import MQTT from "mqtt";

export default {
data() {
return {
led: "OPEN",
host: "ws://localhost:8080",
MqttClient: {},
styleObject: {
backgroundColor: "#FF4949",
},
};
},
methods: {
onChange(status) {
const vm = this;
/* 打开 LED */
if (vm.led == "OPEN") {
this.styleObject.backgroundColor = "#FF4949";
this.styleObject.color = "#FFFFFF";
vm.MqttClient.publish("inEsp32Topic", "1", { qos: 0, retain: false });
} else if (vm.led == "CLOSE") {
/* 关闭 LED */
this.styleObject.backgroundColor = "#D3DCE6";
this.styleObject.color = "#324057";
vm.MqttClient.publish("inEsp32Topic", "0", { qos: 0, retain: false });
}
this.$message({
message: "LED is " + status,
type: "warning",
offset: 50,
duration: 500,
});
},
},
mounted() {
const vm = this;
const clientId = "mqttjs_" + Math.random().toString(16).substr(2, 8);
console.info("connecting MQTT client");

vm.MqttClient = MQTT.connect(vm.host, {
keepalive: 30,
clientId: clientId,
protocolId: "MQTT",
protocolVersion: 4,
clean: true,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
will: {
topic: "WillMsg",
payload: "Connection Closed abnormally..!",
qos: 0,
retain: false,
},
rejectUnauthorized: false,
});

vm.MqttClient.on("error", function (err) {
console.info(err);
vm.MqttClient.end();
});

vm.MqttClient.on("connect", function () {
console.info("client connected:" + clientId);
vm.MqttClient.subscribe("outEsp32Topic", { qos: 0 });
vm.MqttClient.publish("inEsp32Topic", "1", { qos: 0, retain: false });
});

vm.MqttClient.on("message", function (topic, message, packet) {
console.info(
"Received Message = " + message.toString() + "\nOn topic:= " + topic
);
if (message.toString() == 1) {
vm.led = "OPEN";
console.log("LED 已经打开");
} else if (message.toString() == 0) {
vm.led = "CLOSE";
console.log("LED 已经关闭");
}
});

vm.MqttClient.on("close", function () {
console.info(clientId + " disconnected");
});
},
};
</script>