Skip to content

Commit d4058ef

Browse files
committed
modify:unread message protocol is modify to left count
1 parent 4e6d2e1 commit d4058ef

File tree

10 files changed

+110
-66
lines changed

10 files changed

+110
-66
lines changed

XEngine_Source/MQCore_DBModule/DBModule_Define.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,30 @@ extern "C" bool DBModule_MQData_ModifyTable(LPCXSTR lpszSrcTable, LPCXSTR lpszDs
252252
备注:
253253
*********************************************************************/
254254
extern "C" bool DBModule_MQData_ShowTable(XCHAR*** pppszTableName, int* pInt_ListCount);
255+
/********************************************************************
256+
函数名称:DBModule_MQData_GetLeftCount
257+
函数功能:获取剩余个数
258+
参数.一:lpszTableName
259+
In/Out:In
260+
类型:常量字符指针
261+
可空:N
262+
意思:输入表名称
263+
参数.二:nSerial
264+
In/Out:In
265+
类型:整数型
266+
可空:N
267+
意思:输入开始的序列号
268+
参数.三:pInt_Count
269+
In/Out:Out
270+
类型:整数型指针
271+
可空:N
272+
意思:输出统计信息
273+
返回值
274+
类型:逻辑型
275+
意思:是否成功
276+
备注:
277+
*********************************************************************/
278+
extern "C" bool DBModule_MQData_GetLeftCount(LPCXSTR lpszTableName, int nSerial, int* pInt_Count);
255279
/*************************************************************************
256280
消息用户导出函数
257281
**************************************************************************/

XEngine_Source/MQCore_DBModule/DBModule_MQData/DBModule_MQData.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -800,4 +800,67 @@ bool CDBModule_MQData::DBModule_MQData_ShowTable(XCHAR*** pppszTableName, int* p
800800
}
801801
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
802802
return true;
803+
}
804+
/********************************************************************
805+
函数名称:DBModule_MQData_GetLeftCount
806+
函数功能:获取剩余个数
807+
参数.一:lpszTableName
808+
In/Out:In
809+
类型:常量字符指针
810+
可空:N
811+
意思:输入表名称
812+
参数.二:nSerial
813+
In/Out:In
814+
类型:整数型
815+
可空:N
816+
意思:输入开始的序列号
817+
参数.三:pInt_Count
818+
In/Out:Out
819+
类型:整数型指针
820+
可空:N
821+
意思:输出统计信息
822+
返回值
823+
类型:逻辑型
824+
意思:是否成功
825+
备注:
826+
*********************************************************************/
827+
bool CDBModule_MQData::DBModule_MQData_GetLeftCount(LPCXSTR lpszTableName, int nSerial, int* pInt_Count)
828+
{
829+
DBModule_IsErrorOccur = false;
830+
831+
if (NULL == pInt_Count)
832+
{
833+
DBModule_IsErrorOccur = true;
834+
DBModule_dwErrorCode = ERROR_XENGINE_MQCORE_DATABASE_PARAMENT;
835+
return false;
836+
}
837+
//查询
838+
XNETHANDLE xhTable = 0;
839+
__int64u nllLine = 0;
840+
__int64u nllRow = 0;
841+
842+
XCHAR tszSQLStatement[1024];
843+
memset(tszSQLStatement, '\0', sizeof(tszSQLStatement));
844+
845+
_xstprintf(tszSQLStatement, _X("SELECT COUNT(*) FROM %s WHERE serial > %d"), lpszTableName, nSerial);
846+
if (!DataBase_MySQL_ExecuteQuery(xhDBSQL, &xhTable, tszSQLStatement, &nllLine, &nllRow))
847+
{
848+
DBModule_IsErrorOccur = true;
849+
DBModule_dwErrorCode = DataBase_GetLastError();
850+
return false;
851+
}
852+
if (nllLine <= 0)
853+
{
854+
DBModule_IsErrorOccur = true;
855+
DBModule_dwErrorCode = ERROR_XENGINE_MQCORE_DATABASE_EMPTY;
856+
return false;
857+
}
858+
XCHAR** pptszResult = DataBase_MySQL_GetResult(xhDBSQL, xhTable);
859+
860+
if (NULL != pptszResult[0])
861+
{
862+
*pInt_Count = _ttxoi(pptszResult[0]);
863+
}
864+
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
865+
return true;
803866
}

