Skip to content

Commit 232e202

Browse files
fix iscp apply log (#22625)
判断非法的to ts Approved by: @XuPeng-SH
1 parent aa48108 commit 232e202

File tree

3 files changed

+127
-1
lines changed

3 files changed

+127
-1
lines changed

pkg/iscp/executor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ func (exec *ISCPTaskExecutor) applyISCPLog(ctx context.Context, from, to types.T
450450
err = moerr.NewErrStaleReadNoCtx("0-0", "0-0")
451451
return
452452
}
453+
if msg, injected := objectio.ISCPExecutorInjected(); injected && msg == "invalid timestamp" {
454+
to = types.TS{}
455+
}
453456
ctx = context.WithValue(ctx, defines.TenantIDKey{}, catalog.System_Account)
454457
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
455458
defer cancel()

pkg/vm/engine/disttae/change_handle.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ func NewPartitionChangesHandle(
9292
skipDeletes bool,
9393
mp *mpool.MPool,
9494
) (*PartitionChangesHandle, error) {
95+
if to.IsEmpty() || from.GT(&to) {
96+
return nil, moerr.NewInternalErrorNoCtx("invalid timestamp")
97+
}
9598
handle := &PartitionChangesHandle{
9699
tbl: tbl,
97100
fromTs: from,
@@ -106,7 +109,7 @@ func NewPartitionChangesHandle(
106109
return nil, err
107110
}
108111
if end {
109-
panic(fmt.Sprintf("logic error: from %s to %s", from.ToString(), to.ToString()))
112+
return nil, moerr.NewInternalErrorNoCtx(fmt.Sprintf("logic error:from %s to %s", from.ToString(), to.ToString()))
110113
}
111114
return handle, err
112115
}

pkg/vm/engine/test/change_handle_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3457,6 +3457,126 @@ func TestDropJobsByDBName(t *testing.T) {
34573457
assert.False(t, ok)
34583458
}
34593459

3460+
func TestInvalidTimestamp(t *testing.T) {
3461+
catalog.SetupDefines("")
3462+
3463+
// idAllocator := common.NewIdAllocator(1000)
3464+
3465+
var (
3466+
accountId = catalog.System_Account
3467+
)
3468+
3469+
ctx, cancel := context.WithCancel(context.Background())
3470+
defer cancel()
3471+
ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId)
3472+
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Minute*5)
3473+
defer cancel()
3474+
3475+
disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t)
3476+
defer func() {
3477+
disttaeEngine.Close(ctx)
3478+
taeHandler.Close(true)
3479+
rpcAgent.Close()
3480+
}()
3481+
3482+
err := mock_mo_indexes(disttaeEngine, ctxWithTimeout)
3483+
require.NoError(t, err)
3484+
err = mock_mo_foreign_keys(disttaeEngine, ctxWithTimeout)
3485+
require.NoError(t, err)
3486+
err = mock_mo_intra_system_change_propagation_log(disttaeEngine, ctxWithTimeout)
3487+
require.NoError(t, err)
3488+
t.Log(taeHandler.GetDB().Catalog.SimplePPString(3))
3489+
// init cdc executor
3490+
cdcExecutor, err := iscp.NewISCPTaskExecutor(
3491+
ctxWithTimeout,
3492+
disttaeEngine.Engine,
3493+
disttaeEngine.GetTxnClient(),
3494+
"",
3495+
&iscp.ISCPExecutorOption{
3496+
GCInterval: time.Hour,
3497+
GCTTL: time.Hour,
3498+
SyncTaskInterval: time.Millisecond * 100,
3499+
FlushWatermarkInterval: time.Millisecond * 100,
3500+
RetryTimes: 1,
3501+
},
3502+
common.DebugAllocator,
3503+
)
3504+
require.NoError(t, err)
3505+
cdcExecutor.SetRpcHandleFn(taeHandler.GetRPCHandle().HandleGetChangedTableList)
3506+
3507+
err = cdcExecutor.Start()
3508+
require.NoError(t, err)
3509+
defer cdcExecutor.Stop()
3510+
3511+
// create database and table
3512+
3513+
bat := CreateDBAndTableForCNConsumerAndGetAppendData(t, disttaeEngine, ctxWithTimeout, "srcdb", "src_table", 10)
3514+
bats := bat.Split(10)
3515+
defer bat.Close()
3516+
3517+
// append 1 row
3518+
_, rel, txn, err := disttaeEngine.GetTable(ctxWithTimeout, "srcdb", "src_table")
3519+
require.Nil(t, err)
3520+
3521+
tableID := rel.GetTableID(ctxWithTimeout)
3522+
3523+
err = rel.Write(ctxWithTimeout, containers.ToCNBatch(bats[0]))
3524+
require.Nil(t, err)
3525+
3526+
txn.Commit(ctxWithTimeout)
3527+
3528+
fault.Enable()
3529+
defer fault.Disable()
3530+
3531+
rmFn, err := objectio.InjectCDCExecutor("invalid timestamp")
3532+
assert.NoError(t, err)
3533+
3534+
txn, err = disttaeEngine.NewTxnOperator(ctx, disttaeEngine.Engine.LatestLogtailAppliedTime())
3535+
require.NoError(t, err)
3536+
ok, err := iscp.RegisterJob(
3537+
ctx, "", txn,
3538+
&iscp.JobSpec{
3539+
ConsumerInfo: iscp.ConsumerInfo{
3540+
ConsumerType: int8(iscp.ConsumerType_CNConsumer),
3541+
},
3542+
},
3543+
&iscp.JobID{
3544+
JobName: "hnsw_idx",
3545+
DBName: "srcdb",
3546+
TableName: "src_table",
3547+
},
3548+
false,
3549+
)
3550+
assert.True(t, ok)
3551+
assert.NoError(t, err)
3552+
assert.NoError(t, txn.Commit(ctxWithTimeout))
3553+
3554+
now := taeHandler.GetDB().TxnMgr.Now()
3555+
testutils.WaitExpect(
3556+
4000,
3557+
func() bool {
3558+
ts, ok := cdcExecutor.GetWatermark(accountId, tableID, "hnsw_idx")
3559+
return ok && ts.GE(&now)
3560+
},
3561+
)
3562+
_, ok = cdcExecutor.GetWatermark(accountId, tableID, "hnsw_idx")
3563+
assert.False(t, ok)
3564+
3565+
rmFn()
3566+
3567+
now = taeHandler.GetDB().TxnMgr.Now()
3568+
testutils.WaitExpect(
3569+
4000,
3570+
func() bool {
3571+
ts, ok := cdcExecutor.GetWatermark(accountId, tableID, "hnsw_idx")
3572+
return ok && ts.GE(&now)
3573+
},
3574+
)
3575+
ts, ok := cdcExecutor.GetWatermark(accountId, tableID, "hnsw_idx")
3576+
assert.True(t, ok)
3577+
assert.True(t, ts.GE(&now))
3578+
}
3579+
34603580
func TestCancelIteration1(t *testing.T) {
34613581
catalog.SetupDefines("")
34623582

0 commit comments

Comments
 (0)