Skip to content

Commit 17397f4

Browse files
committed
modify:mqtt login support for tcp handle function
fixed:close client is incorrect
1 parent a02a916 commit 17397f4

File tree

7 files changed

+103
-17
lines changed

7 files changed

+103
-17
lines changed

XEngine_Source/MQCore_ProtocolModule/MQCore_ProtocolModule.vcxproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
<PropertyGroup Label="UserMacros" />
7373
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
7474
<LinkIncremental>true</LinkIncremental>
75-
<IncludePath>$(XEngine_Include);../XEngine_Depend/XEngine_Module/jsoncpp;$(IncludePath)</IncludePath>
75+
<IncludePath>$(XEngine_Include);../XEngine_Depend/XEngine_Module/jsoncpp;..\MQCore_ProtocolModule;$(IncludePath)</IncludePath>
7676
<LibraryPath>$(XEngine_Lib32);$(LibraryPath)</LibraryPath>
7777
</PropertyGroup>
7878
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.cpp

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,14 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_Common(int nNetType, XENGINE_
7171
{
7272
ProtocolModule_Packet_TCPCommon(pSt_ProtocolHdr, pSt_MQProtocol, ptszMsgBuffer, pInt_MsgLen, lpszMsgBuffer, nMsgLen);
7373
}
74-
else
74+
else if (XENGINE_MQAPP_NETTYPE_WEBSOCKET == nNetType)
7575
{
7676
ProtocolModule_Packet_WSCommon(pSt_ProtocolHdr, pSt_MQProtocol, ptszMsgBuffer, pInt_MsgLen, lpszMsgBuffer, nMsgLen);
7777
}
78+
else
79+
{
80+
ProtocolModule_Packet_MQTTCommon(pSt_ProtocolHdr, pSt_MQProtocol, ptszMsgBuffer, pInt_MsgLen, lpszMsgBuffer, nMsgLen);
81+
}
7882
return true;
7983
}
8084
/********************************************************************
@@ -841,5 +845,59 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_WSCommon(XENGINE_PROTOCOLHDR*
841845
*pInt_MsgLen = Json::writeString(st_JsonBuilder, st_JsonRoot).length();
842846
memcpy(ptszMsgBuffer, Json::writeString(st_JsonBuilder, st_JsonRoot).c_str(), *pInt_MsgLen);
843847

848+
return true;
849+
}
850+
bool CProtocolModule_Packet::ProtocolModule_Packet_MQTTCommon(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, XENGINE_PROTOCOL_XMQ* pSt_MQProtocol, XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszMsgBuffer /* = NULL */, int nMsgLen /* = 0 */)
851+
{
852+
int nRVLen = 0;
853+
int nListCount = 6;
854+
XCHAR tszRVBuffer[1024];
855+
856+
if (pSt_ProtocolHdr->unOperatorCode)
857+
{
858+
if (0 == pSt_ProtocolHdr->wReserve)
859+
{
860+
int nRVLen = 0;
861+
int nListCount = 6;
862+
XCHAR tszRVBuffer[1024];
863+
MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
864+
865+
BaseLib_OperatorMemory_Malloc((XPPPMEM)&ppSt_HDRProperty, nListCount, sizeof(MQTTPROTOCOL_HDRPROPERTY));
866+
867+
ppSt_HDRProperty[0]->nProLen = 4;
868+
ppSt_HDRProperty[0]->st_unValue.nValue = 1024000;
869+
ppSt_HDRProperty[0]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_PACKMAX;
870+
871+
ppSt_HDRProperty[1]->nProLen = 1;
872+
ppSt_HDRProperty[1]->st_unValue.byValue = 1;
873+
ppSt_HDRProperty[1]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_REVERAVAI;
874+
875+
ppSt_HDRProperty[2]->nProLen = 1;
876+
ppSt_HDRProperty[2]->st_unValue.byValue = 1;
877+
ppSt_HDRProperty[2]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_SHAREDSUBAVAI;
878+
879+
ppSt_HDRProperty[3]->nProLen = 1;
880+
ppSt_HDRProperty[3]->st_unValue.byValue = 1;
881+
ppSt_HDRProperty[3]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_SUBIDAVAI;
882+
883+
ppSt_HDRProperty[4]->nProLen = 2;
884+
ppSt_HDRProperty[4]->st_unValue.wValue = 65535;
885+
ppSt_HDRProperty[4]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_ALIASMAX;
886+
887+
ppSt_HDRProperty[5]->nProLen = 1;
888+
ppSt_HDRProperty[5]->st_unValue.byValue = 1;
889+
ppSt_HDRProperty[5]->byProFlag = XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_PROPERTY_WILDCARDSUBAVAI;
890+
891+
MQTTProtocol_Packet_REPConnect(tszRVBuffer, &nRVLen, 0, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS, &ppSt_HDRProperty, nListCount);
892+
MQTTProtocol_Packet_Header(ptszMsgBuffer, pInt_MsgLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNACK, tszRVBuffer, nRVLen);
893+
BaseLib_OperatorMemory_Free((XPPPMEM)&ppSt_HDRProperty, nListCount);
894+
}
895+
else
896+
{
897+
MQTTProtocol_Packet_REPConnect(tszRVBuffer, &nRVLen, 0, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_USERPASS);
898+
MQTTProtocol_Packet_Header(ptszMsgBuffer, pInt_MsgLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNACK, tszRVBuffer, nRVLen);
899+
}
900+
}
901+
844902
return true;
845903
}

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,6 @@ class CProtocolModule_Packet
4141
protected:
4242
bool ProtocolModule_Packet_TCPCommon(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, XENGINE_PROTOCOL_XMQ* pSt_MQProtocol, XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszMsgBuffer = NULL, int nMsgLen = 0);
4343
bool ProtocolModule_Packet_WSCommon(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, XENGINE_PROTOCOL_XMQ* pSt_MQProtocol, XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszMsgBuffer = NULL, int nMsgLen = 0);
44+
bool ProtocolModule_Packet_MQTTCommon(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, XENGINE_PROTOCOL_XMQ* pSt_MQProtocol, XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszMsgBuffer = NULL, int nMsgLen = 0);
4445
private:
4546
};

