Skip to content

Commit 7bb856e

Browse files
authored
Merge pull request #13 from libxengine/develop
V3.6.0.1001 Merge
2 parents 8a85438 + ade8772 commit 7bb856e

40 files changed

+902
-851
lines changed

CHANGELOG

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,39 @@
1+
XEngine_MQService V3.6.0.1001
2+
3+
增加:主题所有者数据支持
4+
增加:为发布表增加时间字段
5+
修改:匹配XEngine到V7.48
6+
修改:消息数据库增加用户字段用于消息验证
7+
修改:主题修改需要验证用户了
8+
修改:消息修改需要验证用户
9+
修改:修改主题定时表和key和所有者表现在会一起修改了
10+
修改:改编表名称也会改变字段名了
11+
修改:删除消息表也会删除userkey和time表相对应的数据了
12+
修改:请求序列协议修改为绑定主题协议功能
13+
修改:协议名称统一
14+
修改:定时发布任务现在不会发布给自己了
15+
修改:获取消息会跳过超时和定时发布的消息了
16+
修复:错误码不正确的问题
17+
修复:未读消息初始化错误
18+
删除:会话通知代码
19+
20+
added:topic owner
21+
added:create time for publish time
22+
modify:match xengine v7.48
23+
modify:message database added user field and use to message verification
24+
modify:topic name need ver user
25+
modify:message modify need to ver user
26+
modify:bucket name for timerelease and key and owner
27+
modify:change table name that to be change field name
28+
modify:can be delete userkey and timerelease datas when delete table
29+
modify:req serial protocol to bind topic
30+
modify:protocol name
31+
modify:release time task does not to notify user
32+
modify:break time task and left task when get message
33+
fixed:error code is incorrent
34+
fixed:unread memset is incorrent
35+
delete:unused notify code
36+
======================================================================================
137
XEngine_MQService V3.5.0.1001
238

339
增加:未读消息协议处理支持

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ MQTT支持
111111
支持时区设置
112112
完善消息订阅(永存)
113113
完善HTTP_CALL
114+
允许主题解除绑定
115+
允许删除主题
114116

115117
## 提交问题
116118

XEngine_Apps/MQCore_HTTPApp/MQCore_HTTPApp.cpp

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#include <json/json.h>
55
#pragma comment(lib,"Ws2_32")
66
#pragma comment(lib,"x86/XEngine_BaseLib/XEngine_BaseLib")
7-
#pragma comment(lib,"x86/XEngine_NetHelp/NetHelp_APIHelp")
7+
#pragma comment(lib,"x86/XEngine_NetHelp/NetHelp_APIClient")
88
#pragma comment(lib,"../../XEngine_Source/Debug/jsoncpp")
99
#else
1010
#include <stdio.h>
@@ -16,11 +16,11 @@
1616
#include <XEngine_Include/XEngine_ProtocolHdr.h>
1717
#include <XEngine_Include/XEngine_BaseLib/BaseLib_Define.h>
1818
#include <XEngine_Include/XEngine_BaseLib/BaseLib_Error.h>
19-
#include <XEngine_Include/XEngine_NetHelp/APIHelp_Define.h>
20-
#include <XEngine_Include/XEngine_NetHelp/APIHelp_Error.h>
19+
#include <XEngine_Include/XEngine_NetHelp/APIClient_Define.h>
20+
#include <XEngine_Include/XEngine_NetHelp/APIClient_Error.h>
2121
#include "../../XEngine_Source/XQueue_ProtocolHdr.h"
2222

23-
//g++ -std=c++17 -Wall -g MQCore_HTTPApp.cpp -o MQCore_HTTPApp.exe -I ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -L /usr/local/lib/XEngine_Release/XEngine_BaseLib -L /usr/local/lib/XEngine_Release/XEngine_NetHelp -L ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -lXEngine_BaseLib -lNetHelp_APIHelp -ljsoncpp
23+
//g++ -std=c++17 -Wall -g MQCore_HTTPApp.cpp -o MQCore_HTTPApp.exe -I ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -L /usr/local/lib/XEngine_Release/XEngine_BaseLib -L /usr/local/lib/XEngine_Release/XEngine_NetHelp -L ../../XEngine_Source/XEngine_ThirdPart/jsoncpp -lXEngine_BaseLib -lNetHelp_APIClient -ljsoncpp
2424

2525
SOCKET m_Socket;
2626
XNETHANDLE xhToken = 0;
@@ -48,7 +48,7 @@ void MQ_Authorize()
4848
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
4949

