Skip to content

Commit 864b8aa

Browse files
committed
added:whether break self for time release
1 parent 3f4a521 commit 864b8aa

File tree

5 files changed

+30
-23
lines changed

5 files changed

+30
-23
lines changed

XEngine_Source/MQCore_DBModule/DBModule_Define.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,6 @@
1111
// History:
1212
*********************************************************************/
1313
//////////////////////////////////////////////////////////////////////////
14-
// 导出的回调
15-
//////////////////////////////////////////////////////////////////////////
16-
typedef void(CALLBACK* CALLBACK_MESSAGEQUEUE_MODULE_DATABASE_TIMEPUBLISH)(LPCXSTR lpszQueueName, __int64x nIDMsg, __int64x nIDTime, XPVOID lParam);
17-
//////////////////////////////////////////////////////////////////////////
1814
// 导出的数据结构
1915
//////////////////////////////////////////////////////////////////////////
2016
//消息队列
@@ -38,6 +34,7 @@ typedef struct
3834
__int64x nIDMsg; //消息ID
3935
__int64x nIDTime; //发布时间
4036
bool bActive; //是否激活过
37+
bool bBreak; //跳过自身
4138
}XENGINE_DBTIMERELEASE;
4239
//用户消息
4340
typedef struct
@@ -56,6 +53,10 @@ typedef struct
5653
XCHAR tszCreateTime[64]; //创建时间
5754
}XENGINE_DBTOPICOWNER;
5855
//////////////////////////////////////////////////////////////////////////
56+
// 导出的回调
57+
//////////////////////////////////////////////////////////////////////////
58+
typedef void(CALLBACK* CALLBACK_MESSAGEQUEUE_MODULE_DATABASE_TIMEPUBLISH)(XENGINE_DBTIMERELEASE* pSt_DBInfo, XPVOID lParam);
59+
//////////////////////////////////////////////////////////////////////////
5960
// 导出的函数
6061
//////////////////////////////////////////////////////////////////////////
6162
extern "C" XLONG DBModule_GetLastError(int *pInt_SysError = NULL);

