@@ -415,37 +415,65 @@ bool MessageQueue_TCP_Handle(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, LPCXSTR lpszC
415415 XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, nNetType);
416416 }
417417 // 是否需要通知
418- int nListCount = 0 ;
419418 if (0 == st_MQProtocol.nPubTime )
420419 {
421- // 定时任务不通知
422- XENGINE_DBUSERKEY** ppSt_ListUser;
423- if (DBModule_MQUser_KeyList (NULL , st_MQProtocol.tszMQKey , &ppSt_ListUser, &nListCount))
420+ // 设置为0,不是定时发布
421+ if (1 == st_MQProtocol.st_MSGAttr .byAttrAll )
424422 {
425- int nTCPLen = 0 ;
426- XCHAR tszTCPBuffer[4096 ];
427- memset (tszTCPBuffer, ' \0 ' , sizeof (tszTCPBuffer));
428-
429- pSt_ProtocolHdr->unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_MSGNOTIFY;
430- ProtocolModule_Packet_Common (nNetType, pSt_ProtocolHdr, &st_MQProtocol, tszTCPBuffer, &nTCPLen, lpszMsgBuffer + sizeof (XENGINE_PROTOCOL_XMQ), nMsgLen - sizeof (XENGINE_PROTOCOL_XMQ));
423+ int nListCount = 0 ;
424+ XCHAR** pptszListAddr;
425+ SessionModule_Client_GetListAddr (&pptszListAddr, &nListCount);
431426 for (int i = 0 ; i < nListCount; i++)
432427 {
433428 // 跳过自己
434- if (0 == _tcsxncmp (tszUserName, ppSt_ListUser [i]-> tszUserName , _tcsxlen (tszUserName )))
429+ if (0 == _tcsxncmp (lpszClientAddr, pptszListAddr [i], _tcsxlen (lpszClientAddr )))
435430 {
436431 continue ;
437432 }
438- XCHAR tszUserAddr[128 ];
439- memset (tszUserAddr, ' \0 ' , sizeof (tszUserAddr));
433+ nSDLen = 0 ;
434+ int nMSGLen = 0 ;
435+ int nClientType = 0 ;
436+ memset (tszSDBuffer, ' \0 ' , sizeof (tszSDBuffer));
437+
438+ pSt_ProtocolHdr->unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_MSGNOTIFY;
439+
440+ SessionModule_Client_GetType (pptszListAddr[i], &nClientType);
441+ ProtocolModule_Packet_Common (nClientType, pSt_ProtocolHdr, &st_MQProtocol, tszSDBuffer, &nSDLen, lpszMsgBuffer + sizeof (XENGINE_PROTOCOL_XMQ), nMsgLen - sizeof (XENGINE_PROTOCOL_XMQ));
442+ XEngine_MQXService_Send (pptszListAddr[i], tszSDBuffer, nSDLen, nClientType);
443+ BaseLib_OperatorMemory_Free ((XPPPMEM)&pptszListAddr, nListCount);
444+ }
445+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" %s消息端:%s,主题:%s,序列:%lld,投递数据到消息队列成功,通知客户端个数:%d" ), lpszClientType, lpszClientAddr, st_DBQueue.tszQueueName , st_DBQueue.nQueueSerial , nListCount);
446+ }
447+ else
448+ {
449+ int nListCount = 0 ;
450+ XENGINE_DBUSERKEY** ppSt_ListUser;
451+ if (DBModule_MQUser_KeyList (NULL , st_MQProtocol.tszMQKey , &ppSt_ListUser, &nListCount))
452+ {
453+ int nTCPLen = 0 ;
454+ XCHAR tszTCPBuffer[4096 ];
455+ memset (tszTCPBuffer, ' \0 ' , sizeof (tszTCPBuffer));
440456
441- SessionModule_Client_GetAddr (ppSt_ListUser[i]->tszUserName , tszUserAddr);
442- XEngine_MQXService_Send (tszUserAddr, tszTCPBuffer, nTCPLen, nNetType);
457+ pSt_ProtocolHdr->unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_MSGNOTIFY;
458+ ProtocolModule_Packet_Common (nNetType, pSt_ProtocolHdr, &st_MQProtocol, tszTCPBuffer, &nTCPLen, lpszMsgBuffer + sizeof (XENGINE_PROTOCOL_XMQ), nMsgLen - sizeof (XENGINE_PROTOCOL_XMQ));
459+ for (int i = 0 ; i < nListCount; i++)
460+ {
461+ // 跳过自己
462+ if (0 == _tcsxncmp (tszUserName, ppSt_ListUser[i]->tszUserName , _tcsxlen (tszUserName)))
463+ {
464+ continue ;
465+ }
466+ XCHAR tszUserAddr[128 ];
467+ memset (tszUserAddr, ' \0 ' , sizeof (tszUserAddr));
468+
469+ SessionModule_Client_GetAddr (ppSt_ListUser[i]->tszUserName , tszUserAddr);
470+ XEngine_MQXService_Send (tszUserAddr, tszTCPBuffer, nTCPLen, nNetType);
471+ }
472+ BaseLib_OperatorMemory_Free ((XPPPMEM)&ppSt_ListUser, nListCount);
443473 }
444- BaseLib_OperatorMemory_Free ((XPPPMEM)&ppSt_ListUser , nListCount);
474+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X ( " %s消息端:%s,主题:%s,序列:%lld,投递数据到消息队列成功,通知客户端个数:%d " ), lpszClientType, lpszClientAddr, st_DBQueue. tszQueueName , st_DBQueue. nQueueSerial , nListCount);
445475 }
446476 }
447- XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" %s消息端:%s,主题:%s,序列:%lld,投递数据到消息队列成功,通知客户端个数:%d" ), lpszClientType, lpszClientAddr, st_DBQueue.tszQueueName , st_DBQueue.nQueueSerial , nListCount);
448-
449477 }
450478 else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQGET == pSt_ProtocolHdr->unOperatorCode )
451479 {
0 commit comments