5050
TCHAR* ptszMsgBody = NULL;
51-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody, &nLen))
51+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody, &nLen))
5252
{
5353
printf("发送投递失败!\n");
5454
return;
@@ -89,12 +89,12 @@ void MQ_UNRead()
8989
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
9090

9191
TCHAR* ptszMsgBody = NULL;
92-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
92+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
9393
{
9494
printf("发送投递失败!\n");
9595
return;
9696
}
97-
printf("MQ_Create:%s\n", ptszMsgBody);
97+
printf("MQ_UNRead:%s\n", ptszMsgBody);
9898
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBody);
9999
}
100100
void MQ_Create()
@@ -106,7 +106,7 @@ void MQ_Create()
106106
Json::Value st_JsonRoot;
107107
Json::Value st_JsonMQProtocol;
108108
st_JsonRoot["unOperatorType"] = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
109-
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQCREATE;
109+
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICCREATE;
110110
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
111111
st_JsonRoot["xhToken"] = xhToken;
112112

@@ -117,7 +117,7 @@ void MQ_Create()
117117
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
118118

119119
TCHAR* ptszMsgBody = NULL;
120-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
120+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
121121
{
122122
printf("发送投递失败!\n");
123123
return;
@@ -156,7 +156,7 @@ void MQ_Post(LPCTSTR lpszMsgBuffer)
156156
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
157157

158158
TCHAR* ptszMsgBody = NULL;
159-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
159+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
160160
{
161161
printf("发送投递失败!\n");
162162
return;
@@ -165,7 +165,7 @@ void MQ_Post(LPCTSTR lpszMsgBuffer)
165165
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBody);
166166
}
167167

