Skip to content

Commit 65d98fa

Browse files
committed
added:write and read to message attr of the database supported
1 parent 0efd89a commit 65d98fa

File tree

3 files changed

+39
-9
lines changed

3 files changed

+39
-9
lines changed

XEngine_Source/MQCore_DBModule/DBModule_Define.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ typedef struct
2525
__int64x nQueueSerial; //包序列号
2626
__int64x nQueueGetTime; //可以获取的次数
2727
int nMsgLen; //消息大小
28-
XBYTE byMsgType; //消息类型,参考:ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE
28+
XBYTE byMsgType; //消息类型,参考:ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE
29+
XBYTE byMsgAttr; //消息属性
2930
}XENGINE_DBMESSAGEQUEUE;
3031
typedef struct
3132
{

XEngine_Source/MQCore_DBModule/DBModule_MQData/DBModule_MQData.cpp

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,12 @@ bool CDBModule_MQData::DBModule_MQData_Insert(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo
100100
memset(tszSQLCoder, '\0', sizeof(tszSQLCoder));
101101

102102
DataBase_MySQL_Coder(xhDBSQL, pSt_DBInfo->tszMsgBuffer, tszSQLCoder, &pSt_DBInfo->nMsgLen);
103-
__int64u nRet = _xstprintf(tszSQLStatement, _X("INSERT INTO `%s` (tszUserName,tszQueueName,nQueueSerial,nQueueGetTime,tszQueueLeftTime,tszQueuePublishTime,tszQueueData,nDataType,tszQueueCreateTime) VALUES('%s','%s',%lld,%lld,'%s','%s','"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszUserName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial, pSt_DBInfo->nQueueGetTime, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
103+
__int64u nRet = _xstprintf(tszSQLStatement, _X("INSERT INTO `%s` (tszUserName,tszQueueName,nQueueSerial,nQueueGetTime,tszQueueLeftTime,tszQueuePublishTime,tszQueueData,nDataType,nDataAttr,tszQueueCreateTime) VALUES('%s','%s',%lld,%lld,'%s','%s','"), pSt_DBInfo->tszQueueName, pSt_DBInfo->tszUserName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial, pSt_DBInfo->nQueueGetTime, pSt_DBInfo->tszQueueLeftTime, pSt_DBInfo->tszQueuePublishTime);
104104
memcpy(tszSQLStatement + nRet, tszSQLCoder, pSt_DBInfo->nMsgLen);
105105
nRet += pSt_DBInfo->nMsgLen;
106106

107107
memset(tszSQLCoder, '\0', sizeof(tszSQLCoder));
108-
int nLen = _xstprintf(tszSQLCoder, _X("',%d,now())"), pSt_DBInfo->byMsgType);
108+
int nLen = _xstprintf(tszSQLCoder, _X("',%d,%d,now())"), pSt_DBInfo->byMsgType, pSt_DBInfo->byMsgAttr);
109109
memcpy(tszSQLStatement + nRet, tszSQLCoder, nLen);
110110
nRet += nLen;
111111

@@ -199,7 +199,11 @@ bool CDBModule_MQData::DBModule_MQData_Query(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo)
199199
}
200200
if (NULL != pptszResult[9])
201201
{
202-
_tcsxcpy(pSt_DBInfo->tszQueueCreateTime, pptszResult[9]);
202+
pSt_DBInfo->byMsgAttr = _ttxoi(pptszResult[9]);
203+
}
204+
if (NULL != pptszResult[10])
205+
{
206+
_tcsxcpy(pSt_DBInfo->tszQueueCreateTime, pptszResult[10]);
203207
}
204208
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
205209
return true;
@@ -240,7 +244,7 @@ bool CDBModule_MQData::DBModule_MQData_Modify(XENGINE_DBMESSAGEQUEUE* pSt_DBInfo
240244
nRet += pSt_DBInfo->nMsgLen;
241245

242246
memset(tszSQLCoder, '\0', sizeof(tszSQLCoder));
243-
int nLen = _xstprintf(tszSQLCoder, _X("',nDataType = %d WHERE tszUserName = '%s' AND tszQueueName = '%s' AND nQueueSerial = %lld"), pSt_DBInfo->byMsgType, pSt_DBInfo->tszUserName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial);
247+
int nLen = _xstprintf(tszSQLCoder, _X("',nDataType = %d,nDataAttr = %d WHERE tszUserName = '%s' AND tszQueueName = '%s' AND nQueueSerial = %lld"), pSt_DBInfo->byMsgType, pSt_DBInfo->byMsgAttr, pSt_DBInfo->tszUserName, pSt_DBInfo->tszQueueName, pSt_DBInfo->nQueueSerial);
244248
memcpy(tszSQLStatement + nRet, tszSQLCoder, nLen);
245249
nRet += nLen;
246250

@@ -359,7 +363,11 @@ bool CDBModule_MQData::DBModule_MQData_List(LPCXSTR lpszQueueName, __int64x nSer
359363
}
360364
if (NULL != pptszResult[9])
361365
{
362-
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueCreateTime, pptszResult[9]);
366+
(*pppSt_DBMessage)[i]->byMsgAttr = _ttxoi(pptszResult[9]);
367+
}
368+
if (NULL != pptszResult[10])
369+
{
370+
_tcsxcpy((*pppSt_DBMessage)[i]->tszQueueCreateTime, pptszResult[10]);
363371
}
364372
}
365373
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
@@ -465,7 +473,11 @@ bool CDBModule_MQData::DBModule_MQData_GetSerial(LPCXSTR lpszName, __int64x* pIn
465473
}
466474
if (NULL != pptszResult[9])
467475
{
468-
_tcsxcpy(pSt_DBStart->tszQueueCreateTime, pptszResult[9]);
476+
pSt_DBStart->byMsgAttr = _ttxoi(pptszResult[9]);
477+
}
478+
if (NULL != pptszResult[10])
479+
{
480+
_tcsxcpy(pSt_DBStart->tszQueueCreateTime, pptszResult[10]);
469481
}
470482
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
471483
}
@@ -526,7 +538,11 @@ bool CDBModule_MQData::DBModule_MQData_GetSerial(LPCXSTR lpszName, __int64x* pIn
526538
}
527539
if (NULL != pptszResult[9])
528540
{
529-
_tcsxcpy(pSt_DBEnd->tszQueueCreateTime, pptszResult[9]);
541+
pSt_DBEnd->byMsgAttr = _ttxoi(pptszResult[9]);
542+
}
543+
if (NULL != pptszResult[10])
544+
{
545+
_tcsxcpy(pSt_DBEnd->tszQueueCreateTime, pptszResult[10]);
530546
}
531547
DataBase_MySQL_FreeResult(xhDBSQL, xhTable);
532548
}
@@ -592,6 +608,7 @@ bool CDBModule_MQData::DBModule_MQData_CreateTable(LPCXSTR lpszQueueName)
592608
"`tszQueuePublishTime` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '发布时间',"
593609
"`tszQueueData` longblob NOT NULL COMMENT '保存数据',"
594610
"`nDataType` tinyint NOT NULL COMMENT '数据类型',"
611+
"`nDataAttr` tinyint NULL DEFAULT NULL COMMENT '消息属性',"
595612
"`tszQueueCreateTime` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '插入时间',"
596613
"PRIMARY KEY (`ID`) USING BTREE"
597614
") ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;"

