Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions source/client/src/clientTmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ INFO ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ DEBUG ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)

#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define EMPTY_BLOCK_POLL_IDLE_DURATION 100
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
#define DEFAULT_HEARTBEAT_INTERVAL 3000
#define DEFAULT_ASKEP_INTERVAL 1000
Expand Down Expand Up @@ -2394,7 +2394,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) {
}

int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 100ms
tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId);
Comment on lines 2398 to 2399
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The debug message here is misleading. The idle duration was changed to 100ms (EMPTY_BLOCK_POLL_IDLE_DURATION), but the log message still refers to '10ms'. It would be better to make this message reflect the actual value to avoid confusion during debugging.

        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %dms before start next poll", tmq->consumerId,
                 tmq->epoch, pVg->vgId, EMPTY_BLOCK_POLL_IDLE_DURATION);

Comment on lines 2398 to 2399
continue;
Expand All @@ -2407,6 +2407,10 @@ static int32_t tmqPollImpl(tmq_t* tmq) {
continue;
}

// set status = idle if no response from vnode in a long time to avoid not polling data from vnode
if (atomic_load_32(&pVg->vgSkipCnt) == 100000) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The value 100000 is a magic number, making the code harder to understand and maintain. It's recommended to define it as a named constant, for example TMQ_MAX_VG_SKIP_COUNT, near other constants at the top of the file.

      if (atomic_load_32(&pVg->vgSkipCnt) == TMQ_MAX_VG_SKIP_COUNT) {

atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
Comment on lines +2410 to +2413
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
Expand Down
1 change: 0 additions & 1 deletion source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -1869,7 +1869,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
Expand Down
1 change: 0 additions & 1 deletion source/dnode/vnode/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ set(
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
"src/tq/tqOffset.c"
"src/tq/tqPush.c"
"src/tq/tqSink.c"
"src/tq/tqSnapshot.c"

Expand Down
1 change: 0 additions & 1 deletion source/dnode/vnode/src/inc/tq.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ struct STQ {
SVnode* pVnode;
char* path;
SRWLatch lock;
SHashObj* pPushMgr; // subKey -> STqHandle
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
SHashObj* pOffset; // subKey -> STqOffsetVal
Expand Down
2 changes: 0 additions & 2 deletions source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ int64_t tsdbGetEarliestTs(STsdb* pTsdb);
// tq
int32_t tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*);
int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
void tqUnregisterPushHandle(STQ* pTq, void* pHandle);
Comment on lines 260 to 261
void tqScanWalAsync(STQ* pTq);
Expand All @@ -282,7 +281,6 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollPush(STQ* pTq);
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg);

Expand Down
62 changes: 3 additions & 59 deletions source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ int32_t tqOpen(const char* path, SVnode* pVnode) {

taosInitRWLatch(&pTq->lock);

pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
if (pTq->pPushMgr == NULL) {
return terrno;
}

pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pTq->pCheckInfo == NULL) {
return terrno;
Expand Down Expand Up @@ -138,20 +133,7 @@ void tqClose(STQ* pTq) {
vgId = TD_VID(pTq->pVnode);
}

void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
while (pIter) {
STqHandle* pHandle = *(STqHandle**)pIter;
if (pHandle->msg != NULL) {
tqPushEmptyDataRsp(pHandle, vgId);
rpcFreeCont(pHandle->msg->pCont);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
pIter = taosHashIterate(pTq->pPushMgr, pIter);
}

taosHashCleanup(pTq->pHandle);
taosHashCleanup(pTq->pPushMgr);
taosHashCleanup(pTq->pCheckInfo);
taosHashCleanup(pTq->pOffset);
taosMemoryFree(pTq->path);
Expand Down Expand Up @@ -303,7 +285,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {

// if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
// TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
tqUnregisterPushHandle(pTq, pHandle);
// tqUnregisterPushHandle(pTq, pHandle);
taosWUnLockLatch(&pTq->lock);
Comment on lines 286 to 289

end:
Expand Down Expand Up @@ -345,44 +327,6 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
return 0;
}

int32_t tqProcessPollPush(STQ* pTq) {
if (pTq == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t vgId = TD_VID(pTq->pVnode);
taosWLockLatch(&pTq->lock);
if (taosHashGetSize(pTq->pPushMgr) > 0) {
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);

while (pIter) {
STqHandle* pHandle = *(STqHandle**)pIter;
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

if (pHandle->msg == NULL) {
tqError("pHandle->msg should not be null");
taosHashCancelIterate(pTq->pPushMgr, pIter);
break;
} else {
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
.pCont = pHandle->msg->pCont,
.contLen = pHandle->msg->contLen,
.info = pHandle->msg->info};
if (tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg) != 0){
tqError("vgId:%d tmsgPutToQueue failed, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
}
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}

pIter = taosHashIterate(pTq->pPushMgr, pIter);
}

taosHashClear(pTq->pPushMgr);
}
taosWUnLockLatch(&pTq->lock);
return 0;
}

int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTq == NULL || pMsg == NULL) {
return TSDB_CODE_INVALID_PARA;
Expand Down Expand Up @@ -628,7 +572,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosMsleep(10);
continue;
}
tqUnregisterPushHandle(pTq, pHandle);
// tqUnregisterPushHandle(pTq, pHandle);
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
Expand Down Expand Up @@ -751,7 +695,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg

atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
// tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
}
taosWUnLockLatch(&pTq->lock);
Expand Down
118 changes: 0 additions & 118 deletions source/dnode/vnode/src/tq/tqPush.c

This file was deleted.

13 changes: 0 additions & 13 deletions source/dnode/vnode/src/tq/tqUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,19 +162,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
goto end;
}

// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
// lock
taosWLockLatch(&pTq->lock);
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (dataRsp.rspOffset.version > ver) { // check if there are data again to avoid lost data
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
goto end;
}
taosWUnLockLatch(&pTq->lock);
}

// reqOffset represents the current date offset, may be changed if wal not exists
tOffsetCopy(&dataRsp.reqOffset, pOffset);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
Expand Down
10 changes: 0 additions & 10 deletions source/dnode/vnode/src/vnd/vnodeSvr.c
Original file line number Diff line number Diff line change
Expand Up @@ -991,14 +991,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg

walApplyVer(pVnode->pWal, ver);

if (pVnode->pTq) {
code = tqPushMsg(pVnode->pTq, pMsg->msgType);
if (code) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return code;
}
}

// commit if need
if (needCommit) {
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
Expand Down Expand Up @@ -1081,8 +1073,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_CONSUME_PUSH:
return tqProcessPollPush(pVnode->pTq);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
Expand Down
Loading