168-
void MQ_GetNumber()
168+
void MQ_BindTopic()
169169
{
170170
int nLen = 0;
171171
TCHAR tszMsgBuffer[2048];
@@ -175,18 +175,19 @@ void MQ_GetNumber()
175175
Json::Value st_JsonMQProtocol;
176176
Json::Value st_JsonPayload;
177177
st_JsonRoot["unOperatorType"] = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
178-
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQNUMBER;
178+
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICBIND;
179179
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
180180
st_JsonRoot["xhToken"] = xhToken;
181181

182+
st_JsonMQProtocol["nSerial"] = 1; //设置为1开始读取
182183
st_JsonMQProtocol["tszMQKey"] = lpszKey;
183184
st_JsonRoot["st_MQProtocol"] = st_JsonMQProtocol;
184185

185186
nLen = st_JsonRoot.toStyledString().length();
186187
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
187188

188189
TCHAR* ptszMsgBody = NULL;
189-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
190+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
190191
{
191192
printf("发送投递失败!\n");
192193
return;
@@ -209,15 +210,14 @@ void MQ_Get()
209210
st_JsonRoot["xhToken"] = xhToken;
210211

211212
st_JsonMQProtocol["tszMQKey"] = lpszKey;
212-
st_JsonMQProtocol["nSerial"] = 1;
213213

214214
st_JsonRoot["st_MQProtocol"] = st_JsonMQProtocol;
215215

216216
nLen = st_JsonRoot.toStyledString().length();
217217
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
218218

219219
TCHAR* ptszMsgBody = NULL;
220-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
220+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
221221
{
222222
printf("发送投递失败!\n");
223223
return;
@@ -240,7 +240,7 @@ void MQ_ModifyMsg()
240240
st_JsonRoot["xhToken"] = xhToken;
241241

242242
st_JsonMQProtocol["tszMQKey"] = lpszKey;
243-
st_JsonMQProtocol["nSerial"] = 1; //序列号,0服务会自动处理
243+
st_JsonMQProtocol["nSerial"] = 1; //序列号
244244
st_JsonMQProtocol["nKeepTime"] = -1;
245245
st_JsonMQProtocol["nGetTimer"] = 0;
246246

@@ -257,7 +257,7 @@ void MQ_ModifyMsg()
257257
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
258258

259259
TCHAR* ptszMsgBody = NULL;
260-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
260+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
261261
{
262262
printf("发送投递失败!\n");
263263
return;
@@ -292,7 +292,7 @@ void MQ_ModifyTopic()
292292
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
293293

294294
TCHAR* ptszMsgBody = NULL;
295-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
295+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
296296
{
297297
printf("发送投递失败!\n");
298298
return;
@@ -310,7 +310,7 @@ void MQ_Delete()
310310
Json::Value st_JsonMQProtocol;
311311
Json::Value st_JsonPayload;
312312
st_JsonRoot["unOperatorType"] = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
313-
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQDELETE;
313+
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICDELETE;
314314
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
315315
st_JsonRoot["xhToken"] = xhToken;
316316

@@ -321,7 +321,7 @@ void MQ_Delete()
321321
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
322322

323323
TCHAR* ptszMsgBody = NULL;
324-
if (!APIHelp_HttpRequest_Custom(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
324+
if (!APIClient_Http_Request(_T("POST"), lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
325325
{
326326
printf("发送投递失败!\n");
327327
return;
@@ -341,7 +341,7 @@ int main()
341341
MQ_UNRead();
342342
MQ_Create();
343343
MQ_Post("123hello");
344-
MQ_GetNumber();
344+
MQ_BindTopic();
345345
MQ_Get();
346346
MQ_ModifyMsg();
347347
MQ_ModifyTopic();

XEngine_Apps/MQCore_TCPApp/MQCore_TCPApp.cpp

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ void MQ_Create()
184184

185185
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
186186
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
187-
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQCREATE;
187+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICCREATE;
188188
st_ProtocolHdr.byVersion = 1;
189189
st_ProtocolHdr.byIsReply = TRUE; //获得处理返回结果
190190
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
@@ -369,6 +369,12 @@ void MQ_TimePublish()
369369
printf("接受数据失败!\n");
370370
return;
371371
}
372+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
373+
memset(&st_XMQProtocol, '\0', sizeof(XENGINE_PROTOCOL_XMQ));
374+
375+
memcpy(&st_XMQProtocol, ptszMsgBuffer, sizeof(st_XMQProtocol));
376+
377+
printf("接受到通知消息,主题:%s,序列:%lld,长度:%d,内容:%s\n", st_XMQProtocol.tszMQKey, st_XMQProtocol.nSerial, st_ProtocolHdr.unPacketSize - sizeof(XENGINE_PROTOCOL_XMQ), ptszMsgBuffer + sizeof(XENGINE_PROTOCOL_XMQ));
372378
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBuffer);
373379
}
374380

@@ -425,7 +431,7 @@ void MQ_GetNumber()
425431
}
426432
}
427433
}
428-
void MQ_GetSerial()
434+
void MQ_BindTopic()
429435
{
430436
int nLen = 0;
431437
XENGINE_PROTOCOLHDR st_ProtocolHdr;
@@ -438,7 +444,7 @@ void MQ_GetSerial()
438444

439445
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
440446
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
441-
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQSERIAL;
447+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICBIND;
442448
st_ProtocolHdr.byVersion = 1;
443449
st_ProtocolHdr.byIsReply = TRUE;
444450
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
@@ -493,7 +499,7 @@ void MQ_DeleteTopic()
493499

494500
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
495501
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
496-
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQDELETE;
502+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICDELETE;
497503
st_ProtocolHdr.byVersion = 1;
498504
st_ProtocolHdr.byIsReply = TRUE; //不获取结果
499505
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
@@ -512,47 +518,6 @@ void MQ_DeleteTopic()
512518
}
513519
}
514520

515-
//订阅
516-
void MQ_Subscribe()
517-
{
518-
int nLen = 0;
519-
XENGINE_PROTOCOLHDR st_ProtocolHdr;
520-
XENGINE_PROTOCOL_XMQ st_XMQProtocol;
521-
TCHAR tszMsgBuffer[2048];
522-
523-
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
524-
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
525-
memset(&st_XMQProtocol, '\0', sizeof(XENGINE_PROTOCOL_XMQ));
526-
527-
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
528-
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
529-
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQNOTIFY;
530-
st_ProtocolHdr.byVersion = 1;
531-
st_ProtocolHdr.wReserve = 1; //1为请求订阅
532-
st_ProtocolHdr.byIsReply = TRUE;
533-
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
534-
535-
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ);
536-
strcpy(st_XMQProtocol.tszMQKey, lpszKey);
537-
538-
nLen = sizeof(XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize;
539-
memcpy(tszMsgBuffer, &st_ProtocolHdr, sizeof(XENGINE_PROTOCOLHDR));
540-
memcpy(tszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof(XENGINE_PROTOCOL_XMQ));
541-
542-
if (!XClient_TCPSelect_SendMsg(m_Socket, tszMsgBuffer, nLen))
543-
{
544-
printf("发送投递失败!\n");
545-
return;
546-
}
547-
nLen = 0;
548-
CHAR* ptszMsgBuffer = NULL;
549-
if (!XClient_TCPSelect_RecvPkt(m_Socket, &ptszMsgBuffer, &nLen, &st_ProtocolHdr))
550-
{
551-
printf("接受数据失败!\n");
552-
return;
553-
}
554-
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBuffer);
555-
}
556521
int main(int argc, char** argv)
557522
{
558523
#ifdef _WINDOWS
@@ -575,12 +540,11 @@ int main(int argc, char** argv)
575540
{
576541
MQ_Post(lpszMsgBuffer);
577542
}
578-
MQ_GetSerial();
579543
MQ_GetNumber();
544+
MQ_BindTopic();
580545
MQ_Get();
581546
MQ_Get();
582547
MQ_Get();
583-
MQ_Subscribe();
584548
MQ_TimePublish();
585549
MQ_DeleteTopic();
586550
MQ_DeleteUser();

0 commit comments

Comments
 (0)