XEngine_Source/XQueue_ProtocolHdr.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,26 @@
5454
///////////////////////////////////////////////////////////////////////////
5555
#pragma pack(push)
5656
#pragma pack(1)
57+
typedef struct
58+
{
59+
XBYTE byAttrAll : 1; //通知所有
60+
XBYTE byAttrDisk : 1; //消息存储标记
61+
XBYTE byAttrResver2 : 1;
62+
XBYTE byAttrResver3 : 1;
63+
XBYTE byAttrResver4 : 1;
64+
XBYTE byAttrResver5 : 1;
65+
XBYTE byAttrResver6 : 1;
66+
XBYTE byAttrResver7 : 1;
67+
}XENGINE_PROTOCOL_MSGATTR, * LPXENGINE_PROTOCOL_MSGATTR;
5768
//消息队列服务协议
5869
typedef struct
5970
{
60-
XCHAR tszMQKey[256]; //此消息的KEY,不能为空
71+
XCHAR tszMQKey[256]; //此消息的KEY,不能为空
6172
__int64x nSerial; //包序列号
6273
__int64x nPubTime; //发布时间,根据自己需求配置时区
6374
int nKeepTime; //保持时间,单位秒,-1 永久存在 0 一次就结束,>0 保存秒数
6475
int nGetTimer; //可以获取的次数
76+
XENGINE_PROTOCOL_MSGATTR st_MSGAttr; //消息属性
6577
}XENGINE_PROTOCOL_XMQ, * LPXENGINE_PROTOCOL_XMQ;
6678
typedef struct
6779
{

0 commit comments

Comments
 (0)