XEngine_Source/MQCore_DBModule/DBModule_MQData/DBModule_MQData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class CDBModule_MQData
2828
bool DBModule_MQData_DeleteTable(LPCXSTR lpszQueueName);
2929
bool DBModule_MQData_ModifyTable(LPCXSTR lpszSrcTable, LPCXSTR lpszDstTable);
3030
bool DBModule_MQData_ShowTable(XCHAR*** pppszTableName, int* pInt_ListCount);
31+
bool DBModule_MQData_GetLeftCount(LPCXSTR lpszTableName, int nSerial, int* pInt_Count);
3132
private:
3233
XNETHANDLE xhDBSQL;
3334
};

XEngine_Source/MQCore_DBModule/MQCore_DBModule.def

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ EXPORTS
1414
DBModule_MQData_DeleteTable
1515
DBModule_MQData_ModifyTable
1616
DBModule_MQData_ShowTable
17+
DBModule_MQData_GetLeftCount
1718

1819
DBModule_MQUser_Init
1920
DBModule_MQUser_Destory

XEngine_Source/MQCore_DBModule/pch.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ extern "C" bool DBModule_MQData_ShowTable(XCHAR * **pppszTableName, int* pInt_Li
7575
{
7676
return m_DBData.DBModule_MQData_ShowTable(pppszTableName, pInt_ListCount);
7777
}
78+
extern "C" bool DBModule_MQData_GetLeftCount(LPCXSTR lpszTableName, int nSerial, int* pInt_Count)
79+
{
80+
return m_DBData.DBModule_MQData_GetLeftCount(lpszTableName, nSerial, pInt_Count);
81+
}
7882
/*************************************************************************
7983
消息用户导出函数
8084
**************************************************************************/

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.cpp

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -532,33 +532,28 @@ XHANDLE CProtocolModule_Packet::ProtocolModule_Packet_UNReadCreate(XENGINE_PROTO
532532
}
533533
/********************************************************************
534534
函数名称:ProtocolModule_Packet_UNReadInsert
535-
函数功能:维度消息打包数据插入
535+
函数功能:消息打包数据插入
536536
参数.一:xhToken
537537
In/Out:In
538538
类型:句柄
539539
可空:N
540540
意思:输入要操作的句柄
541-
参数.二:pppSt_DBMessage
541+
参数.二:lpszKeyName
542542
In/Out:In
543543
类型:三级指针
544544
可空:N
545-
意思:输入要打包的数据
545+
意思:输入队列名称
546546
参数.三:nListCount
547547
In/Out:In
548548
类型:整数型
549549
可空:N
550-
意思:输入要打包的数据个数
551-
参数.四:lpszUserName
552-
In/Out:In
553-
类型:常量字符指针
554-
可空:N
555-
意思:输入要过滤的用户
550+
意思:输入队列个数
556551
返回值
557552
类型:逻辑型
558553
意思:是否成功
559554
备注:
560555
*********************************************************************/
561-
bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount, LPCXSTR lpszUserName)
556+
bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, LPCXSTR lpszKeyName, int nListCount)
562557
{
563558
Protocol_IsErrorOccur = false;
564559

@@ -569,35 +564,10 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken,
569564
Protocol_dwErrorCode = ERROR_MQ_MODULE_PROTOCOL_NOTFOUND;
570565
return false;
571566
}
572-
Json::Value st_JsonSub;
573567
Json::Value st_JsonSubArray;
574-
for (int i = 0; i < nListCount; i++)
575-
{
576-
XENGINE_PROTOCOL_MSGATTR st_MSGAttr;
577-
memcpy(&st_MSGAttr, &(*pppSt_DBMessage)[i]->nMsgAttr, sizeof(XENGINE_PROTOCOL_MSGATTR));
578-
579-
if ((0 == st_MSGAttr.byAttrSelf) && (0 == _tcsxnicmp(lpszUserName, (*pppSt_DBMessage)[i]->tszUserName, _tcsxlen((*pppSt_DBMessage)[i]->tszUserName))))
580-
{
581-
continue;
582-
}
583-
Json::Value st_JsonObject;
584-
st_JsonObject["tszQueueName"] = (*pppSt_DBMessage)[i]->tszQueueName;
585-
st_JsonObject["tszUserBelong"] = (*pppSt_DBMessage)[i]->tszUserBelong;
586-
st_JsonObject["tszUserName"] = (*pppSt_DBMessage)[i]->tszUserName;
587-
st_JsonObject["tszQueueLeftTime"] = (*pppSt_DBMessage)[i]->tszQueueLeftTime;
588-
st_JsonObject["tszQueuePublishTime"] = (*pppSt_DBMessage)[i]->tszQueuePublishTime;
589-
st_JsonObject["tszQueueCreateTime"] = (*pppSt_DBMessage)[i]->tszQueueCreateTime;
590-
st_JsonObject["nQueueSerial"] = (Json::Value::Int64)(*pppSt_DBMessage)[i]->nQueueSerial;
591-
st_JsonObject["nMsgLen"] = (*pppSt_DBMessage)[i]->nMsgLen;
592-
st_JsonObject["nMsgAttr"] = (*pppSt_DBMessage)[i]->nMsgAttr;
593-
st_JsonObject["byMsgType"] = (*pppSt_DBMessage)[i]->byMsgType;
594-
st_JsonObject["tszMsgBuffer"] = (*pppSt_DBMessage)[i]->tszMsgBuffer;
595-
st_JsonSub.append(st_JsonObject);
596-
}
597568