XEngine_Source/MQCore_ProtocolModule/pch.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
#include <XEngine_Include/XEngine_BaseLib/BaseLib_Error.h>
2424
#include <XEngine_Include/XEngine_Core/OPenSsl_Define.h>
2525
#include <XEngine_Include/XEngine_Core/OPenSsl_Error.h>
26+
#include <XEngine_Include/XEngine_Core/ManagePool_Define.h>
2627
#include <XEngine_Include/XEngine_HelpComponents/DataBase_Define.h>
28+
#include <XEngine_Include/XEngine_RfcComponents/MQTTProtocol_Define.h>
29+
#include <XEngine_Include/XEngine_RfcComponents/MQTTProtocol_Error.h>
2730
#include "../XQueue_ProtocolHdr.h"
2831
#include "../MQCore_DBModule/DBModule_Define.h"
2932
#include "Protocol_Define.h"
@@ -46,6 +49,7 @@ extern XLONG Protocol_dwErrorCode;
4649
#ifdef _MSC_BUILD
4750
#pragma comment(lib,"XEngine_BaseLib/XEngine_BaseLib")
4851
#pragma comment(lib,"XEngine_Core/XEngine_OPenSsl")
52+
#pragma comment(lib,"XEngine_RfcComponents/RfcComponents_MQTTProtocol.lib")
4953
#ifdef _DEBUG
5054
#ifdef _WIN64
5155
#pragma comment(lib,"../x64/Debug/jsoncpp")

