Skip to content

Commit b430cf6

Browse files
committed
added:delete message support
1 parent 87c0b33 commit b430cf6

File tree

6 files changed

+84
-0
lines changed

6 files changed

+84
-0
lines changed

XEngine_Source/MQCore_DBModule/DBModule_Define.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,20 @@ extern "C" bool DBModule_MQData_Destory();
111111
*********************************************************************/
112112
extern "C" bool DBModule_MQData_Insert(XENGINE_DBMESSAGEQUEUE * pSt_DBInfo);
113113
/********************************************************************
114+
函数名称:DBModule_MQData_Delete
115+
函数功能:删除消息
116+
参数.一:pSt_DBInfo
117+
In/Out:In
118+
类型:数据结构指针
119+
可空:N
120+
意思:输入要删除的消息
121+
返回值
122+
类型:逻辑型
123+
意思:是否成功
124+
备注:
125+
*********************************************************************/
126+
extern "C" bool DBModule_MQData_Delete(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo);
127+
/********************************************************************
114128
函数名称:DBModule_MQData_Query
115129
函数功能:查询数据
116130
参数.一:pSt_DBInfo

XEngine_Source/MQCore_DBModule/DBModule_MQData/DBModule_MQData.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,46 @@ bool CDBModule_MQData::DBModule_MQData_Insert(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo
145145
return true;
146146
}
147147
/********************************************************************
148+
函数名称:DBModule_MQData_Delete
149+
函数功能:删除消息
150+
参数.一:pSt_DBInfo
151+
In/Out:In
152+
类型:数据结构指针
153+
可空:N
154+
意思:输入要删除的消息
155+
返回值
156+
类型:逻辑型
157+
意思:是否成功
158+
备注:
159+
*********************************************************************/
160+
bool CDBModule_MQData::DBModule_MQData_Delete(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo)
161+
{
162+
DBModule_IsErrorOccur = false;
163+
164+
if (NULL == pSt_DBInfo)
165+
{
166+
DBModule_IsErrorOccur = true;
167+
DBModule_dwErrorCode = ERROR_XENGINE_MQCORE_DATABASE_PARAMENT;
168+
return false;
169+
}
170+
XCHAR tszSQLStatement[10240];
171+
memset(tszSQLStatement, '\0', sizeof(tszSQLStatement));
172+
173+
_xstprintf(tszSQLStatement, _X("DELETE FROM `%s` WHERE tszQueueName = '%s' AND nQueueSerial = '%lld'"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial);
174+
175+
if (!DataBase_MySQL_Execute(xhDBSQL, tszSQLStatement))
176+
{
177+
DBModule_IsErrorOccur = true;
178+
DBModule_dwErrorCode = DataBase_GetLastError();
179+
return false;
180+
}
181+
if (m_bMemoryQuery)
182+
{
183+
MemoryCache_DBData_DataDelete(pSt_DBInfo);
184+
}
185+
return true;
186+
}
187+
/********************************************************************
148188
函数名称:DBModule_MQData_Query
149189
函数功能:查询数据
150190
参数.一:pSt_DBInfo

XEngine_Source/MQCore_DBModule/DBModule_MQData/DBModule_MQData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class CDBModule_MQData
2020
bool DBModule_MQData_Init(DATABASE_MYSQL_CONNECTINFO* pSt_DBConnector, bool bMemoryQuery = true, bool bMemoryInsert = true);
2121
bool DBModule_MQData_Destory();
2222
bool DBModule_MQData_Insert(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo);
23+
bool DBModule_MQData_Delete(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo);
2324
bool DBModule_MQData_Query(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo);
2425
bool DBModule_MQData_Modify(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo);
2526
bool DBModule_MQData_GetSerial(LPCXSTR lpszName, __int64x* pInt_DBCount, XENGINE_DBMESSAGEQUEUE* pSt_DBStart, XENGINE_DBMESSAGEQUEUE* pSt_DBEnd);

XEngine_Source/MQCore_DBModule/MQCore_DBModule.def

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ EXPORTS
66
DBModule_MQData_Init
77
DBModule_MQData_Destory
88
DBModule_MQData_Insert
9+
DBModule_MQData_Delete
910
DBModule_MQData_Query
1011
DBModule_MQData_Modify
1112
DBModule_MQData_GetSerial

XEngine_Source/MQCore_DBModule/pch.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ extern "C" bool DBModule_MQData_Insert(XENGINE_DBMESSAGEQUEUE * pSt_DBManage)
4343
{
4444
return m_DBData.DBModule_MQData_Insert(pSt_DBManage);
4545
}
46+
extern "C" bool DBModule_MQData_Delete(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo)
47+
{
48+
return m_DBData.DBModule_MQData_Delete(pSt_DBInfo);
49+
}
4650
extern "C" bool DBModule_MQData_Query(XENGINE_DBMESSAGEQUEUE * pSt_DBInfo)
4751
{
4852
return m_DBData.DBModule_MQData_Query(pSt_DBInfo);

XEngine_Source/XEngine_MQServiceApp/MQService_TCPTask.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,30 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
654654
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, nNetType);
655655
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("%s消息端:%s,主题:%s,序列:%lld,获取消息数据成功,消息大小:%d"), lpszClientType, lpszClientAddr, st_MQProtocol.tszMQKey, st_MessageQueue.nQueueSerial, st_MessageQueue.nMsgLen);
656656
}
657+
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQDELETE == pSt_ProtocolHdr->unOperatorCode)
658+
{
659+
XENGINE_DBMESSAGEQUEUE st_MessageQueue;
660+
memset(&st_MessageQueue, '\0', sizeof(XENGINE_DBMESSAGEQUEUE));
661+
662+
pSt_ProtocolHdr->unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPDELETE;
663+
if (st_MQProtocol.nSerial <= 0)
664+
{
665+
pSt_ProtocolHdr->wReserve = 722;
666+
ProtocolModule_Packet_Common(nNetType, pSt_ProtocolHdr, &st_MQProtocol, tszSDBuffer, &nSDLen);
667+
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, nNetType);
668+
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X("%s消息端:%s,主题:%s,获取消息数据失败,获取指定消息序列:%lld 失败,错误:%lX"), lpszClientType, lpszClientAddr, st_MQProtocol.tszMQKey, st_MQProtocol.nSerial, DBModule_GetLastError());
669+
return false;
670+
}
671+
st_MessageQueue.nQueueSerial = st_MQProtocol.nSerial;
672+
_tcsxcpy(st_MessageQueue.tszQueueName, st_MQProtocol.tszMQKey);
673+
DBModule_MQData_Delete(&st_MessageQueue);
674+
675+
pSt_ProtocolHdr->wReserve = 0;
676+
pSt_ProtocolHdr->byVersion = st_MessageQueue.byMsgType;
677+
ProtocolModule_Packet_Common(nNetType, pSt_ProtocolHdr, &st_MQProtocol, tszSDBuffer, &nSDLen);
678+
XEngine_MQXService_Send(lpszClientAddr, tszSDBuffer, nSDLen, nNetType);
679+
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("%s消息端:%s,主题:%s,序列:%lld,删除消息数据成功"), lpszClientType, lpszClientAddr, st_MQProtocol.tszMQKey, st_MessageQueue.nQueueSerial);
680+
}
657681
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICCREATE == pSt_ProtocolHdr->unOperatorCode)
658682
{
659683
pSt_ProtocolHdr->unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPTOPICCREATE;

0 commit comments

Comments
 (0)