Skip to content

Commit db1d124

Browse files
committed
delete:get time number supported
1 parent 1b801b9 commit db1d124

File tree

6 files changed

+69
-83
lines changed

6 files changed

+69
-83
lines changed

XEngine_Source/MQCore_DBModule/DBModule_Define.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ typedef struct
2323
XCHAR tszQueuePublishTime[64]; //发布时间
2424
XCHAR tszQueueCreateTime[64]; //创建时间
2525
__int64x nQueueSerial; //包序列号
26-
__int64x nQueueGetTime; //可以获取的次数
2726
int nMsgLen; //消息大小
2827
XBYTE byMsgType; //消息类型,参考:ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE
2928
XBYTE byMsgAttr; //消息属性

XEngine_Source/MQCore_DBModule/DBModule_MQData/DBModule_MQData.cpp

Lines changed: 30 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ bool CDBModule_MQData::DBModule_MQData_Insert(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo
100100
memset(tszSQLCoder, '\0', sizeof(tszSQLCoder));
101101

102102
DataBase_MySQL_Coder(xhDBSQL, pSt_DBInfo->tszMsgBuffer, tszSQLCoder, &pSt_DBInfo->nMsgLen);
103-
__int64u nRet = _xstprintf(tszSQLStatement, _X("INSERT INTO `%s` (tszUserName,tszQueueName,nQueueSerial,nQueueGetTime,tszQueueLeftTime,tszQueuePublishTime,tszQueueData,nDataType,nDataAttr,tszQueueCreateTime) VALUES('%s','%s',%lld,%lld,'%s','%s','"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszUserName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial, pSt_DBInfo->nQueueGetTime, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
103+
__int64u nRet = _xstprintf(tszSQLStatement, _X("INSERT INTO `%s` (tszUserName,tszQueueName,nQueueSerial,tszQueueLeftTime,tszQueuePublishTime,tszQueueData,nDataType,nDataAttr,tszQueueCreateTime) VALUES('%s','%s',%lld,'%s','%s','"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszUserName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
104104
memcpy(tszSQLStatement + nRet, tszSQLCoder, pSt_DBInfo->nMsgLen);
105105
nRet += pSt_DBInfo->nMsgLen;
106106

@@ -178,32 +178,28 @@ bool CDBModule_MQData::DBModule_MQData_Query(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo)
178178
}
179179
if (NULL != pptszResult[4])
180180
{
181-
pSt_DBInfo->nQueueGetTime = _ttxoll(pptszResult[4]);
181+
_tcsxcpy(pSt_DBInfo->tszQueueLeftTime, pptszResult[4]);
182182
}
183183
if (NULL != pptszResult[5])
184184
{
185-
_tcsxcpy(pSt_DBInfo->tszQueueLeftTime, pptszResult[5]);
185+
_tcsxcpy(pSt_DBInfo->tszQueuePublishTime, pptszResult[5]);
186186
}
187187
if (NULL != pptszResult[6])
188188
{
189-
_tcsxcpy(pSt_DBInfo->tszQueuePublishTime, pptszResult[6]);
189+
pSt_DBInfo->nMsgLen = pInt_Length[6];
190+
memcpy(pSt_DBInfo->tszMsgBuffer, pptszResult[6], pSt_DBInfo->nMsgLen);
190191
}
191192
if (NULL != pptszResult[7])
192193
{
193-
pSt_DBInfo->nMsgLen = pInt_Length[7];
194-
memcpy(pSt_DBInfo->tszMsgBuffer, pptszResult[7], pSt_DBInfo->nMsgLen);
194+
pSt_DBInfo->byMsgType = _ttxoi(pptszResult[7]);
195195
}
196196
if (NULL != pptszResult[8])
197197
{
198-
pSt_DBInfo->byMsgType = _ttxoi(pptszResult[8]);
198+
pSt_DBInfo->byMsgAttr = _ttxoi(pptszResult[8]);
199199
}
200200
if (NULL != pptszResult[9])
201201
{
202-
pSt_DBInfo->byMsgAttr = _ttxoi(pptszResult[9]);
203-
}
204-
if (NULL != pptszResult[10])
205-
{
206-
_tcsxcpy(pSt_DBInfo->tszQueueCreateTime, pptszResult[10]);
202+
_tcsxcpy(pSt_DBInfo->tszQueueCreateTime, pptszResult[9]);
207203
}
208204
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
209205
return true;
@@ -238,7 +234,7 @@ bool CDBModule_MQData::DBModule_MQData_Modify(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo
238234
memset(tszSQLCoder, '\0', sizeof(tszSQLCoder));
239235

240236
DataBase_MySQL_Coder(xhDBSQL, pSt_DBInfo->tszMsgBuffer, tszSQLCoder, &pSt_DBInfo->nMsgLen);
241-
__int64u nRet = _xstprintf(tszSQLStatement, _X("UPDATE `%s` SET nQueueGetTime = %lld,tszQueueLeftTime = '%s',tszQueuePublishTime = '%s',tszQueueData = '"), pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueGetTime, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
237+
__int64u nRet = _xstprintf(tszSQLStatement, _X("UPDATE `%s` SET tszQueueLeftTime = '%s',tszQueuePublishTime = '%s',tszQueueData = '"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
242238

243239
memcpy(tszSQLStatement + nRet, tszSQLCoder, pSt_DBInfo->nMsgLen);
244240
nRet += pSt_DBInfo->nMsgLen;
@@ -342,32 +338,28 @@ bool CDBModule_MQData::DBModule_MQData_List(LPCXSTR lpszQueueName, __int64x nSer
342338
}
343339
if (NULL != pptszResult[4])
344340
{
345-
(*pppSt_DBMessage)[i]->nQueueGetTime = _ttxoll(pptszResult[4]);
341+
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueLeftTime, pptszResult[4]);
346342
}
347343
if (NULL != pptszResult[5])
348344
{
349-
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueLeftTime, pptszResult[5]);
345+
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueuePublishTime, pptszResult[5]);
350346
}
351347
if (NULL != pptszResult[6])
352348
{
353-
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueuePublishTime, pptszResult[6]);
349+
(*pppSt_DBMessage)[i]->nMsgLen = pInt_Length[6];
350+
memcpy((*pppSt_DBMessage)[i]->tszMsgBuffer, pptszResult[6], (*pppSt_DBMessage)[i]->nMsgLen);
354351
}
355352
if (NULL != pptszResult[7])
356353
{
357-
(*pppSt_DBMessage)[i]->nMsgLen = pInt_Length[7];
358-
memcpy((*pppSt_DBMessage)[i]->tszMsgBuffer, pptszResult[7], (*pppSt_DBMessage)[i]->nMsgLen);
354+
(*pppSt_DBMessage)[i]->byMsgType = _ttxoi(pptszResult[7]);
359355
}
360356
if (NULL != pptszResult[8])
361357
{
362-
(*pppSt_DBMessage)[i]->byMsgType = _ttxoi(pptszResult[8]);
358+
(*pppSt_DBMessage)[i]->byMsgAttr = _ttxoi(pptszResult[8]);
363359
}
364360
if (NULL != pptszResult[9])
365361
{
366-
(*pppSt_DBMessage)[i]->byMsgAttr = _ttxoi(pptszResult[9]);
367-
}
368-
if (NULL != pptszResult[10])
369-
{
370-
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueCreateTime, pptszResult[10]);
362+
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueCreateTime, pptszResult[9]);
371363
}
372364
}
373365
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
@@ -452,32 +444,28 @@ bool CDBModule_MQData::DBModule_MQData_GetSerial(LPCXSTR lpszName, __int64x* pIn
452444
}
453445
if (NULL != pptszResult[4])
454446
{
455-
pSt_DBStart->nQueueGetTime = _ttxoll(pptszResult[4]);
447+
_tcsxcpy(pSt_DBStart->tszQueueLeftTime, pptszResult[4]);
456448
}
457449
if (NULL != pptszResult[5])
458450
{
459-
_tcsxcpy(pSt_DBStart->tszQueueLeftTime, pptszResult[5]);
451+
_tcsxcpy(pSt_DBStart->tszQueuePublishTime, pptszResult[5]);
460452
}
461453
if (NULL != pptszResult[6])
462454
{
463-
_tcsxcpy(pSt_DBStart->tszQueuePublishTime, pptszResult[6]);
455+
pSt_DBStart->nMsgLen = pInt_Length[6];
456+
memcpy(pSt_DBStart->tszMsgBuffer, pptszResult[6], pSt_DBStart->nMsgLen);
464457
}
465458
if (NULL != pptszResult[7])
466459
{
467-
pSt_DBStart->nMsgLen = pInt_Length[7];
468-
memcpy(pSt_DBStart->tszMsgBuffer, pptszResult[7], pSt_DBStart->nMsgLen);
460+
pSt_DBStart->byMsgType = _ttxoi(pptszResult[7]);
469461
}
470462
if (NULL != pptszResult[8])
471463
{
472-
pSt_DBStart->byMsgType = _ttxoi(pptszResult[8]);
464+
pSt_DBStart->byMsgAttr = _ttxoi(pptszResult[8]);
473465
}
474466
if (NULL != pptszResult[9])
475467
{
476-
pSt_DBStart->byMsgAttr = _ttxoi(pptszResult[9]);
477-
}
478-
if (NULL != pptszResult[10])
479-
{
480-
_tcsxcpy(pSt_DBStart->tszQueueCreateTime, pptszResult[10]);
468+
_tcsxcpy(pSt_DBStart->tszQueueCreateTime, pptszResult[9]);
481469
}
482470
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
483471
}
@@ -517,32 +505,28 @@ bool CDBModule_MQData::DBModule_MQData_GetSerial(LPCXSTR lpszName, __int64x* pIn
517505
}
518506
if (NULL != pptszResult[4])
519507
{
520-
pSt_DBEnd->nQueueGetTime = _ttxoll(pptszResult[4]);
508+
_tcsxcpy(pSt_DBEnd->tszQueueLeftTime, pptszResult[4]);
521509
}
522510
if (NULL != pptszResult[5])
523511
{
524-
_tcsxcpy(pSt_DBEnd->tszQueueLeftTime, pptszResult[5]);
512+
_tcsxcpy(pSt_DBEnd->tszQueuePublishTime, pptszResult[5]);
525513
}
526514
if (NULL != pptszResult[6])
527515
{
528-
_tcsxcpy(pSt_DBEnd->tszQueuePublishTime, pptszResult[6]);
516+
pSt_DBEnd->nMsgLen = pInt_Length[6];
517+
memcpy(pSt_DBEnd->tszMsgBuffer, pptszResult[6], pSt_DBEnd->nMsgLen);
529518
}
530519
if (NULL != pptszResult[7])
531520
{
532-
pSt_DBEnd->nMsgLen = pInt_Length[7];
533-
memcpy(pSt_DBEnd->tszMsgBuffer, pptszResult[7], pSt_DBEnd->nMsgLen);
521+
pSt_DBEnd->byMsgType = _ttxoi(pptszResult[7]);
534522
}
535523
if (NULL != pptszResult[8])
536524
{
537-
pSt_DBEnd->byMsgType = _ttxoi(pptszResult[8]);
525+
pSt_DBEnd->byMsgAttr = _ttxoi(pptszResult[8]);
538526
}
539527
if (NULL != pptszResult[9])
540528
{
541-
pSt_DBEnd->byMsgAttr = _ttxoi(pptszResult[9]);
542-
}
543-
if (NULL != pptszResult[10])
544-
{
545-
_tcsxcpy(pSt_DBEnd->tszQueueCreateTime, pptszResult[10]);
529+
_tcsxcpy(pSt_DBEnd->tszQueueCreateTime, pptszResult[9]);
546530
}
547531
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
548532
}
@@ -603,7 +587,6 @@ bool CDBModule_MQData::DBModule_MQData_CreateTable(LPCXSTR lpszQueueName)
603587
"`tszUserName` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '谁发布的消息',"
604588
"`tszQueueName` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '所属队列',"
605589
"`nQueueSerial` bigint NOT NULL COMMENT '消息序列',"
606-
"`nQueueGetTime` bigint NOT NULL COMMENT '获取次数',"
607590
"`tszQueueLeftTime` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '过期时间',"
608591
"`tszQueuePublishTime` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '发布时间',"
609592
"`tszQueueData` longblob NOT NULL COMMENT '保存数据',"

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,6 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken,
523523
st_JsonObject["tszQueuePublishTime"] = (*pppSt_DBMessage)[i]->tszQueuePublishTime;
524524
st_JsonObject["tszQueueCreateTime"] = (*pppSt_DBMessage)[i]->tszQueueCreateTime;
525525
st_JsonObject["nQueueSerial"] = (Json::Value::Int64)(*pppSt_DBMessage)[i]->nQueueSerial;
526-
st_JsonObject["nQueueGetTime"] = (Json::Value::Int64)(*pppSt_DBMessage)[i]->nQueueGetTime;
527526
st_JsonObject["nMsgLen"] = (*pppSt_DBMessage)[i]->nMsgLen;
528527
st_JsonObject["byMsgType"] = (*pppSt_DBMessage)[i]->byMsgType;
529528
st_JsonObject["tszMsgBuffer"] = (*pppSt_DBMessage)[i]->tszMsgBuffer;
@@ -722,9 +721,12 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_WSCommon(XENGINE_PROTOCOLHDR*
722721
{
723722
st_JsonMQProtocol["tszMQKey"] = pSt_MQProtocol->tszMQKey;
724723
st_JsonMQProtocol["nSerial"] = (Json::Value::Int64)pSt_MQProtocol->nSerial;
725-
st_JsonMQProtocol["nGetTimer"] = pSt_MQProtocol->nGetTimer;
726724
st_JsonMQProtocol["nKeepTime"] = pSt_MQProtocol->nKeepTime;
727725
st_JsonMQProtocol["nPubTime"] = (Json::Value::Int64)pSt_MQProtocol->nPubTime;
726+
727+
XBYTE byMSGAttr = 0;
728+
memcpy(&byMSGAttr, &pSt_MQProtocol->st_MSGAttr, sizeof(XENGINE_PROTOCOL_MSGATTR));
729+
st_JsonMQProtocol["byMSGAttr"] = byMSGAttr;
728730
st_JsonRoot["st_MQProtocol"] = st_JsonMQProtocol;
729731
}
730732

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Parse/ProtocolModule_Parse.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,6 @@ bool CProtocolModule_Parse::ProtocolModule_Parse_Websocket(LPCXSTR lpszMsgBuffer
125125
{
126126
st_MQProtocol.nSerial = st_JsonMQProtocol["nSerial"].asInt();
127127
}
128-
if (!st_JsonMQProtocol["nGetTimer"].isNull())
129-
{
130-
st_MQProtocol.nGetTimer = st_JsonMQProtocol["nGetTimer"].asInt();
131-
}
132128
if (!st_JsonMQProtocol["nKeepTime"].isNull())
133129
{
134130
st_MQProtocol.nKeepTime = st_JsonMQProtocol["nKeepTime"].asInt();
@@ -137,6 +133,11 @@ bool CProtocolModule_Parse::ProtocolModule_Parse_Websocket(LPCXSTR lpszMsgBuffer
137133
{
138134
st_MQProtocol.nPubTime = st_JsonMQProtocol["nPubTime"].asInt64();
139135
}
136+
if (!st_JsonMQProtocol["byMSGAttr"].isNull())
137+
{
138+
XBYTE byMSGAttr = st_JsonMQProtocol["byMSGAttr"].asUInt();
139+
memcpy(&st_MQProtocol.st_MSGAttr, &byMSGAttr, sizeof(XENGINE_PROTOCOL_MSGATTR));
140+
}
140141
*pInt_MsgLen += sizeof(XENGINE_PROTOCOL_XMQ);
141142
memcpy(ptszMsgBuffer + nPos, &st_MQProtocol, sizeof(XENGINE_PROTOCOL_XMQ));
142143
nPos += sizeof(XENGINE_PROTOCOL_XMQ);

XEngine_Source/XEngine_MQServiceApp/MQService_TCPTask.cpp

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,6 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
328328

329329
st_DBQueue.byMsgType = pSt_ProtocolHdr->byVersion;
330330
st_DBQueue.nQueueSerial = st_MQProtocol.nSerial;
331-
st_DBQueue.nQueueGetTime = st_MQProtocol.nGetTimer;
332331
st_DBQueue.nMsgLen = nMsgLen - sizeof(XENGINE_PROTOCOL_XMQ);
333332
_tcsxcpy(st_DBQueue.tszUserName, tszUserName);
334333
_tcsxcpy(st_DBQueue.tszQueueName, st_MQProtocol.tszMQKey);
@@ -395,29 +394,9 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
395394
BaseLib_OperatorTime_TimeToStr(st_DBQueue.tszQueuePublishTime, NULL, true, &st_LibTime);
396395
DBModule_MQUser_TimeInsert(&st_DBTime);
397396
}
398-
//插入数据库
399-
if (!DBModule_MQData_Insert(&st_DBQueue))
400-
{
401-
if (pSt_ProtocolHdr->byIsReply)
402-
{
403-
pSt_ProtocolHdr->wReserve = 702;
404-
ProtocolModule_Packet_Common(nNetType, pSt_ProtocolHdr, &st_MQProtocol, tszSDBuffer, &nSDLen);
405-
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, nNetType);
406-
}
407-
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X("%s客户端:%s,主题:%s,序列:%lld,投递数据报失败,插入数据库失败,错误:%lX"), lpszClientType, lpszClientAddr, st_MQProtocol.tszMQKey, st_MQProtocol.nSerial, DBModule_GetLastError());
408-
return false;
409-
}
410-
//返回成功没如果需要
411-
if (pSt_ProtocolHdr->byIsReply)
397+
else if (0 == st_MQProtocol.nPubTime)
412398
{
413-
pSt_ProtocolHdr->wReserve = 0;
414-
ProtocolModule_Packet_Common(nNetType, pSt_ProtocolHdr, &st_MQProtocol, tszSDBuffer, &nSDLen);
415-
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, nNetType);
416-
}
417-
//是否需要通知
418-
if (0 == st_MQProtocol.nPubTime)
419-
{
420-
//设置为0,不是定时发布
399+
//设置为0,不是定时发布,即时通知
421400
if (1 == st_MQProtocol.st_MSGAttr.byAttrAll)
422401
{
423402
int nListCount = 0;
@@ -435,7 +414,7 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
435414
memset(tszSDBuffer, '\0', sizeof(tszSDBuffer));
436415

437416
pSt_ProtocolHdr->unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_MSGNOTIFY;
438-
417+
439418
SessionModule_Client_GetType(pptszListAddr[i], &nClientType);
440419
ProtocolModule_Packet_Common(nClientType, pSt_ProtocolHdr, &st_MQProtocol, tszSDBuffer, &nSDLen, lpszMsgBuffer + sizeof(XENGINE_PROTOCOL_XMQ), nMsgLen - sizeof(XENGINE_PROTOCOL_XMQ));
441420
XEngine_MQXService_Send(pptszListAddr[i], tszSDBuffer, nSDLen, nClientType);
@@ -448,7 +427,7 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
448427
int nListCount = 0;
449428
XENGINE_DBUSERKEY** ppSt_ListUser;
450429
DBModule_MQUser_KeyList(NULL, st_MQProtocol.tszMQKey, &ppSt_ListUser, &nListCount);
451-
430+
452431
for (int i = 0; i < nListCount; i++)
453432
{
454433
//跳过自己
@@ -476,6 +455,30 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
476455
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("%s消息端:%s,主题:%s,序列:%lld,投递数据到消息队列成功,通知客户端个数:%d"), lpszClientType, lpszClientAddr, st_DBQueue.tszQueueName, st_DBQueue.nQueueSerial, nListCount);
477456
}
478457
}
458+
else
459+
{
460+
//用户获取
461+
_xstprintf(st_DBQueue.tszQueuePublishTime, _X("-1"));
462+
}
463+
//插入数据库
464+
if (!DBModule_MQData_Insert(&st_DBQueue))
465+
{
466+
if (pSt_ProtocolHdr->byIsReply)
467+
{
468+
pSt_ProtocolHdr->wReserve = 702;
469+
ProtocolModule_Packet_Common(nNetType, pSt_ProtocolHdr, &st_MQProtocol, tszSDBuffer, &nSDLen);
470+
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, nNetType);
471+
}
472+
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X("%s客户端:%s,主题:%s,序列:%lld,投递数据报失败,插入数据库失败,错误:%lX"), lpszClientType, lpszClientAddr, st_MQProtocol.tszMQKey, st_MQProtocol.nSerial, DBModule_GetLastError());
473+
return false;
474+
}
475+
//返回成功没如果需要
476+
if (pSt_ProtocolHdr->byIsReply)
477+
{
478+
pSt_ProtocolHdr->wReserve = 0;
479+
ProtocolModule_Packet_Common(nNetType, pSt_ProtocolHdr, &st_MQProtocol, tszSDBuffer, &nSDLen);
480+
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, nNetType);
481+
}
479482
}
480483
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQGET == pSt_ProtocolHdr->unOperatorCode)
481484
{
@@ -525,8 +528,8 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
525528
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X("%s消息端:%s,主题:%s,序列:%lld,获取消息数据失败,无法继续,错误:%lX"), lpszClientType, lpszClientAddr, st_MQProtocol.tszMQKey, st_MQProtocol.nSerial, DBModule_GetLastError());
526529
return false;
527530
}
528-
//跳过定时任务
529-
if (_tcsxlen(st_MessageQueue.tszQueuePublishTime) > 0)
531+
//只有-1的未指定投递任务才能获取
532+
if (-1 != _ttxoi(st_MessageQueue.tszQueuePublishTime))
530533
{
531534
st_UserKey.nKeySerial++;
532535
continue;
@@ -782,7 +785,6 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
782785

783786
st_DBQueue.byMsgType = pSt_ProtocolHdr->byVersion;
784787
st_DBQueue.nQueueSerial = st_MQProtocol.nSerial;
785-
st_DBQueue.nQueueGetTime = st_MQProtocol.nGetTimer;
786788
st_DBQueue.nMsgLen = nMsgLen - sizeof(XENGINE_PROTOCOL_XMQ);
787789

788790
_tcsxcpy(st_DBQueue.tszUserName, tszUserName);

XEngine_Source/XQueue_ProtocolHdr.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ typedef struct
7272
__int64x nSerial; //包序列号
7373
__int64x nPubTime; //发布时间,根据自己需求配置时区
7474
int nKeepTime; //保持时间,单位秒,-1 永久存在 0 一次就结束,>0 保存秒数
75-
int nGetTimer; //可以获取的次数
7675
XENGINE_PROTOCOL_MSGATTR st_MSGAttr; //消息属性
7776
}XENGINE_PROTOCOL_XMQ, * LPXENGINE_PROTOCOL_XMQ;
7877
typedef struct

0 commit comments

Comments
 (0)