diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 947526ede39d..070e03821c11 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -29,7 +29,7 @@ #define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ INFO ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) #define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ DEBUG ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) -#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 +#define EMPTY_BLOCK_POLL_IDLE_DURATION 100 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_HEARTBEAT_INTERVAL 3000 #define DEFAULT_ASKEP_INTERVAL 1000 @@ -2394,7 +2394,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) { } int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; - if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms + if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 100ms tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; @@ -2410,7 +2410,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - if (vgSkipCnt % 1000 == 0) { + if (vgSkipCnt % 10000 == 0) { tqInfoC("consumer:0x%" PRIx64 " epoch %d, vgId:%d has skipped poll %d times in a row", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0e02718fa397..b37a753f4e98 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1869,7 +1869,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index ed8116385492..36e5482fa79f 100755 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -73,7 +73,6 @@ set( "src/tq/tqMeta.c" "src/tq/tqRead.c" "src/tq/tqOffset.c" - "src/tq/tqPush.c" "src/tq/tqSink.c" "src/tq/tqSnapshot.c" diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b99a79a66633..35c9ef5c1a1f 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -97,7 +97,6 @@ struct STQ { SVnode* pVnode; char* path; SRWLatch lock; - SHashObj* pPushMgr; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo SHashObj* pOffset; // subKey -> STqOffsetVal diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 08bdb4bf69d8..2b239267e706 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -257,7 +257,6 @@ int64_t tsdbGetEarliestTs(STsdb* pTsdb); // tq int32_t tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); -int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); void tqUnregisterPushHandle(STQ* pTq, void* pHandle); void tqScanWalAsync(STQ* pTq); @@ -282,7 +281,6 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessPollPush(STQ* pTq); int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ea9a6aa795c7..8bdcaf217f77 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -99,11 +99,6 @@ int32_t tqOpen(const char* path, SVnode* pVnode) { taosInitRWLatch(&pTq->lock); - pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); - if (pTq->pPushMgr == NULL) { - return terrno; - } - pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pTq->pCheckInfo == NULL) { return terrno; @@ -138,20 +133,7 @@ void tqClose(STQ* pTq) { vgId = TD_VID(pTq->pVnode); } - void* pIter = taosHashIterate(pTq->pPushMgr, NULL); - while (pIter) { - STqHandle* pHandle = *(STqHandle**)pIter; - if (pHandle->msg != NULL) { - tqPushEmptyDataRsp(pHandle, vgId); - rpcFreeCont(pHandle->msg->pCont); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; - } - pIter = taosHashIterate(pTq->pPushMgr, pIter); - } - taosHashCleanup(pTq->pHandle); - taosHashCleanup(pTq->pPushMgr); taosHashCleanup(pTq->pCheckInfo); taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); @@ -303,7 +285,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek. - tqUnregisterPushHandle(pTq, pHandle); + // tqUnregisterPushHandle(pTq, pHandle); taosWUnLockLatch(&pTq->lock); end: @@ -345,44 +327,6 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { return 0; } -int32_t tqProcessPollPush(STQ* pTq) { - if (pTq == NULL) { - return TSDB_CODE_INVALID_PARA; - } - int32_t vgId = TD_VID(pTq->pVnode); - taosWLockLatch(&pTq->lock); - if (taosHashGetSize(pTq->pPushMgr) > 0) { - void* pIter = taosHashIterate(pTq->pPushMgr, NULL); - - while (pIter) { - STqHandle* pHandle = *(STqHandle**)pIter; - tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); - - if (pHandle->msg == NULL) { - tqError("pHandle->msg should not be null"); - taosHashCancelIterate(pTq->pPushMgr, pIter); - break; - } else { - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, - .pCont = pHandle->msg->pCont, - .contLen = pHandle->msg->contLen, - .info = pHandle->msg->info}; - if (tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg) != 0){ - tqError("vgId:%d tmsgPutToQueue failed, consumer:0x%" PRIx64, vgId, pHandle->consumerId); - } - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; - } - - pIter = taosHashIterate(pTq->pPushMgr, pIter); - } - - taosHashClear(pTq->pPushMgr); - } - taosWUnLockLatch(&pTq->lock); - return 0; -} - int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pTq == NULL || pMsg == NULL) { return TSDB_CODE_INVALID_PARA; @@ -628,7 +572,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosMsleep(10); continue; } - tqUnregisterPushHandle(pTq, pHandle); + // tqUnregisterPushHandle(pTq, pHandle); code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); @@ -751,7 +695,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_32(&pHandle->epoch, 0); - tqUnregisterPushHandle(pTq, pHandle); + // tqUnregisterPushHandle(pTq, pHandle); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } taosWUnLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c deleted file mode 100644 index 0434b5138359..000000000000 --- a/source/dnode/vnode/src/tq/tqPush.c +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tq.h" -#include "vnd.h" - -int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { - if (pTq == NULL) { - return TSDB_CODE_INVALID_MSG; - } - if (taosHashGetSize(pTq->pPushMgr) <= 0) { - return 0; - } - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH}; - msg.pCont = rpcMallocCont(sizeof(SMsgHead)); - if (msg.pCont == NULL) { - return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); - } - msg.contLen = sizeof(SMsgHead); - SMsgHead *pHead = msg.pCont; - pHead->vgId = TD_VID(pTq->pVnode); - pHead->contLen = msg.contLen; - int32_t code = tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); - if (code != 0){ - tqError("vgId:%d failed to push msg to queue, code:%d", TD_VID(pTq->pVnode), code); - rpcFreeCont(msg.pCont); - } - return code; -} - -int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { - int32_t code = 0; - if (msgType == TDMT_VND_SUBMIT) { - code = tqProcessSubmitReqForSubscribe(pTq); - if (code != 0){ - tqError("vgId:%d failed to process submit request for subscribe, code:%d", TD_VID(pTq->pVnode), code); - } - } - - return code; -} - -int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { - if (pTq == NULL || handle == NULL || pMsg == NULL) { - return TSDB_CODE_INVALID_MSG; - } - int32_t vgId = TD_VID(pTq->pVnode); - STqHandle* pHandle = (STqHandle*)handle; - - if (pHandle->msg == NULL) { - pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); - if (pHandle->msg == NULL) { - return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); - } - (void)memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); - pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); - if (pHandle->msg->pCont == NULL) { - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; - return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); - } - } else { - tqPushEmptyDataRsp(pHandle, vgId); - - int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); - tqInfo("vgId:%d register handle, remove last handle from mgr,ret:%s, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, tstrerror(ret), - pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); - void* tmp = pHandle->msg->pCont; - (void)memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); - pHandle->msg->pCont = tmp; - } - - (void)memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); - pHandle->msg->contLen = pMsg->contLen; - int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqInfo("vgId:%d data is over, put handle to mgr,ret:%s, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, tstrerror(ret), - pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); - if (ret != 0) { - rpcFreeCont(pHandle->msg->pCont); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; - } - return ret; -} - -void tqUnregisterPushHandle(STQ* pTq, void *handle) { - if (pTq == NULL || handle == NULL) { - return; - } - STqHandle *pHandle = (STqHandle*)handle; - int32_t vgId = TD_VID(pTq->pVnode); - - if(taosHashGetSize(pTq->pPushMgr) <= 0) { - return; - } - int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); - tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); - - if(ret == 0 && pHandle->msg != NULL) { - tqPushEmptyDataRsp(pHandle, vgId); - - rpcFreeCont(pHandle->msg->pCont); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; - } -} diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index ed7efcf964db..41a2ef630d88 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -162,19 +162,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } - // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) { - // lock - taosWLockLatch(&pTq->lock); - int64_t ver = walGetCommittedVer(pTq->pVnode->pWal); - if (dataRsp.rspOffset.version > ver) { // check if there are data again to avoid lost data - code = tqRegisterPushHandle(pTq, pHandle, pMsg); - taosWUnLockLatch(&pTq->lock); - goto end; - } - taosWUnLockLatch(&pTq->lock); - } - // reqOffset represents the current date offset, may be changed if wal not exists tOffsetCopy(&dataRsp.reqOffset, pOffset); code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 24e1929c5df8..b0c62652aa2b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -991,14 +991,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg walApplyVer(pVnode->pWal, ver); - if (pVnode->pTq) { - code = tqPushMsg(pVnode->pTq, pMsg->msgType); - if (code) { - vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); - return code; - } - } - // commit if need if (needCommit) { vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver); @@ -1081,8 +1073,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_VND_TMQ_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); - case TDMT_VND_TMQ_CONSUME_PUSH: - return tqProcessPollPush(pVnode->pTq); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_APP_ERROR;