@@ -216,7 +216,13 @@ void MQ_Create()
216216 memcpy (&st_XMQProtocol, tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR), sizeof (XENGINE_PROTOCOL_XMQ));
217217}
218218
219- void MQ_Post (LPCXSTR lpszMsgBuffer)
219+ typedef struct
220+ {
221+ XCHAR tszMsgBuffer[128 ];
222+ int a;
223+ int b;
224+ }XENGINE_SQLBINARY;
225+ void MQ_Post (LPCXSTR lpszMsgBuffer, int nType = 0 )
220226{
221227 int nLen = 0 ;
222228 XENGINE_PROTOCOLHDR st_ProtocolHdr;
@@ -230,20 +236,40 @@ void MQ_Post(LPCXSTR lpszMsgBuffer)
230236 st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
231237 st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
232238 st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQPOST;
233- st_ProtocolHdr.byVersion = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING;
234239 st_ProtocolHdr.byIsReply = true ; // 获得处理返回结果
235240 st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
236241
237242 st_XMQProtocol.nSerial = 0 ; // 序列号,0服务会自动处理
238243 st_XMQProtocol.nKeepTime = -1 ; // 保存时间,单位秒,如果为0,获取一次后被抛弃。-1 永久存在,PacketKey不能为空
239244 strcpy (st_XMQProtocol.tszMQKey , lpszKey);
240245
241- st_ProtocolHdr.unPacketSize = sizeof (XENGINE_PROTOCOL_XMQ) + strlen (lpszMsgBuffer);
246+ if (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN == nType)
247+ {
248+ XENGINE_SQLBINARY st_SQLBinary;
249+ memset (&st_SQLBinary, ' \0 ' , sizeof (XENGINE_SQLBINARY));
242250
243- nLen = sizeof (XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize ;
244- memcpy (tszMsgBuffer, &st_ProtocolHdr, sizeof (XENGINE_PROTOCOLHDR));
245- memcpy (tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof (XENGINE_PROTOCOL_XMQ));
246- memcpy (tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR) + sizeof (XENGINE_PROTOCOL_XMQ), lpszMsgBuffer, strlen (lpszMsgBuffer));
251+ st_SQLBinary.a = 1 ;
252+ st_SQLBinary.b = 2 ;
253+ strcpy (st_SQLBinary.tszMsgBuffer , " wdawandono0012d02d01d0j1" );
254+
255+ st_ProtocolHdr.byVersion = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN;
256+
257+ st_ProtocolHdr.unPacketSize = sizeof (XENGINE_PROTOCOL_XMQ) + sizeof (XENGINE_SQLBINARY);
258+ nLen = sizeof (XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize ;
259+ memcpy (tszMsgBuffer, &st_ProtocolHdr, sizeof (XENGINE_PROTOCOLHDR));
260+ memcpy (tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof (XENGINE_PROTOCOL_XMQ));
261+ memcpy (tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR) + sizeof (XENGINE_PROTOCOL_XMQ), &st_SQLBinary, sizeof (XENGINE_SQLBINARY));
262+ }
263+ else
264+ {
265+ st_ProtocolHdr.byVersion = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING;
266+
267+ st_ProtocolHdr.unPacketSize = sizeof (XENGINE_PROTOCOL_XMQ) + strlen (lpszMsgBuffer);
268+ nLen = sizeof (XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize ;
269+ memcpy (tszMsgBuffer, &st_ProtocolHdr, sizeof (XENGINE_PROTOCOLHDR));
270+ memcpy (tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR), &st_XMQProtocol, sizeof (XENGINE_PROTOCOL_XMQ));
271+ memcpy (tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR) + sizeof (XENGINE_PROTOCOL_XMQ), lpszMsgBuffer, strlen (lpszMsgBuffer));
272+ }
247273
248274 if (!XClient_TCPSelect_SendMsg (m_Socket, tszMsgBuffer, nLen))
249275 {
@@ -263,7 +289,7 @@ void MQ_Post(LPCXSTR lpszMsgBuffer)
263289 memcpy (&st_XMQProtocol, ptszMsgBuffer, sizeof (XENGINE_PROTOCOL_XMQ));
264290 BaseLib_OperatorMemory_FreeCStyle ((XPPMEM)&ptszMsgBuffer);
265291}
266- void MQ_Get ()
292+ void MQ_Get (int nType = 0 )
267293{
268294 int nLen = 0 ;
269295 XENGINE_PROTOCOLHDR st_ProtocolHdr;
@@ -297,19 +323,29 @@ void MQ_Get()
297323
298324 while (true )
299325 {
300- nLen = 2048 ;
301- memset (tszMsgBuffer, ' \0 ' , sizeof (tszMsgBuffer));
302- if (XClient_TCPSelect_RecvMsg (m_Socket, tszMsgBuffer, &nLen))
303- {
304- memset (&st_ProtocolHdr, ' \0 ' , sizeof (XENGINE_PROTOCOLHDR));
305- memset (&st_XMQProtocol, ' \0 ' , sizeof (XENGINE_PROTOCOL_XMQ));
326+ nLen = 0 ;
327+ XCHAR* ptszMsgBuffer;
306328
307- memcpy (&st_ProtocolHdr, tszMsgBuffer, sizeof (XENGINE_PROTOCOLHDR));
308- memcpy (&st_XMQProtocol, tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR), sizeof (XENGINE_PROTOCOL_XMQ));
329+ memset (&st_ProtocolHdr, ' \0 ' , sizeof (XENGINE_PROTOCOLHDR));
330+ memset (&st_XMQProtocol, ' \0 ' , sizeof (XENGINE_PROTOCOL_XMQ));
331+ if (XClient_TCPSelect_RecvPkt (m_Socket, &ptszMsgBuffer, &nLen, &st_ProtocolHdr))
332+ {
333+ memcpy (&st_XMQProtocol, ptszMsgBuffer, sizeof (XENGINE_PROTOCOL_XMQ));
309334
310335 if (0 == st_ProtocolHdr.wReserve )
311336 {
312- _xtprintf (" 接受到数据,主题:%s,序列:%lld,长度:%d,内容:%s\n " , st_XMQProtocol.tszMQKey , st_XMQProtocol.nSerial , st_ProtocolHdr.unPacketSize - sizeof (XENGINE_PROTOCOL_XMQ), tszMsgBuffer + sizeof (XENGINE_PROTOCOLHDR) + sizeof (XENGINE_PROTOCOL_XMQ));
337+ if (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING == nType)
338+ {
339+ _xtprintf (" 接受到数据,主题:%s,序列:%lld,长度:%d,内容:%s\n " , st_XMQProtocol.tszMQKey , st_XMQProtocol.nSerial , st_ProtocolHdr.unPacketSize - sizeof (XENGINE_PROTOCOL_XMQ), ptszMsgBuffer + sizeof (XENGINE_PROTOCOL_XMQ));
340+ }
341+ else
342+ {
343+ XENGINE_SQLBINARY st_SQLBinary;
344+ memset (&st_SQLBinary, ' \0 ' , sizeof (XENGINE_SQLBINARY));
345+
346+ memcpy (&st_SQLBinary, ptszMsgBuffer + sizeof (XENGINE_PROTOCOL_XMQ), sizeof (XENGINE_SQLBINARY));
347+ break ;
348+ }
313349 }
314350 else
315351 {
@@ -451,7 +487,7 @@ void MQ_BindTopic()
451487
452488 st_ProtocolHdr.unPacketSize = sizeof (XENGINE_PROTOCOL_XMQ);
453489
454- st_XMQProtocol.nSerial = 5 ; // 设置为5开始读取
490+ st_XMQProtocol.nSerial = 1 ; // 设置为1开始读取
455491 strcpy (st_XMQProtocol.tszMQKey , lpszKey);
456492
457493 nLen = sizeof (XENGINE_PROTOCOLHDR) + st_ProtocolHdr.unPacketSize ;
@@ -536,15 +572,18 @@ int main(int argc, char** argv)
536572 MQ_Register ();
537573 MQ_Authorize ();
538574 MQ_Create ();
575+ MQ_Post (NULL , ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN);
576+ MQ_BindTopic ();
577+ MQ_Get (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN);
578+
539579 for (int i = 0 ; i < 10 ; i++)
540580 {
541- MQ_Post (lpszMsgBuffer);
581+ MQ_Post (lpszMsgBuffer, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING );
542582 }
543583 MQ_GetNumber ();
544- MQ_BindTopic ();
545- MQ_Get ();
546- MQ_Get ();
547- MQ_Get ();
584+ MQ_Get (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING);
585+ MQ_Get (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING);
586+ MQ_Get (ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_STRING);
548587 MQ_TimePublish ();
549588 MQ_DeleteTopic ();
550589 MQ_DeleteUser ();
0 commit comments