Skip to content

Commit 2a6c802

Browse files
replay mo_iscp_log after clone (#23244)
replay mo_iscp_log after clone Approved by: @XuPeng-SH
1 parent 591c31b commit 2a6c802

File tree

3 files changed

+151
-2
lines changed

3 files changed

+151
-2
lines changed

pkg/iscp/executor.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,17 @@ func (exec *ISCPTaskExecutor) applyISCPLog(ctx context.Context, from, to types.T
567567
return
568568
}
569569
rel, err := db.Relation(ctx, MOISCPLogTableName, nil)
570+
571+
tid := rel.GetTableID(ctx)
572+
// injection is for ut - simulate table id change
573+
var injectChangeTableID bool
574+
if msg, injected := objectio.ISCPExecutorInjected(); injected && msg == "tableIDChange" {
575+
injectChangeTableID = true
576+
}
577+
if tid != exec.prevISCPTableID || injectChangeTableID {
578+
err = moerr.NewErrStaleReadNoCtx("0-0", "0-0")
579+
return
580+
}
570581
// injection is for ut
571582
if msg, injected := objectio.ISCPExecutorInjected(); injected && msg == "applyISCPLog" {
572583
err = moerr.NewInternalErrorNoCtx(msg)
@@ -693,7 +704,6 @@ func (exec *ISCPTaskExecutor) replay(ctx context.Context) (err error) {
693704
zap.Error(err),
694705
)
695706
}()
696-
sql := cdc.CDCSQLBuilder.ISCPLogSelectSQL()
697707
txn, err := getTxn(ctx, exec.txnEngine, exec.cnTxnClient, "iscp replay")
698708
if err != nil {
699709
return
@@ -702,6 +712,14 @@ func (exec *ISCPTaskExecutor) replay(ctx context.Context) (err error) {
702712
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
703713
defer cancel()
704714
defer txn.Commit(ctx)
715+
716+
tid, _, err := getTableID(ctx, exec.cnUUID, txn, catalog.System_Account, catalog.MO_CATALOG, catalog.MO_ISCP_LOG)
717+
if err != nil {
718+
return
719+
}
720+
exec.prevISCPTableID = tid
721+
722+
sql := cdc.CDCSQLBuilder.ISCPLogSelectSQL()
705723
result, err := ExecWithResult(ctx, sql, exec.cnUUID, txn)
706724
if err != nil {
707725
return
@@ -747,6 +765,7 @@ func (exec *ISCPTaskExecutor) replay(ctx context.Context) (err error) {
747765
return true
748766
})
749767
exec.iscpLogWm = types.TimestampToTS(txn.SnapshotTS())
768+
750769
return
751770
}
752771

pkg/iscp/types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,9 @@ type ISCPTaskExecutor struct {
137137
cnUUID string
138138
txnEngine engine.Engine
139139
cnTxnClient client.TxnClient
140-
iscpLogWm types.TS
140+
141+
iscpLogWm types.TS
142+
prevISCPTableID uint64
141143

142144
rpcHandleFn func(
143145
ctx context.Context,

pkg/vm/engine/test/change_handle_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5409,3 +5409,131 @@ func TestPartitionChangesHandleGCKPBoundaryStaleRead(t *testing.T) {
54095409
}
54105410
}
54115411
}
5412+
5413+
func TestISCPTableIDChange(t *testing.T) {
5414+
catalog.SetupDefines("")
5415+
5416+
var (
5417+
accountId = catalog.System_Account
5418+
)
5419+
5420+
ctx, cancel := context.WithCancel(context.Background())
5421+
defer cancel()
5422+
ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId)
5423+
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Minute*5)
5424+
defer cancel()
5425+
5426+
disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t)
5427+
defer func() {
5428+
disttaeEngine.Close(ctx)
5429+
taeHandler.Close(true)
5430+
rpcAgent.Close()
5431+
}()
5432+
5433+
err := mock_mo_indexes(disttaeEngine, ctxWithTimeout)
5434+
require.NoError(t, err)
5435+
err = mock_mo_foreign_keys(disttaeEngine, ctxWithTimeout)
5436+
require.NoError(t, err)
5437+
err = mock_mo_intra_system_change_propagation_log(disttaeEngine, ctxWithTimeout)
5438+
require.NoError(t, err)
5439+
5440+
bat := CreateDBAndTableForCNConsumerAndGetAppendData(t, disttaeEngine, ctxWithTimeout, "srcdb", "src_table", 10)
5441+
bats := bat.Split(10)
5442+
defer bat.Close()
5443+
5444+
// append 1 row
5445+
_, rel, txn, err := disttaeEngine.GetTable(ctxWithTimeout, "srcdb", "src_table")
5446+
require.Nil(t, err)
5447+
5448+
tableID := rel.GetTableID(ctxWithTimeout)
5449+
5450+
err = rel.Write(ctxWithTimeout, containers.ToCNBatch(bats[0]))
5451+
require.Nil(t, err)
5452+
5453+
txn.Commit(ctxWithTimeout)
5454+
5455+
// init cdc executor
5456+
checkLeaseStub := gostub.Stub(
5457+
&iscp.CheckLeaseWithRetry,
5458+
func(
5459+
context.Context,
5460+
string,
5461+
engine.Engine,
5462+
client.TxnClient,
5463+
) (bool, error) {
5464+
return true, nil
5465+
},
5466+
)
5467+
defer checkLeaseStub.Reset()
5468+
cdcExecutor, err := iscp.NewISCPTaskExecutor(
5469+
ctxWithTimeout,
5470+
disttaeEngine.Engine,
5471+
disttaeEngine.GetTxnClient(),
5472+
"",
5473+
&iscp.ISCPExecutorOption{
5474+
GCInterval: time.Hour,
5475+
GCTTL: time.Hour,
5476+
SyncTaskInterval: time.Millisecond * 100,
5477+
FlushWatermarkInterval: time.Millisecond * 100,
5478+
RetryTimes: 1,
5479+
},
5480+
common.DebugAllocator,
5481+
)
5482+
require.NoError(t, err)
5483+
cdcExecutor.SetRpcHandleFn(taeHandler.GetRPCHandle().HandleGetChangedTableList)
5484+
5485+
err = cdcExecutor.Start()
5486+
require.NoError(t, err)
5487+
defer cdcExecutor.Stop()
5488+
5489+
// register index job
5490+
txn, err = disttaeEngine.NewTxnOperator(ctx, disttaeEngine.Engine.LatestLogtailAppliedTime())
5491+
require.NoError(t, err)
5492+
ok, err := iscp.RegisterJob(
5493+
ctx, "", txn,
5494+
&iscp.JobSpec{
5495+
ConsumerInfo: iscp.ConsumerInfo{
5496+
ConsumerType: int8(iscp.ConsumerType_CNConsumer),
5497+
},
5498+
},
5499+
&iscp.JobID{
5500+
JobName: "test_idx",
5501+
DBName: "srcdb",
5502+
TableName: "src_table",
5503+
},
5504+
false,
5505+
)
5506+
assert.True(t, ok)
5507+
assert.NoError(t, err)
5508+
assert.NoError(t, txn.Commit(ctxWithTimeout))
5509+
5510+
// wait for synchronization to initialize prevISCPTableID
5511+
5512+
// enable injection to trigger table id change check
5513+
fault.Enable()
5514+
defer fault.Disable()
5515+
rmFn, err := objectio.InjectCDCExecutor("tableIDChange")
5516+
assert.NoError(t, err)
5517+
defer rmFn()
5518+
5519+
// append more data to trigger synchronization
5520+
_, rel, txn, err = disttaeEngine.GetTable(ctxWithTimeout, "srcdb", "src_table")
5521+
require.Nil(t, err)
5522+
5523+
err = rel.Write(ctxWithTimeout, containers.ToCNBatch(bats[1]))
5524+
require.Nil(t, err)
5525+
5526+
txn.Commit(ctxWithTimeout)
5527+
5528+
now := taeHandler.GetDB().TxnMgr.Now()
5529+
testutils.WaitExpect(
5530+
4000,
5531+
func() bool {
5532+
ts, ok := cdcExecutor.GetWatermark(accountId, tableID, "test_idx")
5533+
return ok && ts.GE(&now)
5534+
},
5535+
)
5536+
ts, ok := cdcExecutor.GetWatermark(accountId, tableID, "test_idx")
5537+
assert.True(t, ok)
5538+
assert.True(t, ts.GE(&now))
5539+
}

0 commit comments

Comments
 (0)