XEngine_Source/MQCore_DBModule/DBModule_MQUser/DBModule_MQUser.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ bool CDBModule_MQUser::DBModule_MQUser_TimeInsert(XENGINE_DBTIMERELEASE* pSt_DBI
642642
XCHAR tszSQLStatement[10240];
643643
memset(tszSQLStatement, '\0', sizeof(tszSQLStatement));
644644

645-
_xstprintf(tszSQLStatement, _X("INSERT INTO `UserTime` (tszQueueName,nIDMsg,nIDTime,bActive,tszCreateTime) VALUES('%s',%lld,%lld,0,now())"), pSt_DBInfo->tszQueueName, pSt_DBInfo->nIDMsg, pSt_DBInfo->nIDTime);
645+
_xstprintf(tszSQLStatement, _X("INSERT INTO `UserTime` (tszQueueName,nIDMsg,nIDTime,bActive,bBreak,tszCreateTime) VALUES('%s',%lld,%lld,0,%d,now())"), pSt_DBInfo->tszQueueName, pSt_DBInfo->nIDMsg, pSt_DBInfo->nIDTime, pSt_DBInfo->bBreak);
646646
if (!DataBase_MySQL_Execute(xhDBSQL, tszSQLStatement))
647647
{
648648
DBModule_IsErrorOccur = true;
@@ -715,7 +715,11 @@ bool CDBModule_MQUser::DBModule_MQUser_TimeQuery(XENGINE_DBTIMERELEASE*** pppSt_
715715
}
716716
if (NULL != pptszResult[4])
717717
{
718-
_tcsxcpy((*pppSt_DBInfo)[i]->tszCreateTime, pptszResult[4]);
718+
(*pppSt_DBInfo)[i]->bBreak = _ttoi(pptszResult[4]);
719+
}
720+
if (NULL != pptszResult[5])
721+
{
722+
_tcsxcpy((*pppSt_DBInfo)[i]->tszCreateTime, pptszResult[5]);
719723
}
720724
}
721725
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
@@ -1050,7 +1054,7 @@ XHTHREAD CALLBACK CDBModule_MQUser::DBModule_MQUser_TimeThread(XPVOID lParam)
10501054
pClass_This->DBModule_MQUser_TimeQuery(&ppSt_DBInfo, &nListCount);
10511055
for (int i = 0; i < nListCount; i++)
10521056
{
1053-
pClass_This->lpCall_TimePublish(ppSt_DBInfo[i]->tszQueueName, ppSt_DBInfo[i]->nIDMsg, ppSt_DBInfo[i]->nIDTime, pClass_This->m_lParam);
1057+
pClass_This->lpCall_TimePublish(ppSt_DBInfo[i], pClass_This->m_lParam);
10541058
}
10551059
BaseLib_OperatorMemory_Free((XPPPMEM)&ppSt_DBInfo, nListCount);
10561060
std::this_thread::sleep_for(std::chrono::seconds(1));

XEngine_Source/XEngine_MQServiceApp/MQService_TCPTask.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
373373

374374
st_DBTime.nIDMsg = st_DBQueue.nQueueSerial;
375375
st_DBTime.nIDTime = st_MQProtocol.nPubTime;
376+
st_DBTime.bBreak = pSt_ProtocolHdr->wReserve;
376377
_tcsxcpy(st_DBTime.tszQueueName, st_DBQueue.tszQueueName);
377378

378379
BaseLib_OperatorTime_TTimeToStuTime(st_MQProtocol.nPubTime, &st_LibTime);
Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
#include "MQService_Hdr.h"
22

3-
void CALLBACK MessageQueue_CBTask_TimePublish(LPCXSTR lpszQueueName, __int64x nIDMsg, __int64x nIDTime, XPVOID lParam)
3+
void CALLBACK MessageQueue_CBTask_TimePublish(XENGINE_DBTIMERELEASE* pSt_DBInfo, XPVOID lParam)
44
{
55
int nMsgLen = 0;
66
XCHAR tszMsgBuffer[4096];
77
XENGINE_PROTOCOLHDR st_ProtocolHdr;
88
XENGINE_PROTOCOL_XMQ st_MQProtocol;
99
XENGINE_DBMESSAGEQUEUE st_DBInfo;
10-
XENGINE_DBTIMERELEASE st_DBTime;
1110

1211
memset(tszMsgBuffer, '\0', sizeof(tszMsgBuffer));
1312
memset(&st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
1413
memset(&st_MQProtocol, '\0', sizeof(XENGINE_PROTOCOL_XMQ));
1514
memset(&st_DBInfo, '\0', sizeof(XENGINE_DBMESSAGEQUEUE));
16-
memset(&st_DBTime, '\0', sizeof(XENGINE_DBTIMERELEASE));
1715

1816
st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
1917
st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
@@ -23,13 +21,12 @@ void CALLBACK MessageQueue_CBTask_TimePublish(LPCXSTR lpszQueueName, __int64x nI
2321
st_ProtocolHdr.unPacketSize = sizeof(XENGINE_PROTOCOL_XMQ);
2422
st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
2523

26-
st_MQProtocol.nPubTime = nIDTime;
27-
st_MQProtocol.nSerial = nIDMsg;
28-
_tcsxcpy(st_MQProtocol.tszMQKey, lpszQueueName);
24+
st_MQProtocol.nPubTime = pSt_DBInfo->nIDTime;
25+
st_MQProtocol.nSerial = pSt_DBInfo->nIDMsg;
26+
_tcsxcpy(st_MQProtocol.tszMQKey, pSt_DBInfo->tszQueueName);
2927

30-
st_DBTime.nIDMsg = nIDMsg;
31-
st_DBInfo.nQueueSerial = nIDMsg;
32-
_tcsxcpy(st_DBInfo.tszQueueName, lpszQueueName);
28+
st_DBInfo.nQueueSerial = pSt_DBInfo->nIDMsg;
29+
_tcsxcpy(st_DBInfo.tszQueueName, pSt_DBInfo->tszQueueName);
3330
DBModule_MQData_Query(&st_DBInfo);
3431
//是否需要通知
3532
int nListCount = 0;
@@ -40,10 +37,14 @@ void CALLBACK MessageQueue_CBTask_TimePublish(LPCXSTR lpszQueueName, __int64x nI
4037
int nNetType = 0;
4138
XCHAR tszUserAddr[128];
4239
memset(tszUserAddr, '\0', sizeof(tszUserAddr));
43-
//跳过自己
44-
if (0 == _tcsxncmp(st_DBInfo.tszUserName, ppSt_ListUser[i]->tszUserName, _tcsxlen(st_DBInfo.tszUserName)))
40+
41+
if (pSt_DBInfo->bBreak)
4542
{
46-
continue;
43+
//跳过自己
44+
if (0 == _tcsxncmp(st_DBInfo.tszUserName, ppSt_ListUser[i]->tszUserName, _tcsxlen(st_DBInfo.tszUserName)))
45+
{
46+
continue;
47+
}
4748
}
4849
SessionModule_Client_GetAddr(ppSt_ListUser[i]->tszUserName, tszUserAddr);
4950
SessionModule_Client_GetType(tszUserAddr, &nNetType);
@@ -55,12 +56,12 @@ void CALLBACK MessageQueue_CBTask_TimePublish(LPCXSTR lpszQueueName, __int64x nI
5556
if (st_DBConfig.st_MQUser.st_UserTime.bPubClear)
5657
{
5758
//移除这条消息
58-
DBModule_MQUser_TimeDelete(&st_DBTime);
59+
DBModule_MQUser_TimeDelete(pSt_DBInfo);
5960
}
6061
else
6162
{
6263
//更新
63-
DBModule_MQUser_TimeUPDate(&st_DBTime);
64+
DBModule_MQUser_TimeUPDate(pSt_DBInfo);
6465
}
65-
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("定时任务,消息主题:%s,序列:%lld,定时任务分发成功,客户端个数:%d"), lpszQueueName, nIDMsg, nListCount);
66+
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("定时任务,消息主题:%s,序列:%lld,定时任务分发成功,客户端个数:%d"), pSt_DBInfo->tszQueueName, pSt_DBInfo->nIDMsg, nListCount);
6667
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
#pragma once
22

3-
void CALLBACK MessageQueue_CBTask_TimePublish(LPCXSTR lpszQueueName, __int64x nIDMsg, __int64x nIDTime, XPVOID lParam);
3+
void CALLBACK MessageQueue_CBTask_TimePublish(XENGINE_DBTIMERELEASE* pSt_DBInfo, XPVOID lParam);

0 commit comments

Comments
 (0)