XEngine_Source/XEngine_MQServiceApp/MQService_MQTTTask.cpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ bool MQService_MQTT_Handle(LPCXSTR lpszClientAddr, MQTTPROTOCOL_FIXEDHEADER* pSt
7777
MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
7878
MQTTPROTOCOL_HDRCONNNECT st_HDRConnect = {};
7979
MQTTPROTOCOL_USERINFO st_USerInfo = {};
80-
80+
8181
if (!MQTTProtocol_Parse_Connect(lpszMSGBuffer, nMSGLen, &st_HDRConnect, &st_USerInfo, &ppSt_HDRProperty, &nListCount))
8282
{
8383
//错误断开连接
@@ -89,14 +89,21 @@ bool MQService_MQTT_Handle(LPCXSTR lpszClientAddr, MQTTPROTOCOL_FIXEDHEADER* pSt
8989
}
9090
BaseLib_OperatorMemory_Free((XPPPMEM)&ppSt_HDRProperty, nListCount);
9191

92-
nListCount = 6;
93-
Packet_Property(&ppSt_HDRProperty, nListCount);
94-
MQTTProtocol_Packet_REPConnect(tszRVBuffer, &nRVLen, 0, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS, &ppSt_HDRProperty, nListCount);
95-
MQTTProtocol_Packet_Header(tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_CONNACK, tszRVBuffer, nRVLen);
96-
97-
BaseLib_OperatorMemory_Free((XPPPMEM)&ppSt_HDRProperty, nListCount);
98-
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
99-
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求链接成功,客户端ID:%s,用户名:%s"), lpszClientAddr, st_USerInfo.tszClientID, st_USerInfo.tszClientUser);
92+
XENGINE_PROTOCOLHDR st_ProtocolHdr = {};
93+
XENGINE_PROTOCOL_USERAUTH st_ProtocolAuth = {};
94+
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
95+
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_AUTH;
96+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQUSERLOG;
97+
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_USERAUTH);
98+
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
99+
100+
st_ProtocolAuth.enClientType = ENUM_PROTOCOL_FOR_SERVICE_TYPE_USER;
101+
st_ProtocolAuth.enDeviceType = ENUM_PROTOCOL_FOR_DEVICE_TYPE_MOBILE_EMBEDDED;
102+
_tcsxcpy(st_ProtocolAuth.tszUserName, st_USerInfo.tszClientUser);
103+
_tcsxcpy(st_ProtocolAuth.tszUserPass, st_USerInfo.tszClientPass);
104+
105+
MessageQueue_TCP_Handle(&st_ProtocolHdr, lpszClientAddr, (LPCXSTR)&st_ProtocolAuth, sizeof(XENGINE_PROTOCOL_USERAUTH), XENGINE_MQAPP_NETTYPE_MQTT);
106+
//XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端:%s,请求链接成功,客户端ID:%s,用户名:%s"), lpszClientAddr, st_USerInfo.tszClientID, st_USerInfo.tszClientUser);
100107
}
101108
else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBSCRIBE == pSt_MQTTHdr->byMsgType)
102109
{

XEngine_Source/XEngine_MQServiceApp/MQService_Net.cpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,26 +97,38 @@ void XEngine_MQXService_Close(LPCXSTR lpszClientAddr, int nIPProto, bool bHeart)
9797
{
9898
if (XENGINE_MQAPP_NETTYPE_TCP == nIPProto)
9999
{
100-
HelpComponents_Datas_DeleteEx(xhTCPPacket, lpszClientAddr);
101-
NetCore_TCPXCore_CloseForClientEx(xhTCPSocket, lpszClientAddr);
100+
HelpComponents_Datas_DeleteEx(xhTCPPacket, lpszClientAddr);
101+
if (bHeart)
102+
{
103+
NetCore_TCPXCore_CloseForClientEx(xhTCPSocket, lpszClientAddr);
104+
}
102105
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("TCP客户端离开,TCP客户端地址:%s"), lpszClientAddr);
103106
}
104107
else if (XENGINE_MQAPP_NETTYPE_WEBSOCKET == nIPProto)
105108
{
106109
RfcComponents_WSPacket_DeleteEx(xhWSPacket, lpszClientAddr);
107-
NetCore_TCPXCore_CloseForClientEx(xhWSSocket, lpszClientAddr);
110+
if (bHeart)
111+
{
112+
NetCore_TCPXCore_CloseForClientEx(xhWSSocket, lpszClientAddr);
113+
}
108114
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("Websocket客户端离开,Websocket客户端地址:%s"), lpszClientAddr);
109115
}
110116
else if (XENGINE_MQAPP_NETTYPE_HTTP == nIPProto)
111117
{
112118
HttpProtocol_Server_CloseClinetEx(xhHTTPPacket, lpszClientAddr);
113-
NetCore_TCPXCore_CloseForClientEx(xhHTTPSocket, lpszClientAddr);
119+
if (bHeart)
120+
{
121+
NetCore_TCPXCore_CloseForClientEx(xhHTTPSocket, lpszClientAddr);
122+
}
114123
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("HTTP客户端离开,HTTP客户端地址:%s"), lpszClientAddr);
115124
}
116125
else
117126
{
118127
MQTTProtocol_Parse_Delete(lpszClientAddr);
119-
NetCore_TCPXCore_CloseForClientEx(xhTCPSocket, lpszClientAddr);
128+
if (bHeart)
129+
{
130+
NetCore_TCPXCore_CloseForClientEx(xhMQTTSocket, lpszClientAddr);
131+
}
120132
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("MQTT客户端离开,MQTT客户端地址:%s"), lpszClientAddr);
121133
}
122134
XENGINE_PROTOCOL_USERINFO st_UserInfo;

XEngine_Source/XEngine_MQServiceApp/MQService_TCPTask.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,14 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
4545
{
4646
lpszClientType = _X("TCP");
4747
}
48-
else
48+
else if (XENGINE_MQAPP_NETTYPE_WEBSOCKET == nNetType)
4949
{
5050
lpszClientType = _X("WEBSOCKET");
5151
}
52+
else
53+
{
54+
lpszClientType = _X("MQTT");
55+
}
5256

5357
if (ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_HEARTBEAT == pSt_ProtocolHdr->unOperatorType)
5458
{

0 commit comments

Comments
 (0)