Skip to content

Commit 31d440c

Browse files
committed
Merge branch 'feat/TS-6100-3.0' into test/TD-36363-3.0
2 parents ddf37a4 + c43d0a5 commit 31d440c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2036
-809
lines changed

include/libs/executor/executor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,8 @@ const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
243243

244244
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
245245

246-
void* qExtractReaderFromStreamScanner(void* scanner);
247-
void qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
246+
void* qExtractReaderFromTmqScanner(void* scanner);
247+
void qExtractTmqScanner(qTaskInfo_t tinfo, void** scanner);
248248

249249
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
250250
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow);

include/libs/nodes/plannodes.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ typedef struct SWindowLogicNode {
352352
int64_t deleteMark;
353353
int8_t igExpired;
354354
int8_t igCheckUpdate;
355+
int8_t indefRowsFunc;
355356
EWindowAlgorithm windowAlgo;
356357
bool isPartTb;
357358
int64_t windowCount;
@@ -724,6 +725,7 @@ typedef struct SWindowPhysiNode {
724725
int64_t watermark;
725726
int64_t deleteMark;
726727
int8_t igExpired;
728+
int8_t indefRowsFunc;
727729
bool mergeDataBlock;
728730
int64_t recalculateInterval;
729731
} SWindowPhysiNode;

include/util/taoserror.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -953,7 +953,7 @@ int32_t taosGetErrSize();
953953
#define TSDB_CODE_PAR_MISMATCH_STABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2690)
954954
#define TSDB_CODE_PAR_COL_TAG_REF_BY_STM TAOS_DEF_ERROR_CODE(0, 0x2691)
955955
#define TSDB_CODE_PAR_INVALID_PERIOD_UNIT TAOS_DEF_ERROR_CODE(0, 0x2692)
956-
#define TSDB_CODE_PAR_INVALID_PERIOD_RANGE TAOS_DEF_ERROR_CODE(0, 0x2692)
956+
#define TSDB_CODE_PAR_INVALID_PERIOD_RANGE TAOS_DEF_ERROR_CODE(0, 0x2693)
957957
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
958958

959959
//planner

source/dnode/snode/src/snode.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ static int32_t handleSyncWriteCheckPointReq(SSnode* pSnode, SRpcMsg* pRpcMsg) {
9292
void* data = NULL;
9393
int64_t dataLen = 0;
9494
int32_t code = streamReadCheckPoint(streamId, &data, &dataLen);
95-
if ((errno == ENOENT && ver == -1) || code != 0){
95+
if (code != 0 || (terrno == TAOS_SYSTEM_ERROR(ENOENT) && ver == -1)){
9696
goto end;
9797
}
98-
if (errno == ENOENT || ver > *(int32_t*)data) {
98+
if (terrno == TAOS_SYSTEM_ERROR(ENOENT) || ver > *(int32_t*)data) {
9999
int32_t ret = streamWriteCheckPoint(streamId, POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)), pRpcMsg->contLen - sizeof(SMsgHead));
100100
stDebug("[checkpoint] streamId:%" PRIx64 ", checkpoint local updated, ver:%d, dataLen:%" PRId64 ", ret:%d", streamId, ver, dataLen, ret);
101101
}
102-
if (errno == ENOENT || ver >= *(int32_t*)data) {
102+
if (terrno == TAOS_SYSTEM_ERROR(ENOENT) || ver >= *(int32_t*)data) {
103103
stDebug("[checkpoint] streamId:%" PRIx64 ", checkpoint no need send back, ver:%d, dataLen:%" PRId64, streamId, ver, dataLen);
104104
dataLen = 0;
105105
taosMemoryFreeClear(data);

source/dnode/vnode/src/tq/tqMeta.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,9 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
299299
&handle->execHandle.numOfCols, handle->consumerId);
300300
TQ_NULL_GO_TO_END(handle->execHandle.task);
301301
void* scanner = NULL;
302-
qExtractStreamScanner(handle->execHandle.task, &scanner);
302+
qExtractTmqScanner(handle->execHandle.task, &scanner);
303303
TQ_NULL_GO_TO_END(scanner);
304-
handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
304+
handle->execHandle.pTqReader = qExtractReaderFromTmqScanner(scanner);
305305
TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
306306
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
307307
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);

source/libs/executor/inc/executorInt.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ bool getIgoreNullRes(SExprSupp* pExprSup);
923923
bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull);
924924
int64_t getMinWindowSize(struct SOperatorInfo* pOperator);
925925

926-
void destroyStreamScanOperatorInfo(void* param);
926+
void destroyTmqScanOperatorInfo(void* param);
927927
int32_t checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out);
928928
void resetBasicOperatorState(SOptrBasicInfo* pBasicInfo);
929929

source/libs/executor/inc/operator.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
123123

124124
int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
125125

126-
int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
126+
int32_t createTmqScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
127+
128+
int32_t createTmqRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
127129

128130
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
129131

source/libs/executor/src/executor.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
210210
return NULL;
211211
}
212212

213-
code = createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
213+
code = createTmqRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
214214
if (NULL == pTaskInfo->pRoot || code != 0) {
215215
taosMemoryFree(pTaskInfo);
216216
return NULL;
@@ -1088,7 +1088,7 @@ int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
10881088
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
10891089
}
10901090

1091-
void qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
1091+
void qExtractTmqScanner(qTaskInfo_t tinfo, void** scanner) {
10921092
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
10931093
SOperatorInfo* pOperator = pTaskInfo->pRoot;
10941094

@@ -1227,7 +1227,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
12271227
return 0;
12281228
}
12291229

1230-
void* qExtractReaderFromStreamScanner(void* scanner) {
1230+
void* qExtractReaderFromTmqScanner(void* scanner) {
12311231
SStreamScanInfo* pInfo = scanner;
12321232
return (void*)pInfo->tqReader;
12331233
}

source/libs/executor/src/operator.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,32 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
382382
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
383383
code = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
384384
pTaskInfo, &pOperator);
385+
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
386+
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
387+
STableListInfo* pTableListInfo = tableListCreate();
388+
if (!pTableListInfo) {
389+
pTaskInfo->code = terrno;
390+
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
391+
return terrno;
392+
}
393+
394+
if (pHandle->vnode && (pTaskInfo->pSubplan->pVTables == NULL)) {
395+
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
396+
pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, NULL);
397+
if (code) {
398+
pTaskInfo->code = code;
399+
tableListDestroy(pTableListInfo);
400+
qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
401+
return code;
402+
}
403+
}
404+
405+
code = createTmqScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
406+
if (code) {
407+
pTaskInfo->code = code;
408+
tableListDestroy(pTableListInfo);
409+
return code;
410+
}
385411
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
386412
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
387413
code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo, &pOperator);

source/libs/executor/src/projectoperator.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
146146
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
147147
pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;
148148

149-
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
149+
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
150150
pInfo->mergeDataBlocks = false;
151151
} else {
152152
if (!pProjPhyNode->ignoreGroupId) {

0 commit comments

Comments
 (0)