@@ -18,19 +18,22 @@ XHTHREAD CALLBACK MessageQueue_MQTTThread(XPVOID lParam)
1818 {
1919 for (int j = 0 ; j < ppSst_ListAddr[i]->nPktCount ; j++)
2020 {
21- MQTTPROTOCOL_INFORMATION st_MQTTProtcol = {};
21+ int nMSGLen = 0 ;
22+ XCHAR *ptszMSGBuffer = NULL ;
23+ MQTTPROTOCOL_FIXEDHEADER st_MQTTHdr = {};
2224
23- if (MQTTProtocol_Parse_Recv (ppSst_ListAddr[i]->tszClientAddr , &st_MQTTProtcol ))
25+ if (MQTTProtocol_Parse_Recv (ppSst_ListAddr[i]->tszClientAddr , &st_MQTTHdr, &ptszMSGBuffer, &nMSGLen ))
2426 {
25- MQService_MQTT_Handle (ppSst_ListAddr[i]->tszClientAddr , &st_MQTTProtcol);
27+ MQService_MQTT_Handle (ppSst_ListAddr[i]->tszClientAddr , &st_MQTTHdr, ptszMSGBuffer, nMSGLen);
28+ BaseLib_OperatorMemory_FreeCStyle ((XPPMEM)&ptszMSGBuffer);
2629 }
2730 }
2831 }
2932 BaseLib_OperatorMemory_Free ((XPPPMEM)&ppSst_ListAddr, nListCount);
3033 }
3134 return 0 ;
3235}
33- void Packet_Property (XCHAR* ptszMsgBuffer, int * pInt_Len, MQTTPROTOCOL_HDRPROPERTY*** pppSt_HDRProperty, int nListCount)
36+ void Packet_Property (MQTTPROTOCOL_HDRPROPERTY*** pppSt_HDRProperty, int nListCount)
3437{
3538 BaseLib_OperatorMemory_Malloc ((XPPPMEM)pppSt_HDRProperty, nListCount, sizeof (MQTTPROTOCOL_HDRPROPERTY));
3639
@@ -58,7 +61,7 @@ void Packet_Property(XCHAR* ptszMsgBuffer, int* pInt_Len, MQTTPROTOCOL_HDRPROPER
5861 (*pppSt_HDRProperty)[5 ]->st_unValue .byValue = 1 ;
5962 (*pppSt_HDRProperty)[5 ]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_WILDCARDSUBAVAI;
6063}
61- bool MQService_MQTT_Handle (LPCXSTR lpszClientAddr, MQTTPROTOCOL_INFORMATION* pSt_MQTTProtcol )
64+ bool MQService_MQTT_Handle (LPCXSTR lpszClientAddr, MQTTPROTOCOL_FIXEDHEADER* pSt_MQTTHdr, LPCXSTR lpszMSGBuffer, int nMSGLen )
6265{
6366 int nSDLen = 0 ;
6467 int nRVLen = 0 ;
@@ -68,54 +71,110 @@ bool MQService_MQTT_Handle(LPCXSTR lpszClientAddr, MQTTPROTOCOL_INFORMATION* pSt
6871 memset (tszSDBuffer, ' \0 ' , sizeof (tszSDBuffer));
6972 memset (tszRVBuffer, ' \0 ' , sizeof (tszRVBuffer));
7073 // 是不是连接
71- if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNECT == pSt_MQTTProtcol-> st_FixedHdr . byMsgType )
74+ if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNECT == pSt_MQTTHdr-> byMsgType )
7275 {
73- int nListCount = 6 ;
76+ int nListCount = 0 ;
7477 MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
78+ MQTTPROTOCOL_HDRCONNNECT st_HDRConnect = {};
79+ MQTTPROTOCOL_USERINFO st_USerInfo = {};
80+
81+ if (!MQTTProtocol_Parse_Connect (lpszMSGBuffer, nMSGLen, &st_HDRConnect, &st_USerInfo, &ppSt_HDRProperty, &nListCount))
82+ {
83+ // 错误断开连接
84+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
85+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
86+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
87+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求链接失败,错误码:%lX" ), lpszClientAddr, MQTTProtocol_GetLastError ());
88+ return false ;
89+ }
90+ BaseLib_OperatorMemory_Free ((XPPPMEM)&ppSt_HDRProperty, nListCount);
7591
76- Packet_Property (tszSDBuffer, &nSDLen, &ppSt_HDRProperty, nListCount);
92+ nListCount = 6 ;
93+ Packet_Property (&ppSt_HDRProperty, nListCount);
7794 MQTTProtocol_Packet_REPConnect (tszRVBuffer, &nRVLen, 0 , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS, &ppSt_HDRProperty, nListCount);
78- MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, nRVLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNACK, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_CONNACK);
79- memcpy (tszSDBuffer + nSDLen, tszRVBuffer, nRVLen);
80- nSDLen += nRVLen;
95+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNACK, tszRVBuffer, nRVLen);
8196
97+ BaseLib_OperatorMemory_Free ((XPPPMEM)&ppSt_HDRProperty, nListCount);
8298 XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
99+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求链接成功,客户端ID:%s,用户名:%s" ), lpszClientAddr, st_USerInfo.tszClientID , st_USerInfo.tszClientUser );
83100 }
84- else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBSCRIBE == pSt_MQTTProtcol-> st_FixedHdr . byMsgType )
101+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBSCRIBE == pSt_MQTTHdr-> byMsgType )
85102 {
86- MQTTProtocol_Packet_REPComm (tszRVBuffer, &nRVLen, pSt_MQTTProtcol->wMsgID , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
87- MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, nRVLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBACK, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_SUBACK);
88- memcpy (tszSDBuffer + nSDLen, tszRVBuffer, nRVLen);
89- nSDLen += nRVLen;
103+ XSHOT wMsgID = 0 ;
104+ XCHAR tszTopicName[MAX_PATH] = {};
105+ int nListCount = 0 ;
106+ MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
107+ MQTTPROTOCOL_HDRSUBSCRIBE st_SubScribe = {};
108+
109+ if (!MQTTProtocol_Parse_Subscribe (lpszMSGBuffer, nMSGLen, &wMsgID, tszTopicName, &st_SubScribe, &ppSt_HDRProperty, &nListCount))
110+ {
111+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
112+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
113+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
114+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求订阅失败,错误码:%lX" ), lpszClientAddr, MQTTProtocol_GetLastError ());
115+ return false ;
116+ }
117+
118+ MQTTProtocol_Packet_REPComm (tszRVBuffer, &nRVLen, wMsgID, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
119+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBACK, tszRVBuffer, nRVLen);
90120
91121 XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
122+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求订阅成功,主题名称:%s" ), lpszClientAddr, tszTopicName);
92123 }
93- else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBSCRIBE == pSt_MQTTProtcol-> st_FixedHdr . byMsgType )
124+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBSCRIBE == pSt_MQTTHdr-> byMsgType )
94125 {
95- MQTTProtocol_Packet_REPComm (tszRVBuffer, &nRVLen, pSt_MQTTProtcol->wMsgID , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
96- MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, nRVLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBACK, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_UNSUBACK);
97- memcpy (tszSDBuffer + nSDLen, tszRVBuffer, nRVLen);
98- nSDLen += nRVLen;
126+ XSHOT wMsgID = 0 ;
127+ XCHAR tszTopicName[MAX_PATH] = {};
128+ int nListCount = 0 ;
129+ MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
130+
131+ if (!MQTTProtocol_Parse_UNSubcribe (lpszMSGBuffer, nMSGLen, &wMsgID, tszTopicName, &ppSt_HDRProperty, &nListCount))
132+ {
133+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
134+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
135+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
136+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求取消订阅失败,错误码:%lX" ), lpszClientAddr, MQTTProtocol_GetLastError ());
137+ return false ;
138+ }
139+ MQTTProtocol_Packet_REPComm (tszRVBuffer, &nRVLen, wMsgID, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
140+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBACK, tszRVBuffer, nRVLen);
99141
100142 XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
143+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求取消订阅,主题名称:%s" ), lpszClientAddr, tszTopicName);
101144 }
102- else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBLISH == pSt_MQTTProtcol-> st_FixedHdr . byMsgType )
145+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBLISH == pSt_MQTTHdr-> byMsgType )
103146 {
104- if ((XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS1 == pSt_MQTTProtcol->st_FixedHdr .byMsgFlag ) || (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS2 == pSt_MQTTProtcol->st_FixedHdr .byMsgFlag ))
147+ int nPLen = 0 ;
148+ int nListCount = 0 ;
149+ XSHOT wMsgID = 0 ;
150+ XCHAR tszTopicName[MAX_PATH] = {};
151+ XENGINE_PROTOCOLHDR st_ProtocolHdr = {};
152+ MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
153+
154+ if (!MQTTProtocol_Parse_Publish (lpszMSGBuffer, nMSGLen, pSt_MQTTHdr, tszTopicName, &wMsgID, tszRVBuffer, &nRVLen, &ppSt_HDRProperty, &nListCount))
155+ {
156+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
157+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
158+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
159+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求取消订阅失败,错误码:%lX" ), lpszClientAddr, MQTTProtocol_GetLastError ());
160+ return false ;
161+ }
162+ if ((XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS1 == pSt_MQTTHdr->byMsgFlag ) || (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS2 == pSt_MQTTHdr->byMsgFlag ))
105163 {
106164 // 需要回复
107- MQTTProtocol_Packet_REPPublish (tszRVBuffer, &nRVLen, pSt_MQTTProtcol->wMsgID , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
108- MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, nRVLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBACK, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBACK);
109- memcpy (tszSDBuffer + nSDLen, tszRVBuffer, nRVLen);
110- nSDLen += nRVLen;
111-
165+ MQTTProtocol_Packet_REPPublish (tszRVBuffer, &nRVLen, wMsgID, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
166+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBACK, tszRVBuffer, nRVLen);
112167 XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
113168 }
169+ // ProtocolModule_Parse_Websocket(tszRVBuffer, nRVLen, &st_ProtocolHdr, tszSDBuffer, &nSDLen);
170+ // MessageQueue_TCP_Handle(&st_ProtocolHdr, lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_WEBSOCKET);
171+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,发布消息,主题名称:%s,推送大小:%d" ), lpszClientAddr, tszTopicName, nRVLen);
114172 }
115- else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PINGREQ == pSt_MQTTProtcol-> st_FixedHdr . byMsgType )
173+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PINGREQ == pSt_MQTTHdr-> byMsgType )
116174 {
117- MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, 0 , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PINGREP, 0 );
175+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PINGREP);
118176 XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
177+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求了心跳PING协议成功" ));
119178 }
120179 return true ;
121180}
0 commit comments