@@ -184,7 +184,7 @@ void MQ_Create()
184184
185185 st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
186186 st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
187- st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQCREATE ;
187+ st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICCREATE ;
188188 st_ProtocolHdr.byVersion = 1 ;
189189 st_ProtocolHdr.byIsReply = TRUE ; // 获得处理返回结果
190190 st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
@@ -369,6 +369,12 @@ void MQ_TimePublish()
369369 printf (" 接受数据失败!\n " );
370370 return ;
371371 }
372+ memset (tszMsgBuffer, ' \0 ' , sizeof (tszMsgBuffer));
373+ memset (&st_XMQProtocol, ' \0 ' , sizeof (XENGINE_PROTOCOL_XMQ));
374+
375+ memcpy (&st_XMQProtocol, ptszMsgBuffer, sizeof (st_XMQProtocol));
376+
377+ printf (" 接受到通知消息,主题:%s,序列:%lld,长度:%d,内容:%s\n " , st_XMQProtocol.tszMQKey , st_XMQProtocol.nSerial , st_ProtocolHdr.unPacketSize - sizeof (XENGINE_PROTOCOL_XMQ), ptszMsgBuffer + sizeof (XENGINE_PROTOCOL_XMQ));
372378 BaseLib_OperatorMemory_FreeCStyle ((XPPMEM)&ptszMsgBuffer);
373379}
374380
@@ -425,7 +431,7 @@ void MQ_GetNumber()
425431 }
426432 }
427433}
428- void MQ_GetSerial ()
434+ void MQ_BindTopic ()
429435{
430436 int nLen = 0 ;
431437 XENGINE_PROTOCOLHDR st_ProtocolHdr;
@@ -438,7 +444,7 @@ void MQ_GetSerial()
438444
439445 st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
440446 st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
441- st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQSERIAL ;
447+ st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICBIND ;
442448 st_ProtocolHdr.byVersion = 1 ;
443449 st_ProtocolHdr.byIsReply = TRUE ;
444450 st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
@@ -493,7 +499,7 @@ void MQ_DeleteTopic()
493499
494500 st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
495501 st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
496- st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQDELETE ;
502+ st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICDELETE ;
497503 st_ProtocolHdr.byVersion = 1 ;
498504 st_ProtocolHdr.byIsReply = TRUE ; // 不获取结果
499505 st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
@@ -512,47 +518,6 @@ void MQ_DeleteTopic()
512518 }
513519}
514520
515- // 订阅
516- void MQ_Subscribe ()
517- {
518- int nLen = 0 ;
519- XENGINE_PROTOCOLHDR st_ProtocolHdr;
520- XENGINE_PROTOCOL_XMQ st_XMQProtocol;
521- TCHAR tszMsgBuffer[2048 ];
522-
523- memset (tszMsgBuffer, ' \0 ' , sizeof (tszMsgBuffer));
524- memset (&st_ProtocolHdr, ' \0 ' , sizeof (XENGINE_PROTOCOLHDR));
525- memset (&st_XMQProtocol, ' \0 ' , sizeof (XENGINE_PROTOCOL_XMQ));
526-
527- st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
528- st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
529- st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQNOTIFY;
530- st_ProtocolHdr.byVersion = 1 ;
531- st_ProtocolHdr.wReserve = 1 ; // 1为请求订阅
532- st_ProtocolHdr.byIsReply = TRUE ;
533- st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
534-
535- st_ProtocolHdr.unPacketSize = sizeof (XENGINE_PROTOCOL_XMQ);
536- strcpy (st_XMQProtocol.tszMQKey , lpszKey);
537-
538- nLen = sizeof (XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize ;
539- memcpy (tszMsgBuffer, &st_ProtocolHdr, sizeof (XENGINE_PROTOCOLHDR));
540- memcpy (tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof (XENGINE_PROTOCOL_XMQ));
541-
542- if (!XClient_TCPSelect_SendMsg (m_Socket, tszMsgBuffer, nLen))
543- {
544- printf (" 发送投递失败!\n " );
545- return ;
546- }
547- nLen = 0 ;
548- CHAR* ptszMsgBuffer = NULL ;
549- if (!XClient_TCPSelect_RecvPkt (m_Socket, &ptszMsgBuffer, &nLen, &st_ProtocolHdr))
550- {
551- printf (" 接受数据失败!\n " );
552- return ;
553- }
554- BaseLib_OperatorMemory_FreeCStyle ((XPPMEM)&ptszMsgBuffer);
555- }
556521int main (int argc, char ** argv)
557522{
558523#ifdef _WINDOWS
@@ -575,12 +540,11 @@ int main(int argc, char** argv)
575540 {
576541 MQ_Post (lpszMsgBuffer);
577542 }
578- MQ_GetSerial ();
579543 MQ_GetNumber ();
544+ MQ_BindTopic ();
580545 MQ_Get ();
581546 MQ_Get ();
582547 MQ_Get ();
583- MQ_Subscribe ();
584548 MQ_TimePublish ();
585549 MQ_DeleteTopic ();
586550 MQ_DeleteUser ();
0 commit comments