博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mqtt协议-broker之moqutte源码研究二之Connect报文处理
阅读量:6820 次
发布时间:2019-06-26

本文共 6077 字,大约阅读时间需要 20 分钟。

先上一个图,大概说明一下moquette 的类之间的关系

mqtt协议-broker之moqutte源码研究二之Connect报文处理

一.ProtocolProcessor类

该类是moquette里面的最终要的类,负责所有报文的处理,持有所有各模块功能的实现对象的引用, 下面详细介绍

protected ConnectionDescriptorStore connectionDescriptors;//所有的连接描述符文存储,即clientId与通道之间的映射集合protected ConcurrentMap
subscriptionInCourse;//所有当前正在处理的 订阅关系的存储,之所以有这个是过滤无效的订阅请求private SubscriptionsDirectory subscriptions;//订阅目录,本质上是topic树private ISubscriptionsStore subscriptionStore;//所有的订阅的集合private boolean allowAnonymous;//是否允许匿名连接private boolean allowZeroByteClientId;//是否允许clientId为空private IAuthorizator m_authorizator; //对topic的读写权限认证private IMessagesStore m_messagesStore;//retainMessage的存储private ISessionsStore m_sessionsStore;//session 存储private IAuthenticator m_authenticator;//连接时候的鉴权认证private BrokerInterceptor m_interceptor;//各个层面的拦截器private Qos0PublishHandler qos0PublishHandler;//qos0拦截器private Qos1PublishHandler qos1PublishHandler;//qos1拦截器private Qos2PublishHandler qos2PublishHandler;/qos2拦截器private MessagesPublisher messagesPublisher;//分发消息,遗愿消息,以及集权间同步消息private InternalRepublisher internalRepublisher;//保留消息,qos1,qos2消息重发器 ConcurrentMap
m_willStore//遗愿消息存储 几乎所有的功能的源头都在这个类里面

二.对14种报文的处理,都在ProtocolProcessor类,后面会分篇挨个讲解moquette对这14个报文的处理

具体哪14中文报文如下

名字                   值           报文流动方向                      描述

Reserved 0 禁止 保留

CONNECT 1 客户端到服务端 客户端请求连接服务端
CONNACK 2 服务端到客户端 连接报文确认
PUBLISH 3 两个方向都允许 发布消息
PUBACK 4 两个方向都允许 QoS 1消息发布收到确认
PUBREC 5 两个方向都允许 发布收到(保证交付第一步)
PUBREL 6 两个方向都允许 发布释放(保证交付第二步)
PUBCOMP 7 两个方向都允许 QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE 8 客户端到服务端 客户端订阅请求
SUBACK 9 服务端到客户端 订阅请求报文确认
UNSUBSCRIBE 10 客户端到服务端 客户端取消订阅请求
UNSUBACK 11 服务端到客户端 取消订阅报文确认
PINGREQ 12 客户端到服务端 心跳请求
PINGRESP 13 服务端到客户端 心跳响应
DISCONNECT 14 客户端到服务端 客户端断开连接
Reserved 15 禁止 保留

或者到这里看更详细的mqtt中文翻译

非常感谢作者的辛劳工作和无私分享

三.debug跟踪moquette 对CONNECT报文的处理

大概分为以下几步
1.验证协议版本,如果不是mqtt-3.1或者mqtt-3.1.1则拒绝连接
2.验证clientId是否为空,如果为空,但是配置的时候(在上篇介绍的moquette.cof里面配置)要求不允许唯恐,即上面的allowZeroByteClientId或者cleanSession为false即要求保存会话,则视为不合法,拒绝连接,否则由moquette生成clientId
3.验证是否有登录的权限
这里面贴上源码讲解一下
private boolean login(Channel channel, MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
failedCredentials(channel);
return false;
}
if (!m_authenticator.checkValid(clientId, msg.payload().userName(), pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}",
clientId, msg.payload().userName(), pwd);
failedCredentials(channel);
return false;
}
NettyUtils.userName(channel, msg.payload().userName());
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
failedCredentials(channel);
return false;
}
return true;
}

3.1.如果CONNETCT报文里面的可变头里面没有用户名,直接返回true3.2.如果有用户名,同时有密码,从可变头取出密码,调用m_authenticator进行验证3.3 如果有用户名,没有密码,认证失败,拒绝连接3.4 如果没有用户名,同时配置为不允许匿名,则认证失败

4.创建连接描述符,连接描述符包括clientId,channel,isCleanSession,ConnectState,同时判断连接描述符集合里面是否包括该连接描述符,如果包含,代表该连接以及建立,断开连接

