Skip to content

Commit 76f5faa

Browse files
committed
modify:mqtt publish message supported
1 parent 17397f4 commit 76f5faa

File tree

3 files changed

+22
-13
lines changed

3 files changed

+22
-13
lines changed

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,7 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_MQTTCommon(XENGINE_PROTOCOLHD
853853
int nListCount = 6;
854854
XCHAR tszRVBuffer[1024];
855855

856-
if (pSt_ProtocolHdr->unOperatorCode)
856+
if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPUSERLOG == pSt_ProtocolHdr->unOperatorCode)
857857
{
858858
if (0 == pSt_ProtocolHdr->wReserve)
859859
{
@@ -898,6 +898,18 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_MQTTCommon(XENGINE_PROTOCOLHD
898898
MQTTProtocol_Packet_Header(ptszMsgBuffer, pInt_MsgLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNACK, tszRVBuffer, nRVLen);
899899
}
900900
}
901+
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPPOST == pSt_ProtocolHdr->unOperatorCode)
902+
{
903+
if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS1 == pSt_ProtocolHdr->byIsReply)
904+
{
905+
//需要回复
906+
MQTTProtocol_Packet_REPPublish(tszRVBuffer, &nRVLen, pSt_ProtocolHdr->wPacketSerial, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
907+
MQTTProtocol_Packet_Header(ptszMsgBuffer, pInt_MsgLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBACK, tszRVBuffer, nRVLen);
908+
}
909+
else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS2 == pSt_ProtocolHdr->byIsReply)
910+
{
901911

912+
}
913+
}
902914
return true;
903915
}

XEngine_Source/XEngine_MQServiceApp/MQService_MQTTTask.cpp

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -165,22 +165,18 @@ bool MQService_MQTT_Handle(LPCXSTR lpszClientAddr, MQTTPROTOCOL_FIXEDHEADER* pSt
165165
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求取消订阅失败,错误码:%lX"), lpszClientAddr, MQTTProtocol_GetLastError());
166166
return false;
167167
}
168-
if ((XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS1 == pSt_MQTTHdr->byMsgFlag) || (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_FLAG_PUBLISH_QOS2 == pSt_MQTTHdr->byMsgFlag))
169-
{
170-
//需要回复
171-
MQTTProtocol_Packet_REPPublish(tszRVBuffer, &nRVLen, wMsgID, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
172-
MQTTProtocol_Packet_Header(tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBACK, tszRVBuffer, nRVLen);
173-
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
174-
}
175-
//ProtocolModule_Parse_Websocket(tszRVBuffer, nRVLen, &st_ProtocolHdr, tszSDBuffer, &nSDLen);
176-
//MessageQueue_TCP_Handle(&st_ProtocolHdr, lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_WEBSOCKET);
177-
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,发布消息,主题名称:%s,推送大小:%d"), lpszClientAddr, tszTopicName, nRVLen);
168+
ProtocolModule_Parse_Websocket(tszRVBuffer, nRVLen, &st_ProtocolHdr, tszSDBuffer, &nSDLen);
169+
170+
st_ProtocolHdr.byIsReply = pSt_MQTTHdr->byMsgFlag;
171+
st_ProtocolHdr.wPacketSerial = wMsgID;
172+
MessageQueue_TCP_Handle(&st_ProtocolHdr, lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
173+
//XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,发布消息,主题名称:%s,推送大小:%d"), lpszClientAddr, tszTopicName, nRVLen);
178174
}
179175
else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PINGREQ == pSt_MQTTHdr->byMsgType)
180176
{
181177
MQTTProtocol_Packet_Header(tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PINGREP);
182178
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
183-
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求了心跳PING协议成功"));
179+
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求了心跳PING协议成功"), lpszClientAddr);
184180
}
185181
return true;
186182
}

XEngine_Source/XEngine_MQServiceApp/MQService_TCPTask.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
8484
memset(&st_ProtocolAuth, '\0', sizeof(XENGINE_PROTOCOL_USERAUTH));
8585

8686
memcpy(&st_ProtocolAuth, lpszMsgBuffer, sizeof(XENGINE_PROTOCOL_USERAUTH));
87+
pSt_ProtocolHdr->unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPUSERLOG;
88+
8789
if (SessionModule_Client_GetAddr(st_ProtocolAuth.tszUserName))
8890
{
8991
pSt_ProtocolHdr->wReserve = 700;
@@ -95,7 +97,6 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
9597
_tcsxcpy(st_UserInfo.tszUserName, st_ProtocolAuth.tszUserName);
9698
_tcsxcpy(st_UserInfo.tszUserPass, st_ProtocolAuth.tszUserPass);
9799

98-
pSt_ProtocolHdr->unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPUSERLOG;
99100
if (_tcsxlen(st_ServiceCfg.st_XPass.tszPassLogin) > 0)
100101
{
101102
int nRVLen = 0;

0 commit comments

Comments
 (0)