598-
st_JsonSubArray["Array"] = st_JsonSub;
599-
st_JsonSubArray["Name"] = (*pppSt_DBMessage)[0]->tszQueueName;
600-
st_JsonSubArray["Count"] = st_JsonSub.size();
569+
st_JsonSubArray["Name"] = lpszKeyName;
570+
st_JsonSubArray["Count"] = nListCount;
601571
pSt_UNRead->st_JsonArray.append(st_JsonSubArray);
602572
return true;
603573
}

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class CProtocolModule_Packet
3535
bool ProtocolModule_Packet_OnlineList(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, XCHAR*** ppptszListUser, int nListCount);
3636
public:
3737
XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE enPayType);
38-
bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount, LPCXSTR lpszUserName);
38+
bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, LPCXSTR lpszKeyName, int nListCount);
3939
bool ProtocolModule_Packet_UNReadDelete(XHANDLE xhToken, XCHAR* ptszMsgBuffer, int* pInt_MsgLen);
4040
protected:
4141
bool ProtocolModule_Packet_TCPCommon(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, XENGINE_PROTOCOL_XMQ* pSt_MQProtocol, XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszMsgBuffer = NULL, int nMsgLen = 0);

XEngine_Source/MQCore_ProtocolModule/Protocol_Define.h

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -290,33 +290,28 @@ extern "C" bool ProtocolModule_Packet_OnlineList(XCHAR* ptszMsgBuffer, int* pInt
290290
extern "C" XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE enPayType);
291291
/********************************************************************
292292
函数名称:ProtocolModule_Packet_UNReadInsert
293-
函数功能:维度消息打包数据插入
293+
函数功能:消息打包数据插入
294294
参数.一:xhToken
295295
In/Out:In
296296
类型:句柄
297297
可空:N
298298
意思:输入要操作的句柄
299-
参数.二:pppSt_DBMessage
299+
参数.二:lpszKeyName
300300
In/Out:In
301301
类型:三级指针
302302
可空:N
303-
意思:输入要打包的数据
303+
意思:输入队列名称
304304
参数.三:nListCount
305305
In/Out:In
306306
类型:整数型
307307
可空:N
308-
意思:输入要打包的数据个数
309-
参数.四:lpszUserName
310-
In/Out:In
311-
类型:常量字符指针
312-
可空:N
313-
意思:输入要过滤的用户
308+
意思:输入队列个数
314309
返回值
315310
类型:逻辑型
316311
意思:是否成功
317312
备注:
318313
*********************************************************************/
319-
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount, LPCXSTR lpszUserName);
314+
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, LPCXSTR lpszKeyName, int nListCount);
320315
/********************************************************************
321316
函数名称:ProtocolModule_Packet_UNReadDelete
322317
函数功能:删除数据并且导出

XEngine_Source/MQCore_ProtocolModule/pch.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ extern "C" XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR * pSt_
6767
{
6868
return m_ProtocolPacket.ProtocolModule_Packet_UNReadCreate(pSt_ProtocolHdr, enPayType);
6969
}
70-
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE * **pppSt_DBMessage, int nListCount, LPCXSTR lpszUserName)
70+
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, LPCXSTR lpszKeyName, int nListCount)
7171
{
72-
return m_ProtocolPacket.ProtocolModule_Packet_UNReadInsert(xhToken, pppSt_DBMessage, nListCount, lpszUserName);
72+
return m_ProtocolPacket.ProtocolModule_Packet_UNReadInsert(xhToken, lpszKeyName, nListCount);
7373
}
7474
extern "C" bool ProtocolModule_Packet_UNReadDelete(XHANDLE xhToken, XCHAR * ptszMsgBuffer, int* pInt_MsgLen)
7575
{

XEngine_Source/XEngine_MQServiceApp/MQService_TCPTask.cpp

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -901,22 +901,8 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
901901
for (int i = 0; i < nListCount; i++)
902902
{
903903
int nDBCount = 0;
904-
XENGINE_DBUSERKEY st_UserKey;
905-
XENGINE_DBMESSAGEQUEUE** ppSt_DBMessage;
906-
907-
memset(&st_UserKey, '\0', sizeof(XENGINE_DBUSERKEY));
908-
909-
DBModule_MQData_List(ppSt_UserKey[i]->tszKeyName, ppSt_UserKey[i]->nKeySerial, &ppSt_DBMessage, &nDBCount);
910-
if (nDBCount > 0)
911-
{
912-
//更新用户KEY
913-
st_UserKey.nKeySerial = ppSt_UserKey[i]->nKeySerial + nDBCount;
914-
_tcsxcpy(st_UserKey.tszUserName, tszUserName);
915-
_tcsxcpy(st_UserKey.tszKeyName, ppSt_UserKey[i]->tszKeyName);
916-
DBModule_MQUser_KeyUPDate(&st_UserKey);
917-
ProtocolModule_Packet_UNReadInsert(xhUNRead, &ppSt_DBMessage, nDBCount, tszUserName);
918-
BaseLib_OperatorMemory_Free((XPPPMEM)&ppSt_DBMessage, nDBCount);
919-
}
904+
DBModule_MQData_GetLeftCount(ppSt_UserKey[i]->tszKeyName, ppSt_UserKey[i]->nKeySerial, &nDBCount);
905+
ProtocolModule_Packet_UNReadInsert(xhUNRead, ppSt_UserKey[i]->tszKeyName, nDBCount);
920906
}
921907
ProtocolModule_Packet_UNReadDelete(xhUNRead, tszSDBuffer, &nSDLen);
922908
BaseLib_OperatorMemory_Free((XPPPMEM)&ppSt_UserKey, nListCount);

0 commit comments

Comments
 (0)