Skip to content

Commit 6244f4f

Browse files
committed
fixed:unread message have no user belong
added:send to self message for attr supported
1 parent adcac75 commit 6244f4f

File tree

5 files changed

+36
-8
lines changed

5 files changed

+36
-8
lines changed

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,12 +497,17 @@ XHANDLE CProtocolModule_Packet::ProtocolModule_Packet_UNReadCreate(XENGINE_PROTO
497497
类型:整数型
498498
可空:N
499499
意思:输入要打包的数据个数
500+
参数.四:lpszUserName
501+
In/Out:In
502+
类型:常量字符指针
503+
可空:N
504+
意思:输入要过滤的用户
500505
返回值
501506
类型:逻辑型
502507
意思:是否成功
503508
备注:
504509
*********************************************************************/
505-
bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount)
510+
bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount, LPCXSTR lpszUserName)
506511
{
507512
Protocol_IsErrorOccur = false;
508513

@@ -517,14 +522,23 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken,
517522
Json::Value st_JsonSubArray;
518523
for (int i = 0; i < nListCount; i++)
519524
{
525+
XENGINE_PROTOCOL_MSGATTR st_MSGAttr;
526+
memcpy(&st_MSGAttr, &(*pppSt_DBMessage)[i]->byMsgAttr, sizeof(XENGINE_PROTOCOL_MSGATTR));
527+
528+
if ((0 == st_MSGAttr.byAttrSelf) && (0 == _tcsxnicmp(lpszUserName, (*pppSt_DBMessage)[i]->tszUserName, _tcsxlen((*pppSt_DBMessage)[i]->tszUserName))))
529+
{
530+
continue;
531+
}
520532
Json::Value st_JsonObject;
521533
st_JsonObject["tszQueueName"] = (*pppSt_DBMessage)[i]->tszQueueName;
534+
st_JsonObject["tszUserName"] = (*pppSt_DBMessage)[i]->tszUserName;
522535
st_JsonObject["tszQueueLeftTime"] = (*pppSt_DBMessage)[i]->tszQueueLeftTime;
523536
st_JsonObject["tszQueuePublishTime"] = (*pppSt_DBMessage)[i]->tszQueuePublishTime;
524537
st_JsonObject["tszQueueCreateTime"] = (*pppSt_DBMessage)[i]->tszQueueCreateTime;
525538
st_JsonObject["nQueueSerial"] = (Json::Value::Int64)(*pppSt_DBMessage)[i]->nQueueSerial;
526539
st_JsonObject["nMsgLen"] = (*pppSt_DBMessage)[i]->nMsgLen;
527540
st_JsonObject["byMsgType"] = (*pppSt_DBMessage)[i]->byMsgType;
541+
st_JsonObject["byMsgAttr"] = (*pppSt_DBMessage)[i]->byMsgAttr;
528542
st_JsonObject["tszMsgBuffer"] = (*pppSt_DBMessage)[i]->tszMsgBuffer;
529543
st_JsonSub.append(st_JsonObject);
530544
}

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class CProtocolModule_Packet
3434
bool ProtocolModule_Packet_TopicList(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, XCHAR*** pppszTableName, int nListCount);
3535
public:
3636
XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE enPayType);
37-
bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount);
37+
bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount, LPCXSTR lpszUserName);
3838
bool ProtocolModule_Packet_UNReadDelete(XHANDLE xhToken, XCHAR* ptszMsgBuffer, int* pInt_MsgLen);
3939
protected:
4040
bool ProtocolModule_Packet_TCPCommon(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, XENGINE_PROTOCOL_XMQ* pSt_MQProtocol, XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszMsgBuffer = NULL, int nMsgLen = 0);

