每一个 MQTT 结点只负载了部分客户端,要实现消息在集群中不同客户端间的传递,必须要实现集群间的消息共享。本文便是介绍其中的实现方式。
概述
要实现集群间消息的传递,一般有三种方式:
- 借助消息中间件,如 RocketMQ, 将
MQTT中的Topic映射到MQ中间件的对等实体上,如Queue, 消息的传递从而转化为MQ中间件消息的订阅; - 借助数据库,如 Mysql, 将消息写入到数据库中,
MQTT broker定期获取这些消息,从而实现消息的传递; - Broker 间互联通信,每一个 Broker 节点都与其它结点建立 Tcp 连接,消息通过 TCP 连接广播出去,也可借助第三方组件来实现该功能,如 JGroups.
这三种方式各有优缺点,第一种方式有较大的吞吐量,但需要引入第三方中间件,占用较多资源。第二种方式,系统的瓶颈在于数据库读写能力,一般而言,系统的吞吐量相对较小。第三种方式相对而言,不用额外占用资源,又可获得较好的性能表现,是一种较为适中的方案,缺点是要实现一个安全可靠的通信系统,有一定的技术难度。
本文的方案是采用第三种方式。
集群方案
该方案的本质是建立一个连接所有结点的通信网络,每一个结点既是服务器也是客户端,如下图所示:

该通信网络有如下特点:
- 每一结点与其它节点都建立一个
Tcp连接,如果有n个结点,则其中一结点要与其它所有的结点建立n-1 TCP条连接; - 相同结点之间共享一个连接,如
Server 1,Server 2两个结点,Server 1-> Server 2,Server 2-> Server 1使用同一个TCP连接; - 整个网络总共有
n*(n-1)/2条TCP连接; - 每一个结点都会赋予一个整数值的编号,如
1,2,3..., 连接总是由编号小的服务器发起;
集群的配置如下:1
2
3
4=singleton|cluster
=1
=192.168.1.100:3883
=192.168.1.101:3883
每一个结点都会开放出一个 3883 端口来进行集群的通信,本质上来说,每一个结点都是一个 RPC 服务器,所以集群通信组件使用的是本文作者之前编写的 alligator-rpc.
消息类型
在集群中需要通信的有四种消息类型:
| 消息类型 | 描述 |
|---|---|
| PUBLISH 消息 | 客户端发送的 PUBLISH 消息 |
| 订阅消息 | Broker 节点的订阅关系,用于消息的路由,决定是否向该节点广播消息 |
| 服务器登陆消息 | 服务器之间的认证消息 |
| 客户端登陆消息 | 客户端登陆消息,客户端重新登陆会将之前的客户端下线,该消息用于通知其它节点上的同一个客户端下线 |
消息路由
每一个 Broker 节点都只负载了部分客户端,一个 Broker 节点收到一条 PUBLISH 消息之后,由于它没有全局的订阅关系视图,它不能判断其它节点是否订阅了该消息的 Topic. 为了让其它节点的客户端也收到该消息,简单的做法是向集群中其它结点广播该消息。但该方案有比较大缺点,向集群广播所有消息,一方向会加大集群的压力,另外一方面也会浪费集群的网络带宽,造成了不必要的消息传输。
一种优化的方案是每一结点都维护一个全局的订阅关系 Ctrie 树,该Ctrie 树包括了集群中所有客户端(客户端有所在服务器的信息)的订阅关系,因此可以判断出那些服务器订阅了消息,从而进行消息的转。但方案也有一个问题,如果客户端数量很多且订阅关系较复杂时,生成的 Ctrie 树会比较大,最终降低了查询匹配的效率。
为了避免 Ctrie 树过大的问题,可以改变数据的粒度,Ctrie 树是以客户端为维度进行匹配,现在转换思路,以服务器为维度来维护订阅关系。
最终的方案以 Topic Filter 首个 token 为索引来维护服务器级别的订阅关系,假定集群有三个结点,分别是 1, 2, 3, 每一个结点都维护了各自的 Topic Filter, 如下表所示:
| Topic Filter | Sever | 首个 token |
|---|---|---|
| order/biz,user/new,live/id | 1 | order,user,live |
| live/id,meeting/id | 2 | live,meeting |
| log/stat | 3 | log |
说明:
首个 token:主要是指 Topic Filter 的第一个层级。
然后再以 首个 token 为索引,建立服务器的订阅关系,如下表所示:
| 首个 token | Server List |
|---|---|
| order | {1} |
| user | {2} |
| live | {1,2} |
| meeting | {2} |
| log | {3} |
知道了服务器的订阅关系之后,收到一个 PUBLISH 消息之后,提取出 Topic 中的 首个 token, 索引出订阅的服务器列表便可进行消息的路由转发。
二级路由
假定一个消息在两个节点之间传递,如 Client 1--->Server 1--->Server 2--->Client 2, 节点之间 Server 1--->Server 2 的消息转发由服务器的订阅关系决定,节点到客户端 Server 2--->Client 2 的消息转发由 Ctrie 决定。可以看出,消息在多个节点之间传递时,便存在二级路由关系,即:1) 服务器订阅关系;2)客户端订阅关系 Ctrie.