Skip to content

Commit b6edcdd

Browse files
committed
modify:mqtt bind and unbind message support
1 parent 76f5faa commit b6edcdd

File tree

2 files changed

+40
-10
lines changed

2 files changed

+40
-10
lines changed

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_MQTTCommon(XENGINE_PROTOCOLHD
851851
{
852852
int nRVLen = 0;
853853
int nListCount = 6;
854-
XCHAR tszRVBuffer[1024];
854+
XCHAR tszRVBuffer[1024] = {};
855855

856856
if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPUSERLOG == pSt_ProtocolHdr->unOperatorCode)
857857
{
@@ -911,5 +911,22 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_MQTTCommon(XENGINE_PROTOCOLHD
911911

912912
}
913913
}
914+
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPTOPICBIND == pSt_ProtocolHdr->unOperatorCode)
915+
{
916+
if (0 == pSt_ProtocolHdr->wReserve)
917+
{
918+
MQTTProtocol_Packet_REPComm(tszRVBuffer, &nRVLen, pSt_ProtocolHdr->wPacketSerial, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
919+
}
920+
else
921+
{
922+
MQTTProtocol_Packet_REPComm(tszRVBuffer, &nRVLen, pSt_ProtocolHdr->wPacketSerial, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_TOPICNAME);
923+
}
924+
MQTTProtocol_Packet_Header(ptszMsgBuffer, pInt_MsgLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBACK, tszRVBuffer, nRVLen);
925+
}
926+
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPTOPICUNBIND == pSt_ProtocolHdr->unOperatorCode)
927+
{
928+
MQTTProtocol_Packet_REPComm(tszRVBuffer, &nRVLen, pSt_ProtocolHdr->wPacketSerial, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
929+
MQTTProtocol_Packet_Header(ptszMsgBuffer, pInt_MsgLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBACK, tszRVBuffer, nRVLen);
930+
}
914931
return true;
915932
}

XEngine_Source/XEngine_MQServiceApp/MQService_MQTTTask.cpp

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,18 @@ bool MQService_MQTT_Handle(LPCXSTR lpszClientAddr, MQTTPROTOCOL_FIXEDHEADER* pSt
121121
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求订阅失败,错误码:%lX"), lpszClientAddr, MQTTProtocol_GetLastError());
122122
return false;
123123
}
124+
XENGINE_PROTOCOLHDR st_ProtocolHdr = {};
125+
XENGINE_PROTOCOL_XMQ st_MQProtocol = {};
126+
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
127+
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
128+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICBIND;
129+
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ);
130+
st_ProtocolHdr.wPacketSerial = wMsgID;
131+
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
124132

125-
MQTTProtocol_Packet_REPComm(tszRVBuffer, &nRVLen, wMsgID, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
126-
MQTTProtocol_Packet_Header(tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBACK, tszRVBuffer, nRVLen);
127-
128-
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
129-
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求订阅成功,主题名称:%s"), lpszClientAddr, tszTopicName);
133+
_tcsxcpy(st_MQProtocol.tszMQKey, tszTopicName);
134+
MessageQueue_TCP_Handle(&st_ProtocolHdr, lpszClientAddr, (LPCXSTR)&st_MQProtocol, sizeof(XENGINE_PROTOCOL_XMQ), XENGINE_MQAPP_NETTYPE_MQTT);
135+
//XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求订阅成功,主题名称:%s"), lpszClientAddr, tszTopicName);
130136
}
131137
else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBSCRIBE == pSt_MQTTHdr->byMsgType)
132138
{
@@ -143,11 +149,18 @@ bool MQService_MQTT_Handle(LPCXSTR lpszClientAddr, MQTTPROTOCOL_FIXEDHEADER* pSt
143149
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求取消订阅失败,错误码:%lX"), lpszClientAddr, MQTTProtocol_GetLastError());
144150
return false;
145151
}
146-
MQTTProtocol_Packet_REPComm(tszRVBuffer, &nRVLen, wMsgID, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
147-
MQTTProtocol_Packet_Header(tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBACK, tszRVBuffer, nRVLen);
152+
XENGINE_PROTOCOLHDR st_ProtocolHdr = {};
153+
XENGINE_PROTOCOL_XMQ st_MQProtocol = {};
154+
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
155+
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
156+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICUNBIND;
157+
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ);
158+
st_ProtocolHdr.wPacketSerial = wMsgID;
159+
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
148160

149-
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
150-
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求取消订阅,主题名称:%s"), lpszClientAddr, tszTopicName);
161+
_tcsxcpy(st_MQProtocol.tszMQKey, tszTopicName);
162+
MessageQueue_TCP_Handle(&st_ProtocolHdr, lpszClientAddr, (LPCXSTR)&st_MQProtocol, sizeof(XENGINE_PROTOCOL_XMQ), XENGINE_MQAPP_NETTYPE_MQTT);
163+
//XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求取消订阅,主题名称:%s"), lpszClientAddr, tszTopicName);
151164
}
152165
else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBLISH == pSt_MQTTHdr->byMsgType)
153166
{

0 commit comments

Comments
 (0)