Skip to content

Commit 3d37a1d

Browse files
Merge branch 'feat/TS-6100-3.0' of https://github.com/taosdata/TDengine into feat/TS-6100-3.0
2 parents c71aa8b + 08314f3 commit 3d37a1d

File tree

7 files changed

+42
-51
lines changed

7 files changed

+42
-51
lines changed

include/libs/wal/wal.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,6 @@ typedef struct {
134134
SWal *pWal;
135135
} SWalRef;
136136

137-
typedef struct {
138-
int8_t scanUncommited;
139-
int8_t scanNotApplied;
140-
int8_t scanMeta;
141-
int8_t deleteMsg;
142-
int8_t enableRef;
143-
int8_t scanDropCtb;
144-
} SWalFilterCond;
145-
146137
// todo hide this struct
147138
typedef struct SWalReader {
148139
SWal *pWal;
@@ -155,7 +146,6 @@ typedef struct SWalReader {
155146
// data
156147
int64_t capacity;
157148
TdThreadMutex mutex;
158-
SWalFilterCond cond;
159149
SWalCkHead *pHead;
160150
} SWalReader;
161151

@@ -185,12 +175,12 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
185175
void walApplyVer(SWal *, int64_t ver);
186176

187177
// wal reader
188-
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
178+
SWalReader *walOpenReader(SWal *, int64_t id);
189179
void walCloseReader(SWalReader *pRead);
190180
void walReadReset(SWalReader *pReader);
191181
int32_t walReadVer(SWalReader *pRead, int64_t ver);
192182
int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
193-
int32_t walNextValidMsg(SWalReader *pRead);
183+
int32_t walNextValidMsg(SWalReader *pRead, bool scanMeta);
194184
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
195185
int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
196186
int64_t walReaderGetSkipToVersion(SWalReader *pReader);

source/dnode/vnode/src/tq/tqMeta.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
304304
handle->execHandle.pTqReader = qExtractReaderFromTmqScanner(scanner);
305305
TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
306306
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
307-
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
307+
handle->pWalReader = walOpenReader(pVnode->pWal, 0);
308308
TQ_NULL_GO_TO_END(handle->pWalReader);
309309
handle->execHandle.pTqReader = tqReaderOpen(pVnode);
310310
TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
@@ -313,7 +313,7 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
313313
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
314314
TQ_NULL_GO_TO_END(handle->execHandle.task);
315315
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
316-
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
316+
handle->pWalReader = walOpenReader(pVnode->pWal, 0);
317317
TQ_NULL_GO_TO_END(handle->pWalReader);
318318
if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
319319
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {

source/dnode/vnode/src/tq/tqRead.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
304304
return NULL;
305305
}
306306

307-
pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
307+
pReader->pWalReader = walOpenReader(pVnode->pWal, 0);
308308
if (pReader->pWalReader == NULL) {
309309
taosMemoryFree(pReader);
310310
return NULL;
@@ -411,7 +411,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
411411
}
412412

413413
// try next message in wal file
414-
if (walNextValidMsg(pWalReader) < 0) {
414+
if (walNextValidMsg(pWalReader, false) < 0) {
415415
return false;
416416
}
417417

source/dnode/vnode/src/vnd/vnodeStream.c

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,24 @@ int32_t retrieveWalData(SSubmitTbData* pSubmitTbData, void* pTableList, SSDataBl
365365
return code;
366366
}
367367

368+
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
369+
int32_t code = 0;
370+
int32_t lino = 0;
371+
uint64_t gid = 0;
372+
if (isVTable) {
373+
STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS)
374+
} else {
375+
gid = qStreamGetGroupId(pTableList, uid);
376+
STREAM_CHECK_CONDITION_GOTO(gid == -1, TDB_CODE_SUCCESS);
377+
}
378+
STREAM_CHECK_RET_GOTO(
379+
buildWalMetaBlock(pBlock, WAL_DELETE_DATA, gid, isVTable, uid, req->skey, req->ekey, ver, 1));
380+
pBlock->info.rows++;
381+
382+
end:
383+
return code;
384+
}
385+
368386
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
369387
int64_t ver) {
370388
int32_t code = 0;
@@ -373,18 +391,14 @@ static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlo
373391
SDeleteRes req = {0};
374392
tDecoderInit(&decoder, data, len);
375393
STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
376-
uint64_t gid = 0;
377-
if (isVTable) {
378-
STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &req.suid, sizeof(req.suid)) == NULL, TDB_CODE_SUCCESS)
379-
} else {
380-
gid = qStreamGetGroupId(pTableList, req.suid);
381-
STREAM_CHECK_CONDITION_GOTO(gid == -1, TDB_CODE_SUCCESS);
394+
395+
for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
396+
uint64_t* uid = taosArrayGet(req.uidList, i);
397+
STREAM_CHECK_NULL_GOTO(uid, terrno);
398+
STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
382399
}
383-
STREAM_CHECK_RET_GOTO(
384-
buildWalMetaBlock(pBlock, WAL_DELETE_DATA, gid, isVTable, req.suid, req.skey, req.ekey, ver, 1));
385-
pBlock->info.rows++;
386-
stDebug("stream reader scan delete data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64 ", gid %" PRIu64, req.suid,
387-
req.skey, req.ekey, gid);
400+
stDebug("stream reader scan delete data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, req.suid,
401+
req.skey, req.ekey);
388402

