Skip to content

Commit f3fb460

Browse files
committed
update:mq example
1 parent 8625f78 commit f3fb460

File tree

4 files changed

+89
-14
lines changed

4 files changed

+89
-14
lines changed

XEngine_Apps/MQCore_HTTPApp/MQCore_HTTPApp.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include <XEngine_Include/XEngine_NetHelp/APIClient_Error.h>
2121
#include "../../XEngine_Source/XQueue_ProtocolHdr.h"
2222

23-
//g++ -std=c++17 -Wall -g MQCore_HTTPApp.cpp -o MQCore_HTTPApp.exe -I ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -L /usr/local/lib/XEngine_Release/XEngine_BaseLib -L /usr/local/lib/XEngine_Release/XEngine_NetHelp -L ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -lXEngine_BaseLib -lNetHelp_APIClient -ljsoncpp -Wl,-rpath=../../XEngine_Source/XEngine_ThirdPart/jsoncpp,--disable-new-dtags
23+
//g++ -std=c++17 -Wall -g MQCore_HTTPApp.cpp -o MQCore_HTTPApp.exe -I ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -L /usr/local/lib/XEngine_Release/XEngine_BaseLib -L /usr/local/lib/XEngine_Release/XEngine_NetHelp -L ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -lXEngine_BaseLib -lNetHelp_APIClient -ljsoncpp
2424
void MQ_GetUserList()
2525
{
2626
LPCXSTR lpszPostUrl = _X("http://127.0.0.1:5202/api?function=get&method=user");
@@ -35,6 +35,20 @@ void MQ_GetUserList()
3535
_xtprintf("MQ_GetUserList:%s\n", ptszMsgBody);
3636
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBody);
3737
}
38+
void MQ_GetOnlineList()
39+
{
40+
LPCXSTR lpszPostUrl = _X("http://127.0.0.1:5202/api?function=get&method=online&type=0");
41+
42+
int nLen = 0;
43+
XCHAR* ptszMsgBody = NULL;
44+
if (!APIClient_Http_Request(_X("GET"), lpszPostUrl, NULL, NULL, &ptszMsgBody, &nLen))
45+
{
46+
_xtprintf("发送投递失败!\n");
47+
return;
48+
}
49+
_xtprintf("MQ_GetOnlineList:%s\n", ptszMsgBody);
50+
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBody);
51+
}
3852
void MQ_GetTopicList()
3953
{
4054
LPCXSTR lpszPostUrl = _X("http://127.0.0.1:5202/api?function=get&method=topic");
@@ -58,6 +72,7 @@ int main()
5872
#endif
5973

6074
MQ_GetUserList();
75+
MQ_GetOnlineList();
6176
MQ_GetTopicList();
6277

6378
#ifdef _MSC_BUILD

XEngine_Apps/MQCore_TCPApp/MQCore_TCPApp.cpp

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,58 @@ void MQ_Authorize()
121121
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBuffer);
122122
}
123123
}
124+
//获取未读消息
125+
void MQ_GetUNRead(int nType = 0)
126+
{
127+
int nLen = 0;
128+
XENGINE_PROTOCOLHDR st_ProtocolHdr;
129+
XENGINE_PROTOCOL_XMQ st_XMQProtocol;
130+
XCHAR tszMsgBuffer[2048];
131+
132+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
133+
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
134+
memset(&st_XMQProtocol, '\0', sizeof(XENGINE_PROTOCOL_XMQ));
135+
136+
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
137+
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
138+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQUNREAD;
139+
st_ProtocolHdr.byVersion = 1;
140+
st_ProtocolHdr.byIsReply = true; //必须为真
141+
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
142+
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ);
143+
144+
nLen = sizeof(XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize;
145+
memcpy(tszMsgBuffer, &st_ProtocolHdr, sizeof(XENGINE_PROTOCOLHDR));
146+
memcpy(tszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof(XENGINE_PROTOCOL_XMQ));
147+
148+
if (!XClient_TCPSelect_SendMsg(m_Socket, tszMsgBuffer, nLen))
149+
{
150+
_xtprintf("发送投递失败!\n");
151+
return;
152+
}
153+
154+
while (true)
155+
{
156+
nLen = 0;
157+
XCHAR* ptszMsgBuffer;
158+
159+
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
160+
if (XClient_TCPSelect_RecvPkt(m_Socket, &ptszMsgBuffer, &nLen, &st_ProtocolHdr))
161+
{
162+
if (0 == st_ProtocolHdr.wReserve)
163+
{
164+
_xtprintf("接受到数据,长度:%d,内容:%s\n", st_ProtocolHdr.unPacketSize, ptszMsgBuffer);
165+
}
166+
else
167+
{
168+
_xtprintf("获取消息队列数据失败,错误码:%d\n", st_ProtocolHdr.wReserve);
169+
}
170+
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBuffer);
171+
break;
172+
}
173+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
174+
}
175+
}
124176
//删除用户
125177
void MQ_DeleteUser()
126178
{
@@ -222,7 +274,7 @@ typedef struct
222274
int a;
223275
int b;
224276
}XENGINE_SQLBINARY;
225-
void MQ_Post(LPCXSTR lpszMsgBuffer, int nType = 0)
277+
void MQ_Post(LPCXSTR lpszMsgBuffer, int nType = 0, int nPubTime = -1, bool bSelf = false)
226278
{
227279
int nLen = 0;
228280
XENGINE_PROTOCOLHDR st_ProtocolHdr;
@@ -240,9 +292,11 @@ void MQ_Post(LPCXSTR lpszMsgBuffer, int nType = 0)
240292
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
241293

242294
st_XMQProtocol.nSerial = 0; //序列号,0服务会自动处理
243-
st_XMQProtocol.nKeepTime = -1; //保存时间,单位秒,如果为0,获取一次后被抛弃。-1 永久存在,PacketKey不能为空
295+
st_XMQProtocol.nKeepTime = 0;
296+
st_XMQProtocol.nPubTime = nPubTime;
244297
strcpy(st_XMQProtocol.tszMQKey, lpszKey);
245298

299+
st_XMQProtocol.st_MSGAttr.byAttrSelf = bSelf;
246300
if (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN == nType)
247301
{
248302
XENGINE_SQLBINARY st_SQLBinary;
@@ -379,6 +433,7 @@ void MQ_TimePublish()
379433
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ) + _tcsxlen(lpszMsgBuffer);
380434
st_XMQProtocol.nSerial = 0; //要获取的序列号,如果为0,服务会自动处理
381435
st_XMQProtocol.nPubTime = time(NULL) + 30; //当前时间+60秒
436+
st_XMQProtocol.st_MSGAttr.byAttrSelf = 1;
382437
strcpy(st_XMQProtocol.tszMQKey, lpszKey);
383438

384439
nLen = sizeof(XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize;
@@ -571,14 +626,16 @@ int main(int argc, char** argv)
571626

572627
MQ_Register();
573628
MQ_Authorize();
629+
MQ_GetUNRead();
574630
MQ_Create();
575-
MQ_Post(NULL, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN);
631+
MQ_Post(NULL, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN, -1, true);
576632
MQ_BindTopic();
577633
MQ_Get(ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN);
578634

635+
MQ_Post(lpszMsgBuffer, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING, 0);
579636
for (int i = 0; i < 10; i++)
580637
{
581-
MQ_Post(lpszMsgBuffer, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING);
638+
MQ_Post(lpszMsgBuffer, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING, -1, true);
582639
}
583640
MQ_GetNumber();
584641
MQ_Get(ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING);

XEngine_Apps/MQCore_WSApp/MQCore_WSApp.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include <XEngine_Include/XEngine_RfcComponents/WSProtocol_Error.h>
2222
#include "../../XEngine_Source/XQueue_ProtocolHdr.h"
2323

24-
//g++ -std=c++17 -Wall -g MQCore_WSApp.cpp -o MQCore_WSApp.exe -I ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -L /usr/local/lib/XEngine_Release/XEngine_BaseLib -L /usr/local/lib/XEngine_Release/XEngine_Client -L /usr/local/lib/XEngine_Release/XEngine_RfcComponents -L ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -lXEngine_BaseLib -lXEngine_Algorithm -lXClient_Socket -lRfcComponents_WSProtocol -ljsoncpp -Wl,-rpath=../../XEngine_Source/XEngine_ThirdPart/jsoncpp,--disable-new-dtags
24+
//g++ -std=c++17 -Wall -g MQCore_WSApp.cpp -o MQCore_WSApp.exe -I ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -L /usr/local/lib/XEngine_Release/XEngine_BaseLib -L /usr/local/lib/XEngine_Release/XEngine_Client -L /usr/local/lib/XEngine_Release/XEngine_RfcComponents -L ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -lXEngine_BaseLib -lXEngine_Algorithm -lXClient_Socket -lRfcComponents_WSProtocol -ljsoncpp
2525

2626
XSOCKET m_Socket;
2727
LPCXSTR lpszKey = _X("XEngine_Notify"); //主题
@@ -149,10 +149,19 @@ void MQ_Post(LPCXSTR lpszMsgBuffer)
149149
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN;
150150
st_JsonRoot["byIsReply"] = 1;
151151

152+
XSHOT nMSGAttr = 0;
153+
XENGINE_PROTOCOL_MSGATTR st_MSGAttr;
154+
memset(&st_MSGAttr, '\0', sizeof(XENGINE_PROTOCOL_MSGATTR));
155+
156+
st_MSGAttr.byAttrSelf = 1;
157+
memcpy(&nMSGAttr, &st_MSGAttr, sizeof(XENGINE_PROTOCOL_MSGATTR));
158+
152159
st_JsonMQProtocol["tszMQKey"] = lpszKey;
153160
st_JsonMQProtocol["nSerial"] = 0; //序列号,0服务会自动处理
154-
st_JsonMQProtocol["nKeepTime"] = -1; //保存时间,单位秒,如果为0,获取一次后被抛弃。-1 永久存在,PacketKey不能为空
161+
st_JsonMQProtocol["nKeepTime"] = 0; //生效时间
162+
st_JsonMQProtocol["nPubTime"] = -1;
155163
st_JsonMQProtocol["nGetTimer"] = 0;
164+
st_JsonMQProtocol["nMSGAttr"] = nMSGAttr;
156165

157166
st_JsonPayload["nPayLen"] = (Json::Value::UInt)strlen(lpszMsgBuffer);
158167
st_JsonPayload["tszPayData"] = lpszMsgBuffer;

XEngine_Apps/VSCopy-x86.bat

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,4 @@ copy /y "%XEngine_Lib32%\XEngine_Core\XEngine_OPenSsl.dll" "./"
44
copy /y "%XEngine_Lib32%\XEngine_Core\XEngine_ManagePool.dll" "./"
55
copy /y "%XEngine_Lib32%\XEngine_RfcComponents\RfcComponents_WSProtocol.dll" "./"
66
copy /y "%XEngine_Lib32%\XEngine_NetHelp\NetHelp_APIClient.dll" "./"
7-
copy /y "%XEngine_Lib32%\XEngine_Client\XClient_Socket.dll" "./"
8-
9-
copy /y "%XEngine_Lib32%\XEngine_LibEx\libcrypto-3.dll" "./"
10-
copy /y "%XEngine_Lib32%\XEngine_LibEx\libssl-3.dll" "./"
11-
copy /y "%XEngine_Lib32%\XEngine_NetHelp\libcurl.dll" "./"
12-
copy /y "%XEngine_Lib32%\XEngine_NetHelp\nghttp2.dll" "./"
13-
copy /y "%XEngine_Lib32%\XEngine_HelpComponents\zlib1.dll" "./"
7+
copy /y "%XEngine_Lib32%\XEngine_Client\XClient_Socket.dll" "./"

0 commit comments

Comments
 (0)