1+ #include " MQService_Hdr.h"
2+
3+ XHTHREAD CALLBACK MessageQueue_MQTTThread (XPVOID lParam)
4+ {
5+ int nThreadPos = *(int *)lParam;
6+ nThreadPos++;
7+ while (bIsRun)
8+ {
9+ if (!MQTTProtocol_Parse_WaitEvent (nThreadPos))
10+ {
11+ continue ;
12+ }
13+ int nListCount = 0 ;
14+ XENGINE_MANAGEPOOL_TASKEVENT** ppSst_ListAddr;
15+
16+ MQTTProtocol_Parse_GetPool (nThreadPos, &ppSst_ListAddr, &nListCount);
17+ for (int i = 0 ; i < nListCount; i++)
18+ {
19+ for (int j = 0 ; j < ppSst_ListAddr[i]->nPktCount ; j++)
20+ {
21+ MQTTPROTOCOL_INFORMATION st_MQTTProtcol = {};
22+
23+ if (MQTTProtocol_Parse_Recv (ppSst_ListAddr[i]->tszClientAddr , &st_MQTTProtcol))
24+ {
25+ MQService_MQTT_Handle (ppSst_ListAddr[i]->tszClientAddr , &st_MQTTProtcol);
26+ }
27+ }
28+ }
29+ BaseLib_OperatorMemory_Free ((XPPPMEM)&ppSst_ListAddr, nListCount);
30+ }
31+ return 0 ;
32+ }
33+ void Packet_Property (XCHAR* ptszMsgBuffer, int * pInt_Len, MQTTPROTOCOL_HDRPROPERTY*** pppSt_HDRProperty, int nListCount)
34+ {
35+ BaseLib_OperatorMemory_Malloc ((XPPPMEM)pppSt_HDRProperty, nListCount, sizeof (MQTTPROTOCOL_HDRPROPERTY));
36+
37+ (*pppSt_HDRProperty)[0 ]->nProLen = 4 ;
38+ (*pppSt_HDRProperty)[0 ]->st_unValue .nValue = 1024000 ;
39+ (*pppSt_HDRProperty)[0 ]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_PACKMAX;
40+
41+ (*pppSt_HDRProperty)[1 ]->nProLen = 1 ;
42+ (*pppSt_HDRProperty)[1 ]->st_unValue .byValue = 1 ;
43+ (*pppSt_HDRProperty)[1 ]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_REVERAVAI;
44+
45+ (*pppSt_HDRProperty)[2 ]->nProLen = 1 ;
46+ (*pppSt_HDRProperty)[2 ]->st_unValue .byValue = 1 ;
47+ (*pppSt_HDRProperty)[2 ]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_SHAREDSUBAVAI;
48+
49+ (*pppSt_HDRProperty)[3 ]->nProLen = 1 ;
50+ (*pppSt_HDRProperty)[3 ]->st_unValue .byValue = 1 ;
51+ (*pppSt_HDRProperty)[3 ]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_SUBIDAVAI;
52+
53+ (*pppSt_HDRProperty)[4 ]->nProLen = 2 ;
54+ (*pppSt_HDRProperty)[4 ]->st_unValue .wValue = 65535 ;
55+ (*pppSt_HDRProperty)[4 ]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_ALIASMAX;
56+
57+ (*pppSt_HDRProperty)[5 ]->nProLen = 1 ;
58+ (*pppSt_HDRProperty)[5 ]->st_unValue .byValue = 1 ;
59+ (*pppSt_HDRProperty)[5 ]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_WILDCARDSUBAVAI;
60+ }
61+ bool MQService_MQTT_Handle (LPCXSTR lpszClientAddr, MQTTPROTOCOL_INFORMATION* pSt_MQTTProtcol)
62+ {
63+ int nSDLen = 0 ;
64+ int nRVLen = 0 ;
65+ XCHAR tszSDBuffer[1024 ];
66+ XCHAR tszRVBuffer[1024 ];
67+
68+ memset (tszSDBuffer, ' \0 ' , sizeof (tszSDBuffer));
69+ memset (tszRVBuffer, ' \0 ' , sizeof (tszRVBuffer));
70+ // 是不是连接
71+ if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNECT == pSt_MQTTProtcol->st_FixedHdr .byMsgType )
72+ {
73+ int nListCount = 6 ;
74+ MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
75+
76+ Packet_Property (tszSDBuffer, &nSDLen, &ppSt_HDRProperty, nListCount);
77+ MQTTProtocol_Packet_REPConnect (tszRVBuffer, &nRVLen, 0 , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS, &ppSt_HDRProperty, nListCount);
78+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, nRVLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNACK, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_CONNACK);
79+ memcpy (tszSDBuffer + nSDLen, tszRVBuffer, nRVLen);
80+ nSDLen += nRVLen;
81+
82+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_TCP);
83+ }
84+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBSCRIBE == pSt_MQTTProtcol->st_FixedHdr .byMsgType )
85+ {
86+ MQTTProtocol_Packet_REPComm (tszRVBuffer, &nRVLen, pSt_MQTTProtcol->wMsgID , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
87+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, nRVLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBACK, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_SUBACK);
88+ memcpy (tszSDBuffer + nSDLen, tszRVBuffer, nRVLen);
89+ nSDLen += nRVLen;
90+
91+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_TCP);
92+ }
93+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBSCRIBE == pSt_MQTTProtcol->st_FixedHdr .byMsgType )
94+ {
95+ MQTTProtocol_Packet_REPComm (tszRVBuffer, &nRVLen, pSt_MQTTProtcol->wMsgID , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
96+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, nRVLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBACK, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_UNSUBACK);
97+ memcpy (tszSDBuffer + nSDLen, tszRVBuffer, nRVLen);
98+ nSDLen += nRVLen;
99+
100+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_TCP);
101+ }
102+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBLISH == pSt_MQTTProtcol->st_FixedHdr .byMsgType )
103+ {
104+ if ((XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS1 == pSt_MQTTProtcol->st_FixedHdr .byMsgFlag ) || (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS2 == pSt_MQTTProtcol->st_FixedHdr .byMsgFlag ))
105+ {
106+ // 需要回复
107+ MQTTProtocol_Packet_REPPublish (tszRVBuffer, &nRVLen, pSt_MQTTProtcol->wMsgID , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
108+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, nRVLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBACK, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBACK);
109+ memcpy (tszSDBuffer + nSDLen, tszRVBuffer, nRVLen);
110+ nSDLen += nRVLen;
111+
112+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_TCP);
113+ }
114+ }
115+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PINGREQ == pSt_MQTTProtcol->st_FixedHdr .byMsgType )
116+ {
117+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, 0 , XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PINGREP, 0 );
118+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_TCP);
119+ }
120+ return true ;
121+ }
0 commit comments