Skip to content

Commit 25c6c14

Browse files
authored
fix(stream): add resetting tagScan logic & fix parse value error if value is integer & fix tmqError.py (#34124)
1 parent 684f9d2 commit 25c6c14

File tree

4 files changed

+25
-10
lines changed

4 files changed

+25
-10
lines changed

source/libs/executor/src/executor.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,8 @@ bool qNeedReset(qTaskInfo_t pInfo) {
350350
int32_t node = nodeType(pOperator->pPhyNode);
351351
return (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == node ||
352352
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == node ||
353-
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == node);
353+
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == node ||
354+
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == node);
354355
}
355356

356357
static void setReadHandle(SReadHandle* pHandle, STableScanBase* pScanBaseInfo) {

source/libs/executor/src/scanoperator.c

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3845,12 +3845,6 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
38453845
return code;
38463846
}
38473847

3848-
static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
3849-
SSDataBlock* pRes = NULL;
3850-
int32_t code = doTagScanFromMetaEntryNext(pOperator, &pRes);
3851-
return pRes;
3852-
}
3853-
38543848
static void destroyTagScanOperatorInfo(void* param) {
38553849
STagScanInfo* pInfo = (STagScanInfo*)param;
38563850
if (pInfo->pCtbCursor != NULL && pInfo->pStorageAPI != NULL) {
@@ -3870,6 +3864,24 @@ static void destroyTagScanOperatorInfo(void* param) {
38703864
taosMemoryFreeClear(param);
38713865
}
38723866

3867+
static int32_t resetTagScanOperatorState(SOperatorInfo* pOper) {
3868+
int32_t code = TSDB_CODE_SUCCESS;
3869+
STagScanInfo* pInfo = pOper->info;
3870+
3871+
pOper->status = OP_NOT_OPENED;
3872+
STagScanPhysiNode* pTagScanNode = (STagScanPhysiNode*)pOper->pPhyNode;
3873+
if (pTagScanNode->onlyMetaCtbIdx) {
3874+
SExecTaskInfo* pTaskInfo = pOper->pTaskInfo;
3875+
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
3876+
pAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
3877+
pInfo->pCtbCursor = NULL;
3878+
} else {
3879+
pInfo->curPos = 0;
3880+
}
3881+
3882+
return code;
3883+
}
3884+
38733885
int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
38743886
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
38753887
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
@@ -3884,7 +3896,7 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
38843896
code = terrno;
38853897
goto _error;
38863898
}
3887-
3899+
pOperator->pPhyNode = pTagScanNode;
38883900
SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;
38893901

38903902
int32_t numOfExprs = 0;
@@ -3940,6 +3952,8 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
39403952
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdxNext : doTagScanFromMetaEntryNext;
39413953
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo,
39423954
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
3955+
setOperatorResetStateFn(pOperator, resetTagScanOperatorState);
3956+
39433957
*pOptrInfo = pOperator;
39443958
return code;
39453959

source/libs/parser/src/parAstCreater.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -659,8 +659,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
659659
}
660660
copyValueTrimEscape(val->literal, pLiteral->n + 1, pLiteral,
661661
pCxt->pQueryCxt->hasDupQuoteChar && (TK_NK_ID == pLiteral->type));
662-
if (TK_NK_ID != pLiteral->type && TK_TIMEZONE != pLiteral->type &&
663-
(IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType)) {
662+
if (TK_NK_STRING == pLiteral->type) {
664663
(void)trimString(pLiteral->z, pLiteral->n, val->literal, pLiteral->n);
665664
}
666665
val->node.resType.type = dataType;

test/cases/17-DataSubscription/02-Consume/test_tmq_error.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ def tmqCase1(self, cfgPath, buildPath):
235235
tmqCom.getStartConsumeNotifyFromTmqsim()
236236
tdLog.info("================= stop dnode, and remove data file, then start dnode ===========================")
237237
tdDnodes.stop(1)
238+
tmqCom.stopTmqSimProcess("tmq_sim")
238239

239240
time.sleep(5)
240241
dataPath = buildPath + "/../sim/dnode1/data/*"

0 commit comments

Comments
 (0)