Skip to content

Commit af83ced

Browse files
authored
Merge pull request #4 from libxengine/develop
V1.5版本合并
2 parents a2983ea + 82e6327 commit af83ced

33 files changed

+992
-153
lines changed

README.en.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
[中文](README.md) || [English](README.en.md)
12
# XEngine_MQService
3+
This repository has a development and master branch. If you want to use it, please use the master branch
24

35
#### Description
46
c c++消息队列服务 消息队列中间件
@@ -23,7 +25,7 @@ this software support following features
2325
10. support Permission Validation(planning)
2426
11. support multi protocol(TCP,HTTP,WEBSOCKET)
2527
12. Active delivery acquisition mode or passive subscription notification module
26-
13. get with Reverse order(planning)
28+
13. get order and start serial pos setting
2729
14. Unlimited load message types
2830

2931
## install

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
[中文](README.md) || [English](README.en.md)
12
# XEngine_MQService
3+
本仓库有开发和主分支,如果要使用,请使用master分支下的代码
24

35
## 介绍
46
c c++消息队列服务 消息队列中间件
@@ -22,7 +24,7 @@ c c++Message Queue Service
2224
10. 支持权限验证(planning)
2325
11. 支持多种协议(TCP,HTTP,WEBSOCKET)
2426
12. 主动投递获取模式或者被动订阅通知模块
25-
13. 倒序获取(planning)
27+
13. 获取顺序与开始序列号设置
2628
14. 不限制负载的消息类型
2729

2830
## 安装教程

XEngine_Apps/MQCore_HTTPApp/MQCore_HTTPApp.cpp

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ void MQ_Create()
3939
Json::Value st_JsonMQProtocol;
4040
st_JsonRoot["unOperatorType"] = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
4141
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQCREATE;
42+
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
4243

4344
st_JsonMQProtocol["tszMQKey"] = lpszKey;
4445
st_JsonMQProtocol["nSerial"] = 0;
@@ -72,6 +73,7 @@ void MQ_Post(LPCTSTR lpszMsgBuffer)
7273
Json::Value st_JsonPayload;
7374
st_JsonRoot["unOperatorType"] = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
7475
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQPOST;
76+
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
7577