XEngine_Source/MQCore_ProtocolModule/Protocol_Define.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,17 @@ extern "C" XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR* pSt_P
277277
类型:整数型
278278
可空:N
279279
意思:输入要打包的数据个数
280+
参数.四:lpszUserName
281+
In/Out:In
282+
类型:常量字符指针
283+
可空:N
284+
意思:输入要过滤的用户
280285
返回值
281286
类型:逻辑型
282287
意思:是否成功
283288
备注:
284289
*********************************************************************/
285-
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount);
290+
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE*** pppSt_DBMessage, int nListCount, LPCXSTR lpszUserName);
286291
/********************************************************************
287292
函数名称:ProtocolModule_Packet_UNReadDelete
288293
函数功能:删除数据并且导出

XEngine_Source/MQCore_ProtocolModule/pch.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ extern "C" XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR * pSt_
6363
{
6464
return m_ProtocolPacket.ProtocolModule_Packet_UNReadCreate(pSt_ProtocolHdr, enPayType);
6565
}
66-
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE * **pppSt_DBMessage, int nListCount)
66+
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, XENGINE_DBMESSAGEQUEUE * **pppSt_DBMessage, int nListCount, LPCXSTR lpszUserName)
6767
{
68-
return m_ProtocolPacket.ProtocolModule_Packet_UNReadInsert(xhToken, pppSt_DBMessage, nListCount);
68+
return m_ProtocolPacket.ProtocolModule_Packet_UNReadInsert(xhToken, pppSt_DBMessage, nListCount, lpszUserName);
6969
}
7070
extern "C" bool ProtocolModule_Packet_UNReadDelete(XHANDLE xhToken, XCHAR * ptszMsgBuffer, int* pInt_MsgLen)
7171
{

XEngine_Source/XEngine_MQServiceApp/MQService_TCPTask.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
405405
for (int i = 0; i < nListCount; i++)
406406
{
407407
//跳过自己
408-
if (0 == _tcsxncmp(lpszClientAddr, pptszListAddr[i], _tcsxlen(lpszClientAddr)))
408+
if (0 == _tcsxncmp(lpszClientAddr, pptszListAddr[i], _tcsxlen(lpszClientAddr)) && (0 == st_MQProtocol.st_MSGAttr.byAttrSelf))
409409
{
410410
continue;
411411
}
@@ -431,7 +431,7 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
431431
for (int i = 0; i < nListCount; i++)
432432
{
433433
//跳过自己
434-
if (0 == _tcsxncmp(tszUserName, ppSt_ListUser[i]->tszUserName, _tcsxlen(tszUserName)))
434+
if (0 == _tcsxncmp(tszUserName, ppSt_ListUser[i]->tszUserName, _tcsxlen(tszUserName)) && (0 == st_MQProtocol.st_MSGAttr.byAttrSelf))
435435
{
436436
continue;
437437
}
@@ -550,6 +550,15 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
550550
continue;
551551
}
552552
}
553+
//是不是自己发布的
554+
XENGINE_PROTOCOL_MSGATTR st_MSGAttr;
555+
memcpy(&st_MSGAttr, &st_MessageQueue.byMsgAttr, sizeof(XENGINE_PROTOCOL_MSGATTR));
556+
//如果不能发送自己并且是自己的消息,那么就跳过
557+
if (0 == st_MSGAttr.byAttrSelf && (0 == _tcsxnicmp(st_MessageQueue.tszUserName, tszUserName, _tcsxlen(tszUserName))))
558+
{
559+
st_UserKey.nKeySerial++;
560+
continue;
561+
}
553562
break;
554563
}
555564
st_MQProtocol.nSerial = st_UserKey.nKeySerial;
@@ -858,7 +867,7 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
858867
_tcsxcpy(st_UserKey.tszUserName, tszUserName);
859868
_tcsxcpy(st_UserKey.tszKeyName, ppSt_UserKey[i]->tszKeyName);
860869
DBModule_MQUser_KeyUPDate(&st_UserKey);
861-
ProtocolModule_Packet_UNReadInsert(xhUNRead, &ppSt_DBMessage, nDBCount);
870+
ProtocolModule_Packet_UNReadInsert(xhUNRead, &ppSt_DBMessage, nDBCount, tszUserName);
862871
BaseLib_OperatorMemory_Free((XPPPMEM)&ppSt_DBMessage, nDBCount);
863872
}
864873
}

0 commit comments

Comments
 (0)