389403
end:
390404
tDecoderClear(&decoder);
@@ -455,14 +469,14 @@ static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBl
455469
int32_t code = 0;
456470
int32_t lino = 0;
457471

458-
SWalReader* pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
472+
SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
459473
STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
460474
*retVer = walGetLastVer(pWalReader->pWal);
461475
STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);
462476

463477
while (1) {
464478
*retVer = walGetLastVer(pWalReader->pWal);
465-
STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader) < 0, TSDB_CODE_SUCCESS);
479+
STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);
466480

467481
SWalCont* wCont = &pWalReader->pHead->head;
468482
if (wCont->ingestTs / 1000 > ctime) break;
@@ -498,7 +512,7 @@ int32_t scanWalOneVer(SVnode* pVnode, void* pTableList, SSDataBlock* pBlock, SSD
498512
SSubmitReq2 submit = {0};
499513
SDecoder decoder = {0};
500514

501-
SWalReader* pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
515+
SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
502516
STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
503517

504518
STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));

source/libs/sync/src/syncRaftLog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
6767
}
6868

6969
(void)taosThreadMutexInit(&(pData->mutex), NULL);
70-
pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
70+
pData->pWalHandle = walOpenReader(pData->pWal, 0);
7171
if (!pData->pWalHandle) {
7272
taosMemoryFree(pLogStore);
7373
taosLRUCacheCleanup(pLogStore->pCache);

source/libs/wal/src/walRead.c

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
#include "wal.h"
1919
#include "walInt.h"
2020

21-
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
21+
SWalReader *walOpenReader(SWal *pWal, int64_t id) {
2222
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
2323
if (pReader == NULL) {
2424
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -32,14 +32,6 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
3232
pReader->curVersion = -1;
3333
pReader->curFileFirstVer = -1;
3434
pReader->capacity = 0;
35-
if (cond) {
36-
pReader->cond = *cond;
37-
} else {
38-
// pReader->cond.scanUncommited = 0;
39-
pReader->cond.scanNotApplied = 0;
40-
pReader->cond.scanMeta = 0;
41-
pReader->cond.enableRef = 0;
42-
}
4335

4436
terrno = taosThreadMutexInit(&pReader->mutex, NULL);
4537
if (terrno) {
@@ -70,7 +62,7 @@ void walCloseReader(SWalReader *pReader) {
7062
taosMemoryFree(pReader);
7163
}
7264

73-
int32_t walNextValidMsg(SWalReader *pReader) {
65+
int32_t walNextValidMsg(SWalReader *pReader, bool scanMeta) {
7466
int64_t fetchVer = pReader->curVersion;
7567
int64_t lastVer = walGetLastVer(pReader->pWal);
7668
int64_t committedVer = walGetCommittedVer(pReader->pWal);
@@ -87,10 +79,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
8779
TAOS_CHECK_RETURN(walFetchHead(pReader, fetchVer));
8880

8981
int32_t type = pReader->pHead->head.msgType;
90-
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
91-
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
92-
TAOS_RETURN(walFetchBody(pReader));
93-
} else if (type == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
82+
if (type == TDMT_VND_SUBMIT || scanMeta) {
9483
TAOS_RETURN(walFetchBody(pReader));
9584
} else {
9685
TAOS_CHECK_RETURN(walSkipFetchBody(pReader));
@@ -115,9 +104,7 @@ int64_t walReaderGetSkipToVersion(SWalReader *pReader) {
115104

116105
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) {
117106
*sver = walGetFirstVer(pReader->pWal);
118-
int64_t lastVer = walGetLastVer(pReader->pWal);
119-
int64_t committedVer = walGetCommittedVer(pReader->pWal);
120-
*ever = pReader->cond.scanUncommited ? lastVer : committedVer;
107+
*ever = walGetCommittedVer(pReader->pWal);
121108
}
122109

123110
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) {

source/libs/wal/test/walMetaTest.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
415415
TEST_F(WalKeepEnv, readHandleRead) {
416416
walResetEnv();
417417
int code;
418-
SWalReader* pRead = walOpenReader(pWal, NULL, 0);
418+
SWalReader* pRead = walOpenReader(pWal, 0);
419419
ASSERT(pRead != NULL);
420420

421421
int i;
@@ -453,7 +453,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
453453
TEST_F(WalKeepEnv, walLogExist) {
454454
walResetEnv();
455455
int code;
456-
SWalReader* pRead = walOpenReader(pWal, NULL, 0);
456+
SWalReader* pRead = walOpenReader(pWal, 0);
457457
ASSERT(pRead != NULL);
458458

459459
int i;
@@ -715,7 +715,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
715715

716716
ASSERT_EQ(pWal->vers.lastVer, 99);
717717

718-
SWalReader* pRead = walOpenReader(pWal, NULL, 0);
718+
SWalReader* pRead = walOpenReader(pWal, 0);
719719
ASSERT(pRead != NULL);
720720

721721
for (int i = 0; i < 1000; i++) {

0 commit comments

Comments
 (0)