7678
st_JsonMQProtocol["tszMQKey"] = lpszKey;
7779
st_JsonMQProtocol["nSerial"] = 0; //序列号,0服务会自动处理
@@ -97,6 +99,36 @@ void MQ_Post(LPCTSTR lpszMsgBuffer)
9799
printf("MQ_Post:%s\n", ptszMsgBody);
98100
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBody);
99101
}
102+
103+
void MQ_GetNumber()
104+
{
105+
int nLen = 0;
106+
TCHAR tszMsgBuffer[2048];
107+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
108+
109+
Json::Value st_JsonRoot;
110+
Json::Value st_JsonMQProtocol;
111+
Json::Value st_JsonPayload;
112+
st_JsonRoot["unOperatorType"] = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
113+
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQNUMBER;
114+
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
115+
116+
st_JsonMQProtocol["tszMQKey"] = lpszKey;
117+
118+
st_JsonRoot["st_MQProtocol"] = st_JsonMQProtocol;
119+
120+
nLen = st_JsonRoot.toStyledString().length();
121+
memcpy(tszMsgBuffer, st_JsonRoot.toStyledString().c_str(), nLen);
122+
123+
TCHAR* ptszMsgBody = NULL;
124+
if (!APIHelp_HttpRequest_Post(lpszPostUrl, tszMsgBuffer, NULL, &ptszMsgBody))
125+
{
126+
printf("发送投递失败!\n");
127+
return;
128+
}
129+
printf("MQ_GetNumber:%s\n", ptszMsgBody);
130+
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBody);
131+
}
100132
void MQ_Get()
101133
{
102134
int nLen = 0;
@@ -108,6 +140,7 @@ void MQ_Get()
108140
Json::Value st_JsonPayload;
109141
st_JsonRoot["unOperatorType"] = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
110142
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQGET;
143+
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
111144

112145
st_JsonMQProtocol["tszMQKey"] = lpszKey;
113146
st_JsonMQProtocol["nSerial"] = 1;
@@ -126,7 +159,7 @@ void MQ_Get()
126159
printf("MQ_Get:%s\n", ptszMsgBody);
127160
BaseLib_OperatorMemory_FreeCStyle((XPPMEM)&ptszMsgBody);
128161
}
129-
//离开包
162+
//删除
130163
void MQ_Delete()
131164
{
132165
int nLen = 0;
@@ -138,6 +171,7 @@ void MQ_Delete()
138171
Json::Value st_JsonPayload;
139172
st_JsonRoot["unOperatorType"] = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
140173
st_JsonRoot["unOperatorCode"] = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQDEL;
174+
st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
141175

142176
st_JsonMQProtocol["tszMQKey"] = lpszKey;
143177
st_JsonMQProtocol["nSerial"] = 1;
@@ -166,6 +200,7 @@ int main()
166200

167201
MQ_Create();
168202
MQ_Post("123hello");
203+
MQ_GetNumber();
169204
MQ_Get();
170205
MQ_Delete();
171206

XEngine_Apps/MQCore_TCPApp/MQCore_TCPApp.cpp

Lines changed: 164 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ using namespace std;
1818
//g++ -std=c++17 -Wall -g MQCore_APPService.cpp -o MQCore_APPService.exe -L /usr/local/lib/XEngine_Release/XEngine_BaseLib -L /usr/local/lib/XEngine_Release/XEngine_Client -lXEngine_BaseLib -lXClient_Socket
1919

2020
SOCKET m_Socket;
21+
__int64x nLastNumber = 0;
2122
LPCTSTR lpszKey = _T("XEngine_Notify"); //主题
2223

2324
void MQ_Create()
@@ -169,6 +170,166 @@ void MQ_Get()
169170
std::this_thread::sleep_for(std::chrono::milliseconds(1));
170171
}
171172
}
173+
void MQ_GetNumber()
174+
{
175+
int nLen = 0;
176+
XENGINE_PROTOCOLHDR st_ProtocolHdr;
177+
XENGINE_PROTOCOL_XMQ st_XMQProtocol;
178+
TCHAR tszMsgBuffer[2048];
179+
180+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
181+
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
182+
memset(&st_XMQProtocol, '\0', sizeof(XENGINE_PROTOCOL_XMQ));
183+
184+
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
185+
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
186+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQNUMBER;
187+
st_ProtocolHdr.byVersion = 1;
188+
st_ProtocolHdr.byIsReply = TRUE;
189+
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
190+
191+
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ);
192+
strcpy(st_XMQProtocol.tszMQKey, lpszKey);
193+
194+
nLen = sizeof(XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize;
195+
memcpy(tszMsgBuffer, &st_ProtocolHdr, sizeof(XENGINE_PROTOCOLHDR));
196+
memcpy(tszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof(XENGINE_PROTOCOL_XMQ));
197+
198+
if (!XClient_TCPSelect_SendMsg(m_Socket, tszMsgBuffer, nLen))
199+
{
200+
printf("发送投递失败!\n");
201+
return;
202+
}
203+
204+
nLen = 2048;
205+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
206+
if (XClient_TCPSelect_RecvMsg(m_Socket, tszMsgBuffer, &nLen))
207+
{
208+
XENGINE_MQNUMBER st_MQNumber;
209+
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
210+
memset(&st_MQNumber, '\0', sizeof(XENGINE_MQNUMBER));
211+
212+
memcpy(&st_ProtocolHdr, tszMsgBuffer, sizeof(XENGINE_PROTOCOLHDR));
213+
memcpy(&st_MQNumber, tszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), sizeof(XENGINE_MQNUMBER));
214+
215+
if (0 == st_ProtocolHdr.wReserve)
216+
{
217+
nLastNumber = st_MQNumber.nLastNumber;
218+
printf("接受到消息信息,主题:%s,个数:%lld,起始编号:%lld,结束编号:%lld\n", st_MQNumber.tszMQKey, st_MQNumber.nCount, st_MQNumber.nFirstNumber, st_MQNumber.nLastNumber);
219+
}
220+
else
221+
{
222+
printf("接受到消息信息失败,错误码:%d\n", st_ProtocolHdr.wReserve);
223+
}
224+
}
225+
}
226+
void MQ_GetOrder()
227+
{
228+
int nLen = 0;
229+
XENGINE_PROTOCOLHDR st_ProtocolHdr;
230+
XENGINE_PROTOCOL_XMQ st_XMQProtocol;
231+
TCHAR tszMsgBuffer[2048];
232+
233+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
234+
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
235+
memset(&st_XMQProtocol, '\0', sizeof(XENGINE_PROTOCOL_XMQ));
236+
237+
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
238+
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
239+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQSERIAL;
240+
st_ProtocolHdr.byVersion = 1;
241+
st_ProtocolHdr.byIsReply = TRUE;
242+
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
243+
244+
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ);
245+
246+
st_XMQProtocol.nKeepTime = 0; //倒序
247+
st_XMQProtocol.nSerial = nLastNumber;
248+
strcpy(st_XMQProtocol.tszMQKey, lpszKey);
249+
250+
nLen = sizeof(XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize;
251+
memcpy(tszMsgBuffer, &st_ProtocolHdr, sizeof(XENGINE_PROTOCOLHDR));
252+
memcpy(tszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof(XENGINE_PROTOCOL_XMQ));
253+
254+
if (!XClient_TCPSelect_SendMsg(m_Socket, tszMsgBuffer, nLen))
255+
{
256+
printf("发送投递失败!\n");
257+
return;
258+
}
259+
260+
nLen = 2048;
261+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
262+
if (XClient_TCPSelect_RecvMsg(m_Socket, tszMsgBuffer, &nLen))
263+
{
264+
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
265+
memset(&st_XMQProtocol, '\0', sizeof(XENGINE_PROTOCOL_XMQ));
266+
267+
memcpy(&st_ProtocolHdr, tszMsgBuffer, sizeof(XENGINE_PROTOCOLHDR));
268+
memcpy(&st_XMQProtocol, tszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), sizeof(XENGINE_PROTOCOL_XMQ));
269+
270+
if (0 == st_ProtocolHdr.wReserve)
271+
{
272+
printf("请求某个位置开始获取消息成功,主题:%s,序列号:%lld,顺序:%s\n", st_XMQProtocol.tszMQKey, st_XMQProtocol.nSerial, st_XMQProtocol.nKeepTime == 1 ? "顺序" : "倒序");
273+
}
274+
else
275+
{
276+
printf("请求某个位置开始获取消息失败,错误码:%d\n", st_ProtocolHdr.wReserve);
277+
}
278+
}
279+
}
280+
281+
void MQ_GetSerial()
282+
{
283+
int nLen = 0;
284+
XENGINE_PROTOCOLHDR st_ProtocolHdr;
285+
XENGINE_PROTOCOL_XMQ st_XMQProtocol;
286+
TCHAR tszMsgBuffer[2048];
287+
288+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
289+
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
290+
memset(&st_XMQProtocol, '\0', sizeof(XENGINE_PROTOCOL_XMQ));
291+
292+
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
293+
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
294+
st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQNUMBER;
295+
st_ProtocolHdr.byVersion = 1;
296+
st_ProtocolHdr.byIsReply = TRUE;
297+
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
298+
299+
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ);
300+
strcpy(st_XMQProtocol.tszMQKey, lpszKey);
301+
302+
nLen = sizeof(XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize;
303+
memcpy(tszMsgBuffer, &st_ProtocolHdr, sizeof(XENGINE_PROTOCOLHDR));
304+
memcpy(tszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof(XENGINE_PROTOCOL_XMQ));
305+
306+
if (!XClient_TCPSelect_SendMsg(m_Socket, tszMsgBuffer, nLen))
307+
{
308+
printf("发送投递失败!\n");
309+
return;
310+
}
311+
312+
nLen = 2048;
313+
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
314+
if (XClient_TCPSelect_RecvMsg(m_Socket, tszMsgBuffer, &nLen))
315+
{
316+
XENGINE_MQNUMBER st_MQNumber;
317+
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
318+
memset(&st_MQNumber, '\0', sizeof(XENGINE_MQNUMBER));
319+
320+
memcpy(&st_ProtocolHdr, tszMsgBuffer, sizeof(XENGINE_PROTOCOLHDR));
321+
memcpy(&st_MQNumber, tszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), sizeof(XENGINE_MQNUMBER));
322+
323+
if (0 == st_ProtocolHdr.wReserve)
324+
{
325+
printf("接受到消息信息,主题:%s,个数:%lld,起始编号:%lld,结束编号:%lld\n", st_MQNumber.tszMQKey, st_MQNumber.nCount, st_MQNumber.nFirstNumber, st_MQNumber.nLastNumber);
326+
}
327+
else
328+
{
329+
printf("接受到消息信息失败,错误码:%d\n", st_ProtocolHdr.wReserve);
330+
}
331+
}
332+
}
172333
//离开包
173334
void MQ_Delete()
174335
{
@@ -268,7 +429,7 @@ int main(int argc, char** argv)
268429
LPCTSTR lpszMsgBuffer = _T("123456789aaa");
269430
if (!XClient_TCPSelect_Create(&m_Socket, lpszServiceAddr, 5200))
270431
{
271-
printf("连接失败!\n");
432+
printf("连接失败!%d\n",WSAGetLastError());
272433
return -1;
273434
}
274435
printf("连接成功!\n");
@@ -285,6 +446,8 @@ int main(int argc, char** argv)
285446
MQ_Post(lpszMsgBuffer);
286447
MQ_Post(lpszMsgBuffer);
287448

449+
MQ_GetNumber();
450+
MQ_GetOrder();
288451
MQ_Get();
289452
MQ_Get();
290453
MQ_Get();

0 commit comments

Comments
 (0)