@@ -121,6 +121,58 @@ void MQ_Authorize()
121121 BaseLib_OperatorMemory_FreeCStyle ((XPPMEM)&ptszMsgBuffer);
122122 }
123123}
124+ // 获取未读消息
125+ void MQ_GetUNRead (int nType = 0 )
126+ {
127+ int nLen = 0 ;
128+ XENGINE_PROTOCOLHDR st_ProtocolHdr;
129+ XENGINE_PROTOCOL_XMQ st_XMQProtocol;
130+ XCHAR tszMsgBuffer[2048 ];
131+
132+ memset (tszMsgBuffer, ' \0 ' , sizeof (tszMsgBuffer));
133+ memset (&st_ProtocolHdr, ' \0 ' , sizeof (XENGINE_PROTOCOLHDR));
134+ memset (&st_XMQProtocol, ' \0 ' , sizeof (XENGINE_PROTOCOL_XMQ));
135+
136+ st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
137+ st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
138+ st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQUNREAD;
139+ st_ProtocolHdr.byVersion = 1 ;
140+ st_ProtocolHdr.byIsReply = true ; // 必须为真
141+ st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
142+ st_ProtocolHdr.unPacketSize = sizeof (XENGINE_PROTOCOL_XMQ);
143+
144+ nLen = sizeof (XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize ;
145+ memcpy (tszMsgBuffer, &st_ProtocolHdr, sizeof (XENGINE_PROTOCOLHDR));
146+ memcpy (tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof (XENGINE_PROTOCOL_XMQ));
147+
148+ if (!XClient_TCPSelect_SendMsg (m_Socket, tszMsgBuffer, nLen))
149+ {
150+ _xtprintf (" 发送投递失败!\n " );
151+ return ;
152+ }
153+
154+ while (true )
155+ {
156+ nLen = 0 ;
157+ XCHAR* ptszMsgBuffer;
158+
159+ memset (&st_ProtocolHdr, ' \0 ' , sizeof (XENGINE_PROTOCOLHDR));
160+ if (XClient_TCPSelect_RecvPkt (m_Socket, &ptszMsgBuffer, &nLen, &st_ProtocolHdr))
161+ {
162+ if (0 == st_ProtocolHdr.wReserve )
163+ {
164+ _xtprintf (" 接受到数据,长度:%d,内容:%s\n " , st_ProtocolHdr.unPacketSize , ptszMsgBuffer);
165+ }
166+ else
167+ {
168+ _xtprintf (" 获取消息队列数据失败,错误码:%d\n " , st_ProtocolHdr.wReserve );
169+ }
170+ BaseLib_OperatorMemory_FreeCStyle ((XPPMEM)&ptszMsgBuffer);
171+ break ;
172+ }
173+ std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
174+ }
175+ }
124176// 删除用户
125177void MQ_DeleteUser ()
126178{
@@ -222,7 +274,7 @@ typedef struct
222274 int a;
223275 int b;
224276}XENGINE_SQLBINARY;
225- void MQ_Post (LPCXSTR lpszMsgBuffer, int nType = 0 )
277+ void MQ_Post (LPCXSTR lpszMsgBuffer, int nType = 0 , int nPubTime = - 1 , bool bSelf = false )
226278{
227279 int nLen = 0 ;
228280 XENGINE_PROTOCOLHDR st_ProtocolHdr;
@@ -240,9 +292,11 @@ void MQ_Post(LPCXSTR lpszMsgBuffer, int nType = 0)
240292 st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
241293
242294 st_XMQProtocol.nSerial = 0 ; // 序列号,0服务会自动处理
243- st_XMQProtocol.nKeepTime = -1 ; // 保存时间,单位秒,如果为0,获取一次后被抛弃。-1 永久存在,PacketKey不能为空
295+ st_XMQProtocol.nKeepTime = 0 ;
296+ st_XMQProtocol.nPubTime = nPubTime;
244297 strcpy (st_XMQProtocol.tszMQKey , lpszKey);
245298
299+ st_XMQProtocol.st_MSGAttr .byAttrSelf = bSelf;
246300 if (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN == nType)
247301 {
248302 XENGINE_SQLBINARY st_SQLBinary;
@@ -379,6 +433,7 @@ void MQ_TimePublish()
379433 st_ProtocolHdr.unPacketSize = sizeof (XENGINE_PROTOCOL_XMQ) + _tcsxlen (lpszMsgBuffer);
380434 st_XMQProtocol.nSerial = 0 ; // 要获取的序列号,如果为0,服务会自动处理
381435 st_XMQProtocol.nPubTime = time (NULL ) + 30 ; // 当前时间+60秒
436+ st_XMQProtocol.st_MSGAttr .byAttrSelf = 1 ;
382437 strcpy (st_XMQProtocol.tszMQKey , lpszKey);
383438
384439 nLen = sizeof (XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize ;
@@ -571,14 +626,16 @@ int main(int argc, char** argv)
571626
572627 MQ_Register ();
573628 MQ_Authorize ();
629+ MQ_GetUNRead ();
574630 MQ_Create ();
575- MQ_Post (NULL , ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN);
631+ MQ_Post (NULL , ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN, - 1 , true );
576632 MQ_BindTopic ();
577633 MQ_Get (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN);
578634
635+ MQ_Post (lpszMsgBuffer, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING, 0 );
579636 for (int i = 0 ; i < 10 ; i++)
580637 {
581- MQ_Post (lpszMsgBuffer, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING);
638+ MQ_Post (lpszMsgBuffer, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING, - 1 , true );
582639 }
583640 MQ_GetNumber ();
584641 MQ_Get (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING);
0 commit comments