5.根据CONNECT报文里面的Keep Alive time 来设置tcp参数
6.根据CONNECT报文遗愿消息标志位,觉得是否存储遗愿消息
7.返回CONNACK报文,这里面把返回CONNACK报文单独讲解一下

private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, final String clientId) {    LOG.info("Sending connect ACK. CId={}", clientId);    final boolean success = descriptor.assignState(DISCONNECTED, SENDACK);    if (!success) {        return false;    }    MqttConnAckMessage okResp;    ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);    boolean isSessionAlreadyStored = clientSession != null;    if (!msg.variableHeader().isCleanSession() && isSessionAlreadyStored) {        okResp = connAckWithSessionPresent(CONNECTION_ACCEPTED);    } else {        okResp = connAck(CONNECTION_ACCEPTED);    }    if (isSessionAlreadyStored) {        LOG.info("Cleaning session. CId={}", clientId);        clientSession.cleanSession(msg.variableHeader().isCleanSession());    }    descriptor.writeAndFlush(okResp);    LOG.info("The connect ACK has been sent. CId={}", clientId);    return true;}        7.1 判断当前连接的状态,怎么判断的呢?这里面用了AtomicReference
通过调用原子引用类 compareAndSet(DISCONNECTED, SENDACK)来解决并发修改连接状态的问题。 7.2如果状态是disConnect,将状态修改为sendAck 7.3 如果CONNETCT报文里面的CleanSession标识设置为0同时broker已经有了client的会话,将CONNACK报文里面的连接确认标志设为1,告诉客户端,broker已经有了响应的会话信息。否则将连接确认标志设为0 7.4 如果已经存在相应的client的会话,则根据新的连接,更新clientSession里面的是否清理session属性

8.唤醒拦截器记录连接事件

9.创建或者从新加载clientSession,这里面单独讲解一下

private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, MqttConnectMessage msg,        String clientId) {    final boolean success = descriptor.assignState(SENDACK, SESSION_CREATED);    if (!success) {        return null;    }    ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);    boolean isSessionAlreadyStored = clientSession != null;    if (!isSessionAlreadyStored) {        clientSession = m_sessionsStore.createNewSession(clientId, msg.variableHeader().isCleanSession());    }    if (msg.variableHeader().isCleanSession()) {        LOG.info("Cleaning session. CId={}", clientId);        clientSession.cleanSession();    }    return clientSession;}     9.1 AtomicReference
通过调用原子引用类 compareAndSet(SENDACK, SESSION_CREATED)将连接状态从sendAck修改为session_create 9.2 session存储结合里面,是否已经存在会话信息,如果不存在,创建一个新的clientsession 9.3 如果存在,根据CONNETCT报文里面的cleansession自动决定是否清理调旧的会话信息。

10.如果CONNETCT报文要求不清理会话信息(cleansession标志位为0),则重发QoS1 and QoS2 messages,同时将连接状态从session_create修改成message_republish

11.将连接状态从session_create修改成established

到此,broker和client直接的mqtt连接正式建立,后面client可以开始发送SUBSCRIBE或者PUBLISH报文了。

在这里再补充一点,对于broker来说,建立连接的过程中,连接状态会从disConnect->sendAck->session_create->message_republish->established,之所以要设置这些状态,是因为,每一步后面的操作都要基于前面的状态来决定是否需要真正执行,这里面用到了原子引用类来保证,状态的修改这个操作的原子行,确保了在并发的情况下,每一步操作都是条件满足的。

下面一篇将会讲解SUBSCRIBE报文的处理

转载于:https://blog.51cto.com/13579730/2073630

你可能感兴趣的文章
把匹配的小写转换成大写(\U、\u)
查看>>
【Android网络开发の5】Android中的网络数据下载
查看>>
linux终端使用python的matplotlib模块画图出现“could not open display”问题解决
查看>>
9月国内浏览器市场份额大战:IE份额上升至48.45%
查看>>
Tapestry 教程(五)实现Hi-Lo猜谜游戏
查看>>
2015年12月国内网民地域分布12强:湖北跻身上榜
查看>>
mysql-5.6安装
查看>>
LNMP环境搭建 Ubuntu篇
查看>>
设置低版本VDA注册高版本DDC
查看>>
multi-process script for ping host
查看>>
云数据库SQL Server 2008 R2版推出OSS版本数据上云
查看>>
Android 侵权案下周复审
查看>>
shell基础知识;
查看>>
RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查
查看>>
shell 中如何输出 n 个连续字符
查看>>
Bootstrap V4 自学开始!
查看>>
技术博客2014年4月份头条记录
查看>>
聚合国内外主流广告平台|开发者服务-KeyMob移动广告聚合平台
查看>>
解决PotPalyer不能拖放播放
查看>>
Linux安装mysql5.7
查看>>