本文共 6077 字,大约阅读时间需要 20 分钟。
先上一个图,大概说明一下moquette 的类之间的关系
一.ProtocolProcessor类
该类是moquette里面的最终要的类,负责所有报文的处理,持有所有各模块功能的实现对象的引用, 下面详细介绍protected ConnectionDescriptorStore connectionDescriptors;//所有的连接描述符文存储,即clientId与通道之间的映射集合protected ConcurrentMapsubscriptionInCourse;//所有当前正在处理的 订阅关系的存储,之所以有这个是过滤无效的订阅请求private SubscriptionsDirectory subscriptions;//订阅目录,本质上是topic树private ISubscriptionsStore subscriptionStore;//所有的订阅的集合private boolean allowAnonymous;//是否允许匿名连接private boolean allowZeroByteClientId;//是否允许clientId为空private IAuthorizator m_authorizator; //对topic的读写权限认证private IMessagesStore m_messagesStore;//retainMessage的存储private ISessionsStore m_sessionsStore;//session 存储private IAuthenticator m_authenticator;//连接时候的鉴权认证private BrokerInterceptor m_interceptor;//各个层面的拦截器private Qos0PublishHandler qos0PublishHandler;//qos0拦截器private Qos1PublishHandler qos1PublishHandler;//qos1拦截器private Qos2PublishHandler qos2PublishHandler;/qos2拦截器private MessagesPublisher messagesPublisher;//分发消息,遗愿消息,以及集权间同步消息private InternalRepublisher internalRepublisher;//保留消息,qos1,qos2消息重发器 ConcurrentMap m_willStore//遗愿消息存储 几乎所有的功能的源头都在这个类里面
二.对14种报文的处理,都在ProtocolProcessor类,后面会分篇挨个讲解moquette对这14个报文的处理
具体哪14中文报文如下名字 值 报文流动方向 描述
Reserved 0 禁止 保留
CONNECT 1 客户端到服务端 客户端请求连接服务端CONNACK 2 服务端到客户端 连接报文确认PUBLISH 3 两个方向都允许 发布消息PUBACK 4 两个方向都允许 QoS 1消息发布收到确认PUBREC 5 两个方向都允许 发布收到(保证交付第一步)PUBREL 6 两个方向都允许 发布释放(保证交付第二步)PUBCOMP 7 两个方向都允许 QoS 2消息发布完成(保证交互第三步)SUBSCRIBE 8 客户端到服务端 客户端订阅请求SUBACK 9 服务端到客户端 订阅请求报文确认UNSUBSCRIBE 10 客户端到服务端 客户端取消订阅请求UNSUBACK 11 服务端到客户端 取消订阅报文确认PINGREQ 12 客户端到服务端 心跳请求PINGRESP 13 服务端到客户端 心跳响应DISCONNECT 14 客户端到服务端 客户端断开连接Reserved 15 禁止 保留或者到这里看更详细的mqtt中文翻译
非常感谢作者的辛劳工作和无私分享三.debug跟踪moquette 对CONNECT报文的处理
大概分为以下几步1.验证协议版本,如果不是mqtt-3.1或者mqtt-3.1.1则拒绝连接2.验证clientId是否为空,如果为空,但是配置的时候(在上篇介绍的moquette.cof里面配置)要求不允许唯恐,即上面的allowZeroByteClientId或者cleanSession为false即要求保存会话,则视为不合法,拒绝连接,否则由moquette生成clientId3.验证是否有登录的权限这里面贴上源码讲解一下private boolean login(Channel channel, MqttConnectMessage msg, final String clientId) { // handle user authenticationif (msg.variableHeader().hasUserName()) { byte[] pwd = null;if (msg.variableHeader().hasPassword()) { pwd = msg.payload().passwordInBytes();} else if (!this.allowAnonymous) { LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);failedCredentials(channel);return false;}if (!m_authenticator.checkValid(clientId, msg.payload().userName(), pwd)) { LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}",clientId, msg.payload().userName(), pwd);failedCredentials(channel);return false;}NettyUtils.userName(channel, msg.payload().userName());} else if (!this.allowAnonymous) { LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);failedCredentials(channel);return false;}return true;}3.1.如果CONNETCT报文里面的可变头里面没有用户名,直接返回true3.2.如果有用户名,同时有密码,从可变头取出密码,调用m_authenticator进行验证3.3 如果有用户名,没有密码,认证失败,拒绝连接3.4 如果没有用户名,同时配置为不允许匿名,则认证失败
4.创建连接描述符,连接描述符包括clientId,channel,isCleanSession,ConnectState,同时判断连接描述符集合里面是否包括该连接描述符,如果包含,代表该连接以及建立,断开连接
5.根据CONNECT报文里面的Keep Alive time 来设置tcp参数6.根据CONNECT报文遗愿消息标志位,觉得是否存储遗愿消息7.返回CONNACK报文,这里面把返回CONNACK报文单独讲解一下private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, final String clientId) { LOG.info("Sending connect ACK. CId={}", clientId); final boolean success = descriptor.assignState(DISCONNECTED, SENDACK); if (!success) { return false; } MqttConnAckMessage okResp; ClientSession clientSession = m_sessionsStore.sessionForClient(clientId); boolean isSessionAlreadyStored = clientSession != null; if (!msg.variableHeader().isCleanSession() && isSessionAlreadyStored) { okResp = connAckWithSessionPresent(CONNECTION_ACCEPTED); } else { okResp = connAck(CONNECTION_ACCEPTED); } if (isSessionAlreadyStored) { LOG.info("Cleaning session. CId={}", clientId); clientSession.cleanSession(msg.variableHeader().isCleanSession()); } descriptor.writeAndFlush(okResp); LOG.info("The connect ACK has been sent. CId={}", clientId); return true;} 7.1 判断当前连接的状态,怎么判断的呢?这里面用了AtomicReference通过调用原子引用类 compareAndSet(DISCONNECTED, SENDACK)来解决并发修改连接状态的问题。 7.2如果状态是disConnect,将状态修改为sendAck 7.3 如果CONNETCT报文里面的CleanSession标识设置为0同时broker已经有了client的会话,将CONNACK报文里面的连接确认标志设为1,告诉客户端,broker已经有了响应的会话信息。否则将连接确认标志设为0 7.4 如果已经存在相应的client的会话,则根据新的连接,更新clientSession里面的是否清理session属性
8.唤醒拦截器记录连接事件
9.创建或者从新加载clientSession,这里面单独讲解一下private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, MqttConnectMessage msg, String clientId) { final boolean success = descriptor.assignState(SENDACK, SESSION_CREATED); if (!success) { return null; } ClientSession clientSession = m_sessionsStore.sessionForClient(clientId); boolean isSessionAlreadyStored = clientSession != null; if (!isSessionAlreadyStored) { clientSession = m_sessionsStore.createNewSession(clientId, msg.variableHeader().isCleanSession()); } if (msg.variableHeader().isCleanSession()) { LOG.info("Cleaning session. CId={}", clientId); clientSession.cleanSession(); } return clientSession;} 9.1 AtomicReference通过调用原子引用类 compareAndSet(SENDACK, SESSION_CREATED)将连接状态从sendAck修改为session_create 9.2 session存储结合里面,是否已经存在会话信息,如果不存在,创建一个新的clientsession 9.3 如果存在,根据CONNETCT报文里面的cleansession自动决定是否清理调旧的会话信息。
10.如果CONNETCT报文要求不清理会话信息(cleansession标志位为0),则重发QoS1 and QoS2 messages,同时将连接状态从session_create修改成message_republish
11.将连接状态从session_create修改成established到此,broker和client直接的mqtt连接正式建立,后面client可以开始发送SUBSCRIBE或者PUBLISH报文了。
在这里再补充一点,对于broker来说,建立连接的过程中,连接状态会从disConnect->sendAck->session_create->message_republish->established,之所以要设置这些状态,是因为,每一步后面的操作都要基于前面的状态来决定是否需要真正执行,这里面用到了原子引用类来保证,状态的修改这个操作的原子行,确保了在并发的情况下,每一步操作都是条件满足的。下面一篇将会讲解SUBSCRIBE报文的处理
转载于:https://blog.51cto.com/13579730/2073630