Skip to content

Commit e1cccd5

Browse files
committed
added:belong user field
1 parent b825024 commit e1cccd5

File tree

5 files changed

+129
-79
lines changed

5 files changed

+129
-79
lines changed

XEngine_Source/MQCore_DBModule/DBModule_Define.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ typedef struct
1818
{
1919
XCHAR tszMsgBuffer[8192]; //消息内容
2020
XCHAR tszUserName[256]; //谁发布的消息
21+
XCHAR tszUserBelong[256]; //谁可以读取此消息
2122
XCHAR tszQueueName[256]; //此消息的KEY
2223
XCHAR tszQueueLeftTime[64]; //过期时间
2324
XCHAR tszQueuePublishTime[64]; //发布时间

XEngine_Source/MQCore_DBModule/DBModule_MQData/DBModule_MQData.cpp

Lines changed: 120 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ bool CDBModule_MQData::DBModule_MQData_Insert(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo
100100
memset(tszSQLCoder, '\0', sizeof(tszSQLCoder));
101101

102102
DataBase_MySQL_Coder(xhDBSQL, pSt_DBInfo->tszMsgBuffer, tszSQLCoder, &pSt_DBInfo->nMsgLen);
103-
__int64u nRet = _xstprintf(tszSQLStatement, _X("INSERT INTO `%s` (tszUserName,tszQueueName,nQueueSerial,tszQueueLeftTime,tszQueuePublishTime,tszQueueData,nDataType,nDataAttr,tszQueueCreateTime) VALUES('%s','%s',%lld,'%s','%s','"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszUserName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
103+
__int64u nRet = _xstprintf(tszSQLStatement, _X("INSERT INTO `%s` (tszUserName,tszUserBelong,tszQueueName,nQueueSerial,tszQueueLeftTime,tszQueuePublishTime,tszQueueData,nDataType,nDataAttr,tszQueueCreateTime) VALUES('%s','%s','%s',%lld,'%s','%s','"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszUserBelong, pSt_DBInfo->tszUserName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
104104
memcpy(tszSQLStatement + nRet, tszSQLCoder, pSt_DBInfo->nMsgLen);
105105
nRet += pSt_DBInfo->nMsgLen;
106106

@@ -164,42 +164,56 @@ bool CDBModule_MQData::DBModule_MQData_Query(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo)
164164
XCHAR** pptszResult = DataBase_MySQL_GetResult(xhDBSQL, xhTable);
165165
XLONG* pInt_Length = DataBase_MySQL_GetLength(xhDBSQL, xhTable);
166166

167-
if (NULL != pptszResult[1])
167+
int nPos = 1;
168+
if (NULL != pptszResult[nPos])
168169
{
169-
_tcsxcpy(pSt_DBInfo->tszUserName, pptszResult[1]);
170+
_tcsxcpy(pSt_DBInfo->tszUserName, pptszResult[nPos]);
170171
}
171-
if (NULL != pptszResult[2])
172+
nPos++;
173+
if (NULL != pptszResult[nPos])
172174
{
173-
_tcsxcpy(pSt_DBInfo->tszQueueName, pptszResult[2]);
175+
_tcsxcpy(pSt_DBInfo->tszUserBelong, pptszResult[nPos]);
174176
}
175-
if (NULL != pptszResult[3])
177+
nPos++;
178+
if (NULL != pptszResult[nPos])
176179
{
177-
pSt_DBInfo->nQueueSerial = _ttxoll(pptszResult[3]);
180+
_tcsxcpy(pSt_DBInfo->tszQueueName, pptszResult[nPos]);
178181
}
179-
if (NULL != pptszResult[4])
182+
nPos++;
183+
if (NULL != pptszResult[nPos])
180184
{
181-
_tcsxcpy(pSt_DBInfo->tszQueueLeftTime, pptszResult[4]);
185+
pSt_DBInfo->nQueueSerial = _ttxoll(pptszResult[nPos]);
182186
}
183-
if (NULL != pptszResult[5])
187+
nPos++;
188+
if (NULL != pptszResult[nPos])
184189
{
185-
_tcsxcpy(pSt_DBInfo->tszQueuePublishTime, pptszResult[5]);
190+
_tcsxcpy(pSt_DBInfo->tszQueueLeftTime, pptszResult[nPos]);
186191
}
187-
if (NULL != pptszResult[6])
192+
nPos++;
193+
if (NULL != pptszResult[nPos])
188194
{
189-
pSt_DBInfo->nMsgLen = pInt_Length[6];
190-
memcpy(pSt_DBInfo->tszMsgBuffer, pptszResult[6], pSt_DBInfo->nMsgLen);
195+
_tcsxcpy(pSt_DBInfo->tszQueuePublishTime, pptszResult[nPos]);
191196
}
192-
if (NULL != pptszResult[7])
197+
nPos++;
198+
if (NULL != pptszResult[nPos])
193199
{
194-
pSt_DBInfo->byMsgType = _ttxoi(pptszResult[7]);
200+
pSt_DBInfo->nMsgLen = pInt_Length[nPos];
201+
memcpy(pSt_DBInfo->tszMsgBuffer, pptszResult[nPos], pSt_DBInfo->nMsgLen);
195202
}
196-
if (NULL != pptszResult[8])
203+
nPos++;
204+
if (NULL != pptszResult[nPos])
197205
{
198-
pSt_DBInfo->byMsgAttr = _ttxoi(pptszResult[8]);
206+
pSt_DBInfo->byMsgType = _ttxoi(pptszResult[nPos]);
199207
}
200-
if (NULL != pptszResult[9])
208+
nPos++;
209+
if (NULL != pptszResult[nPos])
201210
{
202-
_tcsxcpy(pSt_DBInfo->tszQueueCreateTime, pptszResult[9]);
211+
pSt_DBInfo->byMsgAttr = _ttxoi(pptszResult[nPos]);
212+
}
213+
nPos++;
214+
if (NULL != pptszResult[nPos])
215+
{
216+
_tcsxcpy(pSt_DBInfo->tszQueueCreateTime, pptszResult[nPos]);
203217
}
204218
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
205219
return true;
@@ -234,7 +248,7 @@ bool CDBModule_MQData::DBModule_MQData_Modify(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo
234248
memset(tszSQLCoder, '\0', sizeof(tszSQLCoder));
235249

236250
DataBase_MySQL_Coder(xhDBSQL, pSt_DBInfo->tszMsgBuffer, tszSQLCoder, &pSt_DBInfo->nMsgLen);
237-
__int64u nRet = _xstprintf(tszSQLStatement, _X("UPDATE `%s` SET tszQueueLeftTime = '%s',tszQueuePublishTime = '%s',tszQueueData = '"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
251+
__int64u nRet = _xstprintf(tszSQLStatement, _X("UPDATE `%s` SET tszUserBelong = '%s',tszQueueLeftTime = '%s',tszQueuePublishTime = '%s',tszQueueData = '"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszUserBelong, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
238252

239253
memcpy(tszSQLStatement + nRet, tszSQLCoder, pSt_DBInfo->nMsgLen);
240254
nRet += pSt_DBInfo->nMsgLen;
@@ -324,42 +338,51 @@ bool CDBModule_MQData::DBModule_MQData_List(LPCXSTR lpszQueueName, __int64x nSer
324338
XCHAR** pptszResult = DataBase_MySQL_GetResult(xhDBSQL, xhTable);
325339
XLONG* pInt_Length = DataBase_MySQL_GetLength(xhDBSQL, xhTable);
326340

327-
if (NULL != pptszResult[1])
341+
int nPos = 1;
342+
if (NULL != pptszResult[nPos])
328343
{
329-
_tcsxcpy((*pppSt_DBMessage)[i]->tszUserName, pptszResult[1]);
344+
_tcsxcpy((*pppSt_DBMessage)[i]->tszUserName, pptszResult[nPos]);
330345
}
331-
if (NULL != pptszResult[2])
346+
nPos++;
347+
if (NULL != pptszResult[nPos])
332348
{
333-
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueName, pptszResult[2]);
349+
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueName, pptszResult[nPos]);
334350
}
335-
if (NULL != pptszResult[3])
351+
nPos++;
352+
if (NULL != pptszResult[nPos])
336353
{
337-
(*pppSt_DBMessage)[i]->nQueueSerial = _ttxoll(pptszResult[3]);
354+
(*pppSt_DBMessage)[i]->nQueueSerial = _ttxoll(pptszResult[nPos]);
338355
}
339-
if (NULL != pptszResult[4])
356+
nPos++;
357+
if (NULL != pptszResult[nPos])
340358
{
341-
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueLeftTime, pptszResult[4]);
359+
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueLeftTime, pptszResult[nPos]);
342360
}
343-
if (NULL != pptszResult[5])
361+
nPos++;
362+
if (NULL != pptszResult[nPos])
344363
{
345-
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueuePublishTime, pptszResult[5]);
364+
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueuePublishTime, pptszResult[nPos]);
346365
}
347-
if (NULL != pptszResult[6])
366+
nPos++;
367+
if (NULL != pptszResult[nPos])
348368
{
349-
(*pppSt_DBMessage)[i]->nMsgLen = pInt_Length[6];
350-
memcpy((*pppSt_DBMessage)[i]->tszMsgBuffer, pptszResult[6], (*pppSt_DBMessage)[i]->nMsgLen);
369+
(*pppSt_DBMessage)[i]->nMsgLen = pInt_Length[nPos];
370+
memcpy((*pppSt_DBMessage)[i]->tszMsgBuffer, pptszResult[nPos], (*pppSt_DBMessage)[i]->nMsgLen);
351371
}
352-
if (NULL != pptszResult[7])
372+
nPos++;
373+
if (NULL != pptszResult[nPos])
353374
{
354-
(*pppSt_DBMessage)[i]->byMsgType = _ttxoi(pptszResult[7]);
375+
(*pppSt_DBMessage)[i]->byMsgType = _ttxoi(pptszResult[nPos]);
355376
}
356-
if (NULL != pptszResult[8])
377+
nPos++;
378+
if (NULL != pptszResult[nPos])
357379
{
358-
(*pppSt_DBMessage)[i]->byMsgAttr = _ttxoi(pptszResult[8]);
380+
(*pppSt_DBMessage)[i]->byMsgAttr = _ttxoi(pptszResult[nPos]);
359381
}
360-
if (NULL != pptszResult[9])
382+
nPos++;
383+
if (NULL != pptszResult[nPos])
361384
{
362-
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueCreateTime, pptszResult[9]);
385+
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueCreateTime, pptszResult[nPos]);
363386
}
364387
}
365388
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
@@ -430,42 +453,51 @@ bool CDBModule_MQData::DBModule_MQData_GetSerial(LPCXSTR lpszName, __int64x* pIn
430453
pptszResult = DataBase_MySQL_GetResult(xhDBSQL, xhTable);
431454
XLONG* pInt_Length = DataBase_MySQL_GetLength(xhDBSQL, xhTable);
432455

433-
if (NULL != pptszResult[1])
456+
int nPos = 1;
457+
if (NULL != pptszResult[nPos])
434458
{
435-
_tcsxcpy(pSt_DBStart->tszUserName, pptszResult[1]);
459+
_tcsxcpy(pSt_DBStart->tszUserName, pptszResult[nPos]);
436460
}
437-
if (NULL != pptszResult[2])
461+
nPos++;
462+
if (NULL != pptszResult[nPos])
438463
{
439-
_tcsxcpy(pSt_DBStart->tszQueueName, pptszResult[2]);
464+
_tcsxcpy(pSt_DBStart->tszQueueName, pptszResult[nPos]);
440465
}
441-
if (NULL != pptszResult[3])
466+
nPos++;
467+
if (NULL != pptszResult[nPos])
442468
{
443-
pSt_DBStart->nQueueSerial = _ttxoll(pptszResult[3]);
469+
pSt_DBStart->nQueueSerial = _ttxoll(pptszResult[nPos]);
444470
}
445-
if (NULL != pptszResult[4])
471+
nPos++;
472+
if (NULL != pptszResult[nPos])
446473
{
447-
_tcsxcpy(pSt_DBStart->tszQueueLeftTime, pptszResult[4]);
474+
_tcsxcpy(pSt_DBStart->tszQueueLeftTime, pptszResult[nPos]);
448475
}
449-
if (NULL != pptszResult[5])
476+
nPos++;
477+
if (NULL != pptszResult[nPos])
450478
{
451-
_tcsxcpy(pSt_DBStart->tszQueuePublishTime, pptszResult[5]);
479+
_tcsxcpy(pSt_DBStart->tszQueuePublishTime, pptszResult[nPos]);
452480
}
453-
if (NULL != pptszResult[6])
481+
nPos++;
482+
if (NULL != pptszResult[nPos])
454483
{
455-
pSt_DBStart->nMsgLen = pInt_Length[6];
456-
memcpy(pSt_DBStart->tszMsgBuffer, pptszResult[6], pSt_DBStart->nMsgLen);
484+
pSt_DBStart->nMsgLen = pInt_Length[nPos];
485+
memcpy(pSt_DBStart->tszMsgBuffer, pptszResult[nPos], pSt_DBStart->nMsgLen);
457486
}
458-
if (NULL != pptszResult[7])
487+
nPos++;
488+
if (NULL != pptszResult[nPos])
459489
{
460-
pSt_DBStart->byMsgType = _ttxoi(pptszResult[7]);
490+
pSt_DBStart->byMsgType = _ttxoi(pptszResult[nPos]);
461491
}
462-
if (NULL != pptszResult[8])
492+
nPos++;
493+
if (NULL != pptszResult[nPos])
463494
{
464-
pSt_DBStart->byMsgAttr = _ttxoi(pptszResult[8]);
495+
pSt_DBStart->byMsgAttr = _ttxoi(pptszResult[nPos]);
465496
}
466-
if (NULL != pptszResult[9])
497+
nPos++;
498+
if (NULL != pptszResult[nPos])
467499
{
468-
_tcsxcpy(pSt_DBStart->tszQueueCreateTime, pptszResult[9]);
500+
_tcsxcpy(pSt_DBStart->tszQueueCreateTime, pptszResult[nPos]);
469501
}
470502
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
471503
}
@@ -491,42 +523,51 @@ bool CDBModule_MQData::DBModule_MQData_GetSerial(LPCXSTR lpszName, __int64x* pIn
491523
pptszResult = DataBase_MySQL_GetResult(xhDBSQL, xhTable);
492524
XLONG* pInt_Length = DataBase_MySQL_GetLength(xhDBSQL, xhTable);
493525

494-
if (NULL != pptszResult[1])
526+
int nPos = 1;
527+
if (NULL != pptszResult[nPos])
495528
{
496-
_tcsxcpy(pSt_DBEnd->tszUserName, pptszResult[1]);
529+
_tcsxcpy(pSt_DBEnd->tszUserName, pptszResult[nPos]);
497530
}
498-
if (NULL != pptszResult[2])
531+
nPos++;
532+
if (NULL != pptszResult[nPos])
499533
{
500-
_tcsxcpy(pSt_DBEnd->tszQueueName, pptszResult[2]);
534+
_tcsxcpy(pSt_DBEnd->tszQueueName, pptszResult[nPos]);
501535
}
502-
if (NULL != pptszResult[3])
536+
nPos++;
537+
if (NULL != pptszResult[nPos])
503538
{
504-
pSt_DBEnd->nQueueSerial = _ttxoll(pptszResult[3]);
539+
pSt_DBEnd->nQueueSerial = _ttxoll(pptszResult[nPos]);
505540
}
506-
if (NULL != pptszResult[4])
541+
nPos++;
542+
if (NULL != pptszResult[nPos])
507543
{
508-
_tcsxcpy(pSt_DBEnd->tszQueueLeftTime, pptszResult[4]);
544+
_tcsxcpy(pSt_DBEnd->tszQueueLeftTime, pptszResult[nPos]);
509545
}
510-
if (NULL != pptszResult[5])
546+
nPos++;
547+
if (NULL != pptszResult[nPos])
511548
{
512-
_tcsxcpy(pSt_DBEnd->tszQueuePublishTime, pptszResult[5]);
549+
_tcsxcpy(pSt_DBEnd->tszQueuePublishTime, pptszResult[nPos]);
513550
}
514-
if (NULL != pptszResult[6])
551+
nPos++;
552+
if (NULL != pptszResult[nPos])
515553
{
516-
pSt_DBEnd->nMsgLen = pInt_Length[6];
517-
memcpy(pSt_DBEnd->tszMsgBuffer, pptszResult[6], pSt_DBEnd->nMsgLen);
554+
pSt_DBEnd->nMsgLen = pInt_Length[nPos];
555+
memcpy(pSt_DBEnd->tszMsgBuffer, pptszResult[nPos], pSt_DBEnd->nMsgLen);
518556
}
519-
if (NULL != pptszResult[7])
557+
nPos++;
558+
if (NULL != pptszResult[nPos])
520559
{
521-
pSt_DBEnd->byMsgType = _ttxoi(pptszResult[7]);
560+
pSt_DBEnd->byMsgType = _ttxoi(pptszResult[nPos]);
522561
}
523-
if (NULL != pptszResult[8])
562+
nPos++;
563+
if (NULL != pptszResult[nPos])
524564
{
525-
pSt_DBEnd->byMsgAttr = _ttxoi(pptszResult[8]);
565+
pSt_DBEnd->byMsgAttr = _ttxoi(pptszResult[nPos]);
526566
}
527-
if (NULL != pptszResult[9])
567+
nPos++;
568+
if (NULL != pptszResult[nPos])
528569
{
529-
_tcsxcpy(pSt_DBEnd->tszQueueCreateTime, pptszResult[9]);
570+
_tcsxcpy(pSt_DBEnd->tszQueueCreateTime, pptszResult[nPos]);
530571
}
531572
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
532573
}
@@ -585,6 +626,7 @@ bool CDBModule_MQData::DBModule_MQData_CreateTable(LPCXSTR lpszQueueName)
585626
_xstprintf(tszSQLQuery, _X("CREATE TABLE IF NOT EXISTS `%s` ("
586627
"`ID` int NOT NULL AUTO_INCREMENT,"
587628
"`tszUserName` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '谁发布的消息',"
629+
"`tszUserBelong` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '所属用户',"
588630
"`tszQueueName` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '所属队列',"
589631
"`nQueueSerial` bigint NOT NULL COMMENT '消息序列',"
590632
"`tszQueueLeftTime` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '过期时间',"

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,7 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken,
582582
}
583583
Json::Value st_JsonObject;
584584
st_JsonObject["tszQueueName"] = (*pppSt_DBMessage)[i]->tszQueueName;
585+
st_JsonObject["tszUserBelong"] = (*pppSt_DBMessage)[i]->tszUserBelong;
585586
st_JsonObject["tszUserName"] = (*pppSt_DBMessage)[i]->tszUserName;
586587
st_JsonObject["tszQueueLeftTime"] = (*pppSt_DBMessage)[i]->tszQueueLeftTime;
587588
st_JsonObject["tszQueuePublishTime"] = (*pppSt_DBMessage)[i]->tszQueuePublishTime;
@@ -785,6 +786,7 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_WSCommon(XENGINE_PROTOCOLHDR*
785786
if (NULL != pSt_MQProtocol)
786787
{
787788
st_JsonMQProtocol["tszMQKey"] = pSt_MQProtocol->tszMQKey;
789+
st_JsonMQProtocol["tszMQUsr"] = pSt_MQProtocol->tszMQUsr;
788790
st_JsonMQProtocol["nSerial"] = (Json::Value::Int64)pSt_MQProtocol->nSerial;
789791
st_JsonMQProtocol["nKeepTime"] = pSt_MQProtocol->nKeepTime;
790792
st_JsonMQProtocol["nPubTime"] = (Json::Value::Int64)pSt_MQProtocol->nPubTime;

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Parse/ProtocolModule_Parse.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ bool CProtocolModule_Parse::ProtocolModule_Parse_Websocket(LPCXSTR lpszMsgBuffer
121121
{
122122
_tcsxcpy(st_MQProtocol.tszMQKey, st_JsonMQProtocol["tszMQKey"].asCString());
123123
}
124+
if (!st_JsonMQProtocol["tszMQUsr"].isNull())
125+
{
126+
_tcsxcpy(st_MQProtocol.tszMQUsr, st_JsonMQProtocol["tszMQUsr"].asCString());
127+
}
124128
if (!st_JsonMQProtocol["nSerial"].isNull())
125129
{
126130
st_MQProtocol.nSerial = st_JsonMQProtocol["nSerial"].asInt();

XEngine_Source/XQueue_ProtocolHdr.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ typedef struct
5757
{
5858
XBYTE byAttrAll : 1; //通知所有
5959
XBYTE byAttrSelf : 1; //自己也能接受
60-
XBYTE byAttrResver2 : 1;
60+
XBYTE byAttrReply : 1; //对方必须回复
6161
XBYTE byAttrResver3 : 1;
6262
XBYTE byAttrResver4 : 1;
6363
XBYTE byAttrResver5 : 1;
@@ -70,6 +70,7 @@ typedef struct
7070
typedef struct
7171
{
7272
XCHAR tszMQKey[MAX_PATH]; //此消息的KEY,不能为空
73+
XCHAR tszMQUsr[MAX_PATH]; //此消息接受用户
7374
__int64x nSerial; //包序列号
7475
__int64x nPubTime; //发布时间,根据自己需求配置时区
7576
int nKeepTime; //可用时间,单位秒,>0 超时秒数

0 commit comments

Comments
 (0)