diff --git a/pkg/objectio/ioutil/codec.go b/pkg/objectio/ioutil/codec.go index 5e34bca972a93..36edef126356b 100644 --- a/pkg/objectio/ioutil/codec.go +++ b/pkg/objectio/ioutil/codec.go @@ -144,6 +144,26 @@ func EncodeGCMetadataName(start, end types.TS) string { ) } +func EncodeGCFastMetadataName(start, end types.TS) string { + return fmt.Sprintf( + "%s_%s_%s.%s", + PrefixGCMeta, + start.ToString(), + end.ToString(), + FastMetaExt, + ) +} + +func EncodeGCScanMetadataName(start, end types.TS) string { + return fmt.Sprintf( + "%s_%s_%s.%s", + PrefixGCMeta, + start.ToString(), + end.ToString(), + ScanMetaExt, + ) +} + func DecodeGCMetadataName(name string) (ret TSRangeFile) { return DecodeTSRangeFile(name) } diff --git a/pkg/objectio/ioutil/const.go b/pkg/objectio/ioutil/const.go index 0bdb171521973..c8b921c0ecd4d 100644 --- a/pkg/objectio/ioutil/const.go +++ b/pkg/objectio/ioutil/const.go @@ -22,6 +22,8 @@ const defaultGCDir = "gc/" const ( // checkpint related CheckpointExt = "ckp" + ScanMetaExt = "scan" + FastMetaExt = "fast" CompactExt = "cpt" // gc related @@ -54,6 +56,10 @@ func IsCKPExt(ext string) bool { return ext == CheckpointExt } +func IsScanExt(ext string) bool { + return ext == ScanMetaExt +} + func IsGCMetaExt(ext string) bool { return ext == GCMetaExt } diff --git a/pkg/objectio/ioutil/types.go b/pkg/objectio/ioutil/types.go index 3c00aa89d0eff..1c9fb80353da0 100644 --- a/pkg/objectio/ioutil/types.go +++ b/pkg/objectio/ioutil/types.go @@ -57,6 +57,14 @@ func (m TSRangeFile) IsCKPFile() bool { return IsCKPExt(m.ext) } +func (m TSRangeFile) IsScanFile() bool { + return IsScanExt(m.ext) +} + +func (m TSRangeFile) IsFastFile() bool { + return m.ext == FastMetaExt +} + func (m TSRangeFile) IsSnapshotExt() bool { return m.ext == SnapshotExt } diff --git a/pkg/sql/plan/function/ctl/cmd_disk_cleaner.go b/pkg/sql/plan/function/ctl/cmd_disk_cleaner.go index af92673352ed2..8a1fede522c52 100644 --- a/pkg/sql/plan/function/ctl/cmd_disk_cleaner.go +++ b/pkg/sql/plan/function/ctl/cmd_disk_cleaner.go @@ -33,6 +33,7 @@ func IsValidArg(parameter string, proc *process.Process) (*cmd_util.DiskCleaner, switch op { case cmd_util.AddChecker: case cmd_util.RemoveChecker: + case cmd_util.ExecuteGC: break default: return nil, moerr.NewInternalError(proc.Ctx, "handleDiskCleaner: invalid operation!") @@ -42,6 +43,14 @@ func IsValidArg(parameter string, proc *process.Process) (*cmd_util.DiskCleaner, case cmd_util.CheckerKeyTTL: case cmd_util.CheckerKeyMinTS: break + case cmd_util.FastGC: + ts := proc.Base.TxnClient.MinTimestamp() + minTS := types.TimestampToTS(ts) + return &cmd_util.DiskCleaner{ + Op: op, + Key: key, + Value: minTS.ToString(), + }, nil default: return nil, moerr.NewInternalError(proc.Ctx, "handleDiskCleaner: invalid key!") } diff --git a/pkg/txn/client/client.go b/pkg/txn/client/client.go index 322ab528f9938..1a9811a5100b6 100644 --- a/pkg/txn/client/client.go +++ b/pkg/txn/client/client.go @@ -382,10 +382,14 @@ func (client *txnClient) Close() error { func (client *txnClient) MinTimestamp() timestamp.Timestamp { client.mu.RLock() - defer client.mu.RUnlock() + ops := make([]*txnOperator, 0, len(client.mu.activeTxns)) + for _, op := range client.mu.activeTxns { + ops = append(ops, op) + } + client.mu.RUnlock() min := timestamp.Timestamp{} - for _, op := range client.mu.activeTxns { + for _, op := range ops { if min.IsEmpty() || op.Txn().SnapshotTS.Less(min) { min = op.Txn().SnapshotTS diff --git a/pkg/vm/engine/cmd_util/type.go b/pkg/vm/engine/cmd_util/type.go index 
7e61cf81ebbeb..d709c8b39c29b 100644 --- a/pkg/vm/engine/cmd_util/type.go +++ b/pkg/vm/engine/cmd_util/type.go @@ -24,4 +24,7 @@ const ( AddChecker = "add_checker" RemoveChecker = "remove_checker" + ExecuteGC = "execute_gc" + + FastGC = "fast" ) diff --git a/pkg/vm/engine/tae/db/gc/v3/base.go b/pkg/vm/engine/tae/db/gc/v3/base.go new file mode 100644 index 0000000000000..35ec87fc812b2 --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/base.go @@ -0,0 +1,275 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import ( + "context" + "fmt" + "github.com/matrixorigin/matrixone/pkg/common/bitmap" + "github.com/matrixorigin/matrixone/pkg/common/malloc" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/engine/ckputil" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" +) + +const ( + Default_Coarse_EstimateRows = 10000000 + Default_Coarse_Probility = 0.00001 + Default_CanGC_TailSize = 64 * malloc.MB +) + +type GCJobExecutorOption func(*GCJob) + +func WithGCJobCoarseConfig( + estimateRows int, + probility float64, + size int, +) GCJobExecutorOption { + return func(e *GCJob) { + e.config.coarseEstimateRows = estimateRows + e.config.coarseProbility = probility + e.config.canGCCacheSize = size + } +} + +type FilterProvider interface { + CoarseFilter(ctx context.Context) (FilterFn, error) + FineFilter(ctx context.Context) (FilterFn, error) +} + +type BaseCheckpointGCJob struct { + GCExecutor + config struct { + coarseEstimateRows int + coarseProbility float64 + canGCCacheSize int + } + sourcer engine.BaseReader + snapshotMeta *logtail.SnapshotMeta + accountSnapshots map[uint32][]types.TS + pitr *logtail.PitrInfo + ts *types.TS + result struct { + filesToGC []string + filesNotGC []objectio.ObjectStats + } + transObjects map[string]*ObjectEntry + buffer containers.IBatchBuffer + + filterProvider FilterProvider +} + +func (e *BaseCheckpointGCJob) fillDefaults() { + if e.config.coarseEstimateRows <= 0 { + e.config.coarseEstimateRows = Default_Coarse_EstimateRows + } + if e.config.coarseProbility <= 0 { + e.config.coarseProbility = Default_Coarse_Probility + } + if e.config.canGCCacheSize <= 0 { + e.config.canGCCacheSize = Default_CanGC_TailSize + } +} + +func (e *BaseCheckpointGCJob) Result() ([]string, []objectio.ObjectStats) { + return e.result.filesToGC, e.result.filesNotGC +} + +func (e *BaseCheckpointGCJob) Close() error { + for name, entry := range e.transObjects { + entry.Release() + delete(e.transObjects, name) + } + if e.sourcer != nil { + e.sourcer.Close() + e.sourcer = nil + } + if e.buffer != 
nil { + e.buffer.Close(e.mp) + e.buffer = nil + } + e.snapshotMeta = nil + e.accountSnapshots = nil + e.pitr = nil + e.ts = nil + e.result.filesToGC = nil + e.result.filesNotGC = nil + return e.GCExecutor.Close() +} + +func (e *BaseCheckpointGCJob) Execute(ctx context.Context) error { + if e.transObjects == nil { + e.transObjects = make(map[string]*ObjectEntry, 100) + } + if len(e.transObjects) > 0 { + logutil.Warnf("transObjects is not empty: %d, maybe it is not closed", len(e.transObjects)) + } + attrs, attrTypes := ckputil.DataScan_TableIDAtrrs, ckputil.DataScan_TableIDTypes + e.buffer = containers.NewOneSchemaBatchBuffer( + mpool.MB*16, + attrs, + attrTypes, + ) + defer e.buffer.Close(e.mp) + coarseFilter, err := e.filterProvider.CoarseFilter(ctx) + if err != nil { + return err + } + fineFilter, err := e.filterProvider.FineFilter(ctx) + if err != nil { + return err + } + + return e.ExecuteTemplate(ctx, coarseFilter, fineFilter) +} + +func (e *BaseCheckpointGCJob) ExecuteTemplate( + ctx context.Context, + coarseFilter, fineFilter FilterFn, +) error { + e.result.filesToGC = make([]string, 0, 20) + finalSinker, err := MakeFinalCanGCSinker(&e.result.filesToGC) + if err != nil { + return err + } + + newFiles, err := e.Run( + ctx, + e.sourcer.Read, + coarseFilter, + fineFilter, + finalSinker, + ) + if err != nil { + return err + } + + e.result.filesNotGC = make([]objectio.ObjectStats, 0, len(newFiles)) + e.result.filesNotGC = append(e.result.filesNotGC, newFiles...) + return nil +} + +func (e *BaseCheckpointGCJob) FineFilter(_ context.Context) (FilterFn, error) { + return MakeSnapshotAndPitrFineFilter( + e.ts, + e.accountSnapshots, + e.pitr, + e.snapshotMeta, + e.transObjects, + ) +} + +func MakeSnapshotAndPitrFineFilter( + ts *types.TS, + accountSnapshots map[uint32][]types.TS, + pitrs *logtail.PitrInfo, + snapshotMeta *logtail.SnapshotMeta, + transObjects map[string]*ObjectEntry, +) ( + filter FilterFn, + err error, +) { + tableSnapshots, tablePitrs := snapshotMeta.AccountToTableSnapshots( + accountSnapshots, + pitrs, + ) + return func( + ctx context.Context, + bm *bitmap.Bitmap, + bat *batch.Batch, + mp *mpool.MPool, + ) error { + createTSs := vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[1]) + deleteTSs := vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[2]) + tableIDs := vector.MustFixedColNoTypeCheck[uint64](bat.Vecs[4]) + for i := 0; i < bat.Vecs[0].Length(); i++ { + buf := bat.Vecs[0].GetRawBytesAt(i) + stats := (objectio.ObjectStats)(buf) + name := stats.ObjectName().UnsafeString() + tableID := tableIDs[i] + createTS := createTSs[i] + dropTS := deleteTSs[i] + + snapshots := tableSnapshots[tableID] + pitr := tablePitrs[tableID] + + if entry := transObjects[name]; entry != nil { + if !logtail.ObjectIsSnapshotRefers( + entry.stats, pitr, &entry.createTS, &entry.dropTS, snapshots, + ) { + bm.Add(uint64(i)) + } + continue + } + if !createTS.LT(ts) || !dropTS.LT(ts) { + continue + } + if dropTS.IsEmpty() { + panic(fmt.Sprintf("dropTS is empty, name: %s, createTS: %s", name, createTS.ToString())) + } + if !logtail.ObjectIsSnapshotRefers( + &stats, pitr, &createTS, &dropTS, snapshots, + ) { + bm.Add(uint64(i)) + } + } + return nil + }, nil +} + +func MakeFinalCanGCSinker( + filesToGC *[]string, +) ( + SinkerFn, + error, +) { + buffer := make(map[string]struct{}, 100) + return func( + ctx context.Context, bat *batch.Batch, + ) error { + clear(buffer) + var dropTSs []types.TS + var tableIDs []uint64 + if bat.Vecs[0].Length() > 0 { + dropTSs = 
vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[2]) + tableIDs = vector.MustFixedColNoTypeCheck[uint64](bat.Vecs[4]) + } + for i := 0; i < bat.Vecs[0].Length(); i++ { + buf := bat.Vecs[0].GetRawBytesAt(i) + stats := (objectio.ObjectStats)(buf) + name := stats.ObjectName().String() + dropTS := dropTSs[i] + tableID := tableIDs[i] + if !dropTS.IsEmpty() { + buffer[name] = struct{}{} + continue + } + if !logtail.IsMoTable(tableID) { + buffer[name] = struct{}{} + } + } + for name := range buffer { + *filesToGC = append(*filesToGC, name) + } + return nil + }, nil +} diff --git a/pkg/vm/engine/tae/db/gc/v3/base_test.go b/pkg/vm/engine/tae/db/gc/v3/base_test.go new file mode 100644 index 0000000000000..c0871b98d726f --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/base_test.go @@ -0,0 +1,121 @@ +// Copyright 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import ( + "context" + "github.com/matrixorigin/matrixone/pkg/common/bitmap" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +// TestBaseCheckpointGCJobDefaults tests the default configuration values +func TestBaseCheckpointGCJobDefaults(t *testing.T) { + job := &BaseCheckpointGCJob{} + job.fillDefaults() + + assert.Equal(t, Default_Coarse_EstimateRows, job.config.coarseEstimateRows) + assert.Equal(t, Default_Coarse_Probility, job.config.coarseProbility) + assert.Equal(t, Default_CanGC_TailSize, job.config.canGCCacheSize) +} + +// TestBaseCheckpointGCJobCustomConfig tests custom configuration +func TestBaseCheckpointGCJobCustomConfig(t *testing.T) { + job := &BaseCheckpointGCJob{} + customEstimateRows := 5000000 + customProbability := 0.00002 + customCacheSize := 32 * 1024 * 1024 + + opt := WithGCJobCoarseConfig(customEstimateRows, customProbability, customCacheSize) + opt(job) + + assert.Equal(t, customEstimateRows, job.config.coarseEstimateRows) + assert.Equal(t, customProbability, job.config.coarseProbility) + assert.Equal(t, customCacheSize, job.config.canGCCacheSize) +} + +// TestBaseCheckpointGCJobClose tests the cleanup of resources +func TestBaseCheckpointGCJobClose(t *testing.T) { + transObjects := make(map[string]*ObjectEntry) + // Add some test objects + transObjects["test1"] = &ObjectEntry{ + stats: &objectio.ObjectStats{}, + createTS: types.BuildTS(1, 0), + dropTS: types.BuildTS(2, 0), + } + + job, _ := getJob(context.Background(), t, transObjects) + err := job.Close() + require.NoError(t, err) + assert.Empty(t, job.transObjects) + assert.Nil(t, job.sourcer) + assert.Nil(t, job.buffer) + assert.Nil(t, job.snapshotMeta) + assert.Nil(t, job.accountSnapshots) + assert.Nil(t, job.pitr) + assert.Nil(t, job.ts) +} + +// TestSnapshotAndPitrFineFilter tests the fine 
filter functionality +func TestSnapshotAndPitrFineFilter(t *testing.T) { + ts := types.BuildTS(100, 0) + accountSnapshots := map[uint32][]types.TS{ + 1: {types.BuildTS(50, 0)}, + } + pitrs := &logtail.PitrInfo{} + snapshotMeta := &logtail.SnapshotMeta{} + transObjects := make(map[string]*ObjectEntry) + + filter, err := MakeSnapshotAndPitrFineFilter( + &ts, + accountSnapshots, + pitrs, + snapshotMeta, + transObjects, + ) + + require.NoError(t, err) + require.NotNil(t, filter) +} + +// TestGCJobExecute tests the execution flow of a GC job +func TestGCJobExecute(t *testing.T) { + ctx := context.Background() + // Create test objects with overlapping timestamps + job, _ := getJob(ctx, t, nil) + + mockProvider := &MockFilterProvider{ + coarseFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + return nil + }, + fineFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + return nil + }, + } + job.filterProvider = mockProvider + + err := job.Execute(ctx) + require.NoError(t, err) + + // Verify results initialization + assert.NotNil(t, job.result.filesToGC) + assert.NotNil(t, job.result.filesNotGC) +} diff --git a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go index b68287797e886..357473a9ae637 100644 --- a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go @@ -35,6 +35,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/wal" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" "go.uber.org/zap" ) @@ -345,6 +346,16 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) { maxConsumedStart = *meta.GetStart() maxConsumedEnd = *meta.GetEnd() gcFiles = append(gcFiles, meta.GetName()) + + if meta.IsCKPFile() && maxConsumedStart.IsEmpty() { + gckp := c.checkpointCli.MaxGlobalCheckpoint() + if gckp != nil { + end := gckp.GetEnd() + if end.LT(&maxConsumedEnd) { + c.updateGCWaterMark(gckp) + } + } + } } } @@ -592,6 +603,8 @@ func (c *checkpointCleaner) deleteStaleSnapshotFilesLocked() error { delete(metaFiles, maxFile) return } + newMaxFile = maxFile + newMaxTS = *maxTS // thisTS <= maxTS: this file is expired and should be deleted if err = c.fs.Delete(c.ctx, ioutil.MakeGCFullName(thisFile)); err != nil { @@ -665,7 +678,7 @@ func (c *checkpointCleaner) deleteStaleCKPMetaFileLocked() (err error) { metaFiles := c.CloneMetaFilesLocked() filesToDelete := make([]string, 0) for _, metaFile := range metaFiles { - if !metaFile.IsCKPFile() || + if (!metaFile.IsCKPFile() && !metaFile.IsScanFile() && !metaFile.IsFastFile()) || (metaFile.RangeEqual(&window.tsRange.start, &window.tsRange.end)) { logutil.Info( "GC-TRACE-DELETE-CKP-FILE-SKIP", @@ -1233,7 +1246,7 @@ func (c *checkpointCleaner) scanCheckpointsAsDebugWindow( return } -func (c *checkpointCleaner) DoCheck(ctx context.Context) error { +func (c *checkpointCleaner) DoCheck(waitGC bool) error { c.StartMutationTask("gc-check") defer c.StopMutationTask() @@ -1247,10 +1260,12 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { } gCkp := c.GetGCWaterMark() - testutils.WaitExpect(10000, func() bool { - gCkp = c.GetGCWaterMark() - return gCkp != nil - }) + if waitGC { + testutils.WaitExpect(10000, func() bool { + gCkp = c.GetGCWaterMark() + return 
gCkp != nil + }) + } if gCkp == nil { gCkp = c.checkpointCli.MaxGlobalCheckpoint() if gCkp == nil { @@ -1349,7 +1364,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { return err } - //logutil.Infof("debug table is %d, stats is %v", len(debugWindow.files.stats), debugWindow.files.stats[0].ObjectName().String()) if _, _, err = debugWindow.ExecuteGlobalCheckpointBasedGC( c.ctx, gCkp, @@ -1371,7 +1385,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { return err } - //logutil.Infof("debug table2 is %d, stats is %v", len(debugWindow.files.stats), debugWindow.files.stats[0].ObjectName().String()) objects1, objects2, equal := mergeWindow.Compare(debugWindow, buffer) if !equal { logutil.Error( @@ -1455,23 +1468,55 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { return nil } -func (c *checkpointCleaner) Process(inputCtx context.Context) (err error) { +func (c *checkpointCleaner) Process( + inputCtx context.Context, + jobType tasks.JobType, + ts *types.TS, +) (err error) { if !c.GCEnabled() { return } now := time.Now() - c.StartMutationTask("gc-process") - defer c.StopMutationTask() - startScanWaterMark := c.GetScanWaterMark() startGCWaterMark := c.GetGCWaterMark() - + var name, msg string + var checker func(*checkpoint.CheckpointEntry) bool + var execute func(context.Context, *containers.OneSchemaBatchBuffer) error + var scanCount int + minTS := &types.TS{} + switch jobType { + case JT_GCExecute: + name = "gc-execute" + msg = "GC-EXECUTE-PROCESS" + execute = c.tryGCLocked + scanCount = 10 + case JT_GCFastExecute: + name = "gc-fast-execute" + msg = "GC-FAST-EXECUTE-PROCESS" + minTS = ts + checker = func(ckp *checkpoint.CheckpointEntry) bool { + start := ckp.GetStart() + end := ckp.GetEnd() + if !minTS.IsEmpty() && + ((start.IsEmpty() && end.LT(minTS)) || + start.LT(minTS)) { + return true + } + return false + } + execute = c.tryFastGCLocked + scanCount = 20 + default: + return moerr.NewInternalErrorNoCtx("unknown job type") + } + c.StartMutationTask(name) + defer c.StopMutationTask() defer func() { endScanWaterMark := c.GetScanWaterMark() endGCWaterMark := c.GetGCWaterMark() logutil.Info( - "GC-TRACE-PROCESS", + msg, zap.String("task", c.TaskNameLocked()), zap.Duration("duration", time.Since(now)), zap.Error(err), @@ -1479,6 +1524,7 @@ func (c *checkpointCleaner) Process(inputCtx context.Context) (err error) { zap.String("end-scan-watermark", endScanWaterMark.String()), zap.String("start-gc-watermark", startGCWaterMark.String()), zap.String("end-gc-watermark", endGCWaterMark.String()), + zap.String("min-ts", minTS.ToString()), ) }() @@ -1503,21 +1549,152 @@ func (c *checkpointCleaner) Process(inputCtx context.Context) (err error) { memoryBuffer := MakeGCWindowBuffer(16 * mpool.MB) defer memoryBuffer.Close(c.mp) - if err = c.tryScanLocked(ctx, memoryBuffer); err != nil { + var tryGC bool + if tryGC, err = c.tryScanLocked( + ctx, memoryBuffer, scanCount, checker); err != nil || + (!tryGC && jobType == JT_GCFastExecute) { + // fast gc can return directly if no gc is needed, but normal gc must execute + // tryGCLocked because it needs to clean up and merge expired checkpoints return } - err = c.tryGCLocked(ctx, memoryBuffer) - return + return execute(ctx, memoryBuffer) +} + +func (c *checkpointCleaner) tryFastGCLocked( + ctx context.Context, + memoryBuffer *containers.OneSchemaBatchBuffer, +) (err error) { + var snapshots map[uint32]containers.Vector + var extraErrMsg string + now := time.Now() + defer func() { + 
logtail.CloseSnapshotList(snapshots) + logutil.Info( + "GC-TRACE-TRY-FAST-GC-AGAINST-GCKP", + zap.String("task", c.TaskNameLocked()), + zap.Duration("duration", time.Since(now)), + zap.Error(err), + zap.String("extra-err-msg", extraErrMsg), + ) + }() + pitrs, err := c.GetPITRsLocked(ctx) + if err != nil { + extraErrMsg = "GetPITRs failed" + return + } + snapshots, err = c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp) + if err != nil { + extraErrMsg = "GetSnapshot failed" + return + } + accountSnapshots := TransformToTSList(snapshots) + scanWindow := c.GetScannedWindowLocked() + + if scanWindow == nil { + return + } + filesToGC, err := c.doGCAgainstFastLocked( + accountSnapshots, pitrs, memoryBuffer, scanWindow, + ) + if err != nil { + extraErrMsg = "doGCAgainstFastLocked failed" + return + } + // Delete files after doGCAgainstFastLocked + // TODO: requires a physical removal policy + if err = c.deleter.DeleteMany( + c.ctx, + c.TaskNameLocked(), + filesToGC, + ); err != nil { + extraErrMsg = fmt.Sprintf("ExecDelete %v failed", filesToGC) + return + } + if err = c.deleteStaleCKPMetaFileLocked(); err != nil { + logutil.Error( + "GC-TRY-DELETE-STALE-CKP-META-FILE-ERROR", + zap.Error(err), + zap.String("task", c.TaskNameLocked()), + ) + } + + if err = c.deleteStaleSnapshotFilesLocked(); err != nil { + logutil.Error( + "GC-TRY-DELETE-STALE-SNAPSHOT-FILES-ERROR", + zap.Error(err), + zap.String("task", c.TaskNameLocked()), + ) + } + return nil +} + +func (c *checkpointCleaner) doGCAgainstFastLocked( + accountSnapshots map[uint32][]types.TS, + pitrs *logtail.PitrInfo, + memoryBuffer *containers.OneSchemaBatchBuffer, + window *GCWindow, +) ([]string, error) { + now := time.Now() + + var ( + filesToGC []string + metafile string + err error + softCost, mergeCost time.Duration + extraErrMsg string + ) + + defer func() { + logutil.Info( + "GC-TRACE-DO-GC-AGAINST-FAST", + zap.String("task", c.TaskNameLocked()), + zap.Duration("duration", time.Since(now)), + zap.Duration("soft-gc", softCost), + zap.Duration("merge-table", mergeCost), + zap.Error(err), + zap.String("metafile", metafile), + zap.String("extra-err-msg", extraErrMsg), + ) + }() + if filesToGC, metafile, err = window.ExecuteFastBasedGC( + c.ctx, + &window.tsRange.end, + accountSnapshots, + pitrs, + c.mutation.snapshotMeta, + memoryBuffer, + c.config.canGCCacheSize, + c.config.estimateRows, + c.config.probility, + c.checkpointCli.GetCatalog(), + c.mp, + c.fs, + ); err != nil { + extraErrMsg = fmt.Sprintf("ExecuteFastBasedGC %v-%v failed", window.tsRange.start.ToString(), window.tsRange.end.ToString()) + return nil, err + } + c.mutAddMetaFileLocked(metafile, ioutil.NewTSRangeFile( + metafile, + ioutil.FastMetaExt, + window.tsRange.start, + window.tsRange.end, + )) + return filesToGC, nil +} // tryScanLocked scans the incremental checkpoints and tries to create a new GC window // it will update `mutation.scanned` and `mutation.metaFiles` // it will update the scan watermark // it will save the snapshot meta and table info to the disk + +// checker is the function used to filter candidate checkpoints. +// It is nil during normal GC and is passed in during fast GC. 
func (c *checkpointCleaner) tryScanLocked( ctx context.Context, memoryBuffer *containers.OneSchemaBatchBuffer, -) (err error) { + scanCount int, + checker func(*checkpoint.CheckpointEntry) bool, +) (tryGC bool, err error) { // get the max scanned timestamp var maxScannedTS types.TS if scanWaterMark := c.GetScanWaterMark(); scanWaterMark != nil { @@ -1525,7 +1702,7 @@ func (c *checkpointCleaner) tryScanLocked( } // get up to 10 incremental checkpoints starting from the max scanned timestamp - checkpoints := c.checkpointCli.ICKPSeekLT(maxScannedTS, 10) + checkpoints := c.checkpointCli.ICKPSeekLT(maxScannedTS, scanCount) // quick return if there is no incremental checkpoint if len(checkpoints) == 0 { @@ -1535,8 +1712,14 @@ func (c *checkpointCleaner) tryScanLocked( candidates := make([]*checkpoint.CheckpointEntry, 0, len(checkpoints)) // filter out the incremental checkpoints that do not meet the requirements for _, ckp := range checkpoints { - if !c.checkExtras(ckp) { - continue + if checker != nil { + if !checker(ckp) { + continue + } + } else { + if !c.checkExtras(ckp) { + continue + } } candidates = append(candidates, ckp) } @@ -1559,6 +1742,7 @@ func (c *checkpointCleaner) tryScanLocked( } c.mutAddScannedLocked(newWindow) c.updateScanWaterMark(candidates[len(candidates)-1]) + tryGC = true files := tmpNewFiles for _, stats := range c.GetScannedWindowLocked().files { files = append(files, stats.ObjectName().String()) @@ -1649,6 +1833,7 @@ func (c *checkpointCleaner) scanCheckpointsLocked( var ( snapSize, tableSize uint32 + gcMetaFile string ) defer func() { logutil.Info( @@ -1658,7 +1843,9 @@ func (c *checkpointCleaner) scanCheckpointsLocked( zap.Duration("duration", time.Since(now)), zap.Uint32("snap-meta-size :", snapSize), zap.Uint32("table-meta-size :", tableSize), - zap.String("snapshot-detail", c.mutation.snapshotMeta.String())) + zap.String("snapshot-detail", c.mutation.snapshotMeta.String()), + zap.String("scan-meta-file", gcMetaFile), + ) }() var snapshotFile, accountFile ioutil.TSRangeFile @@ -1715,7 +1902,6 @@ func (c *checkpointCleaner) scanCheckpointsLocked( } gcWindow = NewGCWindow(c.mp, c.fs) - var gcMetaFile string if gcMetaFile, err = gcWindow.ScanCheckpoints( ctx, ckps, @@ -1735,7 +1921,7 @@ func (c *checkpointCleaner) scanCheckpointsLocked( gcMetaFile, ioutil.NewTSRangeFile( gcMetaFile, - ioutil.CheckpointExt, + ioutil.ScanMetaExt, gcWindow.tsRange.start, gcWindow.tsRange.end, ), diff --git a/pkg/vm/engine/tae/db/gc/v3/cleaner_test.go b/pkg/vm/engine/tae/db/gc/v3/cleaner_test.go index b829bb08fbe23..d03864d146194 100644 --- a/pkg/vm/engine/tae/db/gc/v3/cleaner_test.go +++ b/pkg/vm/engine/tae/db/gc/v3/cleaner_test.go @@ -172,7 +172,7 @@ func TestForMockCoverage(t *testing.T) { var cleaner MockCleaner ctx := context.Background() require.NoError(t, cleaner.Replay(ctx)) - require.NoError(t, cleaner.Process(ctx)) + require.NoError(t, cleaner.Process(ctx, JT_GCExecute, nil)) require.NoError(t, cleaner.TryGC(ctx)) cleaner.AddChecker(nil, "") require.Nil(t, cleaner.GetChecker("")) @@ -181,7 +181,7 @@ func TestForMockCoverage(t *testing.T) { require.Nil(t, cleaner.GetCheckpointGCWaterMark()) require.Nil(t, cleaner.GetScannedWindow()) require.Nil(t, cleaner.GetMinMerged()) - require.Nil(t, cleaner.DoCheck(ctx)) + require.Nil(t, cleaner.DoCheck(true)) v1, v2 := cleaner.GetPITRs() require.Nil(t, v1) require.Nil(t, v2) diff --git a/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go b/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go index 192e5bb15fd94..bb8624772658e 100644 --- 
a/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go +++ b/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go @@ -17,6 +17,7 @@ package gc import ( "context" "fmt" + "github.com/matrixorigin/matrixone/pkg/container/types" "sync" "sync/atomic" "time" @@ -36,6 +37,7 @@ const ( JT_GCExecute JT_GCReplay JT_GCReplayAndExecute + JT_GCFastExecute ) func init() { @@ -43,6 +45,7 @@ func init() { tasks.RegisterJobType(JT_GCExecute, "GCExecute") tasks.RegisterJobType(JT_GCReplay, "GCReplay") tasks.RegisterJobType(JT_GCReplayAndExecute, "GCReplayAndExecute") + tasks.RegisterJobType(JT_GCFastExecute, "GCFastExecute") } type StateStep = uint32 @@ -59,6 +62,11 @@ type runningCtx struct { cancel context.CancelCauseFunc } +type specialGCJob struct { + jobType tasks.JobType + minTS *types.TS +} + // DiskCleaner is the main structure of v2 operation, // and provides "JobFactory" to let tae notify itself // to perform a v2 @@ -95,6 +103,15 @@ func (cleaner *DiskCleaner) GC(ctx context.Context) (err error) { return cleaner.scheduleGCJob(ctx) } +func (cleaner *DiskCleaner) FastGC(ctx context.Context, ts *types.TS) (err error) { + logutil.Info("GC-Send-Intents-Fast") + gcJob := new(specialGCJob) + gcJob.jobType = JT_GCFastExecute + gcJob.minTS = ts + _, err = cleaner.processQueue.Enqueue(gcJob) + return err +} + func (cleaner *DiskCleaner) IsWriteMode() bool { return cleaner.step.Load() == StateStep_Write } @@ -229,6 +246,7 @@ func (cleaner *DiskCleaner) scheduleGCJob(ctx context.Context) (err error) { err = moerr.NewTxnControlErrorNoCtxf("GC-Not-Write-Mode") return } + logutil.Info("GC-Send-Intents") _, err = cleaner.processQueue.Enqueue(JT_GCExecute) return } @@ -259,7 +277,34 @@ func (cleaner *DiskCleaner) doExecute(ctx context.Context) (err error) { cleaner.replayError.Store(nil) } } - err = cleaner.cleaner.Process(ctx) + err = cleaner.cleaner.Process(ctx, JT_GCExecute, nil) + return +} + +func (cleaner *DiskCleaner) doFastExecute(ctx context.Context, ts *types.TS) (err error) { + now := time.Now() + msg := "GC-Fast-Execute" + defer func() { + logger := logutil.Info + if err != nil { + logger = logutil.Error + } + logger( + msg, + zap.Duration("duration", time.Since(now)), + zap.Error(err), + ) + }() + if replayErr := cleaner.replayError.Load(); replayErr != nil { + if err = cleaner.cleaner.Replay(ctx); err != nil { + msg = "GC-Replay" + cleaner.replayError.Store(&err) + return + } else { + cleaner.replayError.Store(nil) + } + } + err = cleaner.cleaner.Process(ctx, JT_GCFastExecute, ts) return } @@ -321,6 +366,20 @@ func (cleaner *DiskCleaner) process(items ...any) { default: logutil.Error("GC-Unknown-JobType", zap.Any("job-type", v)) } + case *specialGCJob: + ctx := cleaner.runningCtx.Load() + if ctx == nil { + ctx = new(runningCtx) + ctx.ctx, ctx.cancel = context.WithCancelCause(context.Background()) + cleaner.runningCtx.Store(ctx) + } + job := item.(*specialGCJob) + switch job.jobType { + case JT_GCFastExecute: + cleaner.doFastExecute(ctx.ctx, job.minTS) + default: + logutil.Error("GC-Unknown-Special-JobType", zap.Any("job-type", job.jobType)) + } case *tasks.Job: // noop will reset the runningCtx if v.Type() == JT_GCNoop { diff --git a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go index 2f82b619b35f0..5c1448d4c0a4b 100644 --- a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go +++ b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go @@ -16,64 +16,23 @@ package gc import ( "context" - "fmt" - "unsafe" - - "github.com/matrixorigin/matrixone/pkg/common/malloc" - + "github.com/matrixorigin/matrixone/pkg/common/bitmap" 
+ "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/ckputil" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" - - "github.com/matrixorigin/matrixone/pkg/common/bitmap" - "github.com/matrixorigin/matrixone/pkg/common/mpool" - "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" ) -const ( - Default_Coarse_EstimateRows = 10000000 - Default_Coarse_Probility = 0.00001 - Default_CanGC_TailSize = 64 * malloc.MB -) - -type GCJobExecutorOption func(*GCJob) - -func WithGCJobCoarseConfig( - estimateRows int, - probility float64, - size int, -) GCJobExecutorOption { - return func(e *GCJob) { - e.config.coarseEstimateRows = estimateRows - e.config.coarseProbility = probility - e.config.canGCCacheSize = size - } -} - type CheckpointBasedGCJob struct { - GCExecutor - config struct { - coarseEstimateRows int - coarseProbility float64 - canGCCacheSize int - } - sourcer engine.BaseReader - snapshotMeta *logtail.SnapshotMeta - accountSnapshots map[uint32][]types.TS - pitr *logtail.PitrInfo - ts *types.TS - globalCkpLoc objectio.Location - globalCkpVer uint32 - - result struct { - filesToGC []string - filesNotGC []objectio.ObjectStats - } + BaseCheckpointGCJob + globalCkpLoc objectio.Location + globalCkpVer uint32 } func NewCheckpointBasedGCJob( @@ -90,112 +49,47 @@ func NewCheckpointBasedGCJob( fs fileservice.FileService, opts ...GCJobExecutorOption, ) *CheckpointBasedGCJob { - e := &CheckpointBasedGCJob{ + e := &BaseCheckpointGCJob{ sourcer: sourcer, snapshotMeta: snapshotMeta, accountSnapshots: accountSnapshots, pitr: pitr, ts: ts, - globalCkpLoc: globalCkpLoc, - globalCkpVer: gckpVersion, + transObjects: make(map[string]*ObjectEntry, 100), } + for _, opt := range opts { opt(e) } e.fillDefaults() e.GCExecutor = *NewGCExecutor(buffer, isOwner, e.config.canGCCacheSize, mp, fs) - return e + job := &CheckpointBasedGCJob{ + BaseCheckpointGCJob: *e, + globalCkpLoc: globalCkpLoc, + globalCkpVer: gckpVersion, + } + job.filterProvider = job + return job } func (e *CheckpointBasedGCJob) Close() error { - if e.sourcer != nil { - e.sourcer.Close() - e.sourcer = nil - } - e.snapshotMeta = nil - e.accountSnapshots = nil - e.pitr = nil - e.ts = nil e.globalCkpLoc = nil - e.globalCkpVer = 0 - e.result.filesToGC = nil - e.result.filesNotGC = nil - return e.GCExecutor.Close() -} - -func (e *CheckpointBasedGCJob) fillDefaults() { - if e.config.coarseEstimateRows <= 0 { - e.config.coarseEstimateRows = Default_Coarse_EstimateRows - } - if e.config.coarseProbility <= 0 { - e.config.coarseProbility = Default_Coarse_Probility - } - if e.config.canGCCacheSize <= 0 { - e.config.canGCCacheSize = Default_CanGC_TailSize - } + return e.BaseCheckpointGCJob.Close() } -func (e *CheckpointBasedGCJob) Execute(ctx context.Context) error { - attrs, attrTypes := ckputil.DataScan_TableIDAtrrs, ckputil.DataScan_TableIDTypes - buffer := containers.NewOneSchemaBatchBuffer( - mpool.MB*16, - attrs, - attrTypes, - ) - defer buffer.Close(e.mp) - transObjects := 
make(map[string]*ObjectEntry, 100) - coarseFilter, err := MakeBloomfilterCoarseFilter( +func (e *CheckpointBasedGCJob) CoarseFilter(ctx context.Context) (FilterFn, error) { + return MakeBloomfilterCoarseFilter( ctx, e.config.coarseEstimateRows, e.config.coarseProbility, - buffer, + e.buffer, e.globalCkpLoc, e.globalCkpVer, e.ts, - &transObjects, + &e.transObjects, e.mp, e.fs, ) - if err != nil { - return err - } - - fineFilter, err := MakeSnapshotAndPitrFineFilter( - e.ts, - e.accountSnapshots, - e.pitr, - e.snapshotMeta, - transObjects, - ) - if err != nil { - return err - } - - e.result.filesToGC = make([]string, 0, 20) - finalSinker, err := MakeFinalCanGCSinker(&e.result.filesToGC) - if err != nil { - return err - } - - newFiles, err := e.Run( - ctx, - e.sourcer.Read, - coarseFilter, - fineFilter, - finalSinker, - ) - if err != nil { - return err - } - transObjects = nil - - e.result.filesNotGC = make([]objectio.ObjectStats, 0, len(newFiles)) - e.result.filesNotGC = append(e.result.filesNotGC, newFiles...) - return nil -} - -func (e *CheckpointBasedGCJob) Result() ([]string, []objectio.ObjectStats) { - return e.result.filesToGC, e.result.filesNotGC } func MakeBloomfilterCoarseFilter( @@ -259,14 +153,13 @@ func MakeBloomfilterCoarseFilter( name := stats.ObjectName().UnsafeString() if dropTS.IsEmpty() && (*transObjects)[name] == nil { - object := &ObjectEntry{ - stats: &stats, - createTS: createTS, - dropTS: dropTS, - db: dbs[i], - table: tableIDs[i], - } - (*transObjects)[name] = object + entry := NewObjectEntry() + entry.stats = &stats + entry.createTS = createTS + entry.dropTS = dropTS + entry.db = dbs[i] + entry.table = tableIDs[i] + (*transObjects)[name] = entry return } if (*transObjects)[name] != nil { @@ -279,99 +172,3 @@ func MakeBloomfilterCoarseFilter( }, nil } - -func MakeSnapshotAndPitrFineFilter( - ts *types.TS, - accountSnapshots map[uint32][]types.TS, - pitrs *logtail.PitrInfo, - snapshotMeta *logtail.SnapshotMeta, - transObjects map[string]*ObjectEntry, -) ( - filter FilterFn, - err error, -) { - tableSnapshots, tablePitrs := snapshotMeta.AccountToTableSnapshots( - accountSnapshots, - pitrs, - ) - return func( - ctx context.Context, - bm *bitmap.Bitmap, - bat *batch.Batch, - mp *mpool.MPool, - ) error { - createTSs := vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[1]) - deleteTSs := vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[2]) - tableIDs := vector.MustFixedColNoTypeCheck[uint64](bat.Vecs[4]) - for i := 0; i < bat.Vecs[0].Length(); i++ { - buf := bat.Vecs[0].GetRawBytesAt(i) - stats := (objectio.ObjectStats)(buf) - name := stats.ObjectName().UnsafeString() - tableID := tableIDs[i] - createTS := createTSs[i] - dropTS := deleteTSs[i] - - snapshots := tableSnapshots[tableID] - pitr := tablePitrs[tableID] - - if entry := transObjects[name]; entry != nil { - if !logtail.ObjectIsSnapshotRefers( - entry.stats, pitr, &entry.createTS, &entry.dropTS, snapshots, - ) { - bm.Add(uint64(i)) - } - continue - } - if !createTS.LT(ts) || !dropTS.LT(ts) { - continue - } - if dropTS.IsEmpty() { - panic(fmt.Sprintf("dropTS is empty, name: %s, createTS: %s", name, createTS.ToString())) - } - if !logtail.ObjectIsSnapshotRefers( - &stats, pitr, &createTS, &dropTS, snapshots, - ) { - bm.Add(uint64(i)) - } - } - return nil - }, nil -} - -func MakeFinalCanGCSinker( - filesToGC *[]string, -) ( - SinkerFn, - error, -) { - buffer := make(map[string]struct{}, 100) - return func( - ctx context.Context, bat *batch.Batch, - ) error { - clear(buffer) - var dropTSs []types.TS - var tableIDs 
[]uint64 - if bat.Vecs[0].Length() > 0 { - dropTSs = vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[2]) - tableIDs = vector.MustFixedColNoTypeCheck[uint64](bat.Vecs[4]) - } - for i := 0; i < bat.Vecs[0].Length(); i++ { - buf := bat.Vecs[0].GetRawBytesAt(i) - stats := (*objectio.ObjectStats)(unsafe.Pointer(&buf[0])) - name := stats.ObjectName().String() - dropTS := dropTSs[i] - tableID := tableIDs[i] - if !dropTS.IsEmpty() { - buffer[name] = struct{}{} - continue - } - if !logtail.IsMoTable(tableID) { - buffer[name] = struct{}{} - } - } - for name := range buffer { - *filesToGC = append(*filesToGC, name) - } - return nil - }, nil -} diff --git a/pkg/vm/engine/tae/db/gc/v3/executor.go b/pkg/vm/engine/tae/db/gc/v3/executor.go index 85ec7b25c81da..9ca528954aa12 100644 --- a/pkg/vm/engine/tae/db/gc/v3/executor.go +++ b/pkg/vm/engine/tae/db/gc/v3/executor.go @@ -16,7 +16,6 @@ package gc import ( "context" - "github.com/matrixorigin/matrixone/pkg/common/bitmap" "github.com/matrixorigin/matrixone/pkg/common/bloomfilter" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -31,7 +30,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" ) -type GCJob = CheckpointBasedGCJob +type GCJob = BaseCheckpointGCJob type FilterFn func(context.Context, *bitmap.Bitmap, *batch.Batch, *mpool.MPool) error type SourerFn func(context.Context, []string, *plan.Expr, *mpool.MPool, *batch.Batch) (bool, error) diff --git a/pkg/vm/engine/tae/db/gc/v3/fast.go b/pkg/vm/engine/tae/db/gc/v3/fast.go new file mode 100644 index 0000000000000..dd0add76ab757 --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/fast.go @@ -0,0 +1,161 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package gc + +import ( + "context" + "github.com/matrixorigin/matrixone/pkg/catalog" + "github.com/matrixorigin/matrixone/pkg/common/bitmap" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" +) + +type CheckpointFastGCJob struct { + BaseCheckpointGCJob + catalog *catalog2.Catalog +} + +func NewCheckpointFastGCJob( + ts *types.TS, + sourcer engine.BaseReader, + pitr *logtail.PitrInfo, + accountSnapshots map[uint32][]types.TS, + snapshotMeta *logtail.SnapshotMeta, + buffer *containers.OneSchemaBatchBuffer, + isOwner bool, + catalog *catalog2.Catalog, + mp *mpool.MPool, + fs fileservice.FileService, + opts ...GCJobExecutorOption, +) *CheckpointFastGCJob { + e := &BaseCheckpointGCJob{ + sourcer: sourcer, + snapshotMeta: snapshotMeta, + accountSnapshots: accountSnapshots, + pitr: pitr, + ts: ts, + transObjects: make(map[string]*ObjectEntry, 100), + } + + for _, opt := range opts { + opt(e) + } + e.fillDefaults() + e.GCExecutor = *NewGCExecutor(buffer, isOwner, e.config.canGCCacheSize, mp, fs) + job := &CheckpointFastGCJob{ + BaseCheckpointGCJob: *e, + catalog: catalog, + } + job.filterProvider = job + return job +} + +func (e *CheckpointFastGCJob) CoarseFilter(_ context.Context) (FilterFn, error) { + return makeSoftDeleteFilterCoarseFilter( + e.transObjects, + e.snapshotMeta, + e.catalog, + ) +} + +func makeSoftDeleteFilterCoarseFilter( + transObjects map[string]*ObjectEntry, + meta *logtail.SnapshotMeta, + c *catalog2.Catalog, +) ( + FilterFn, + error, +) { + filterTable := meta.GetSnapshotTableIDs() + tables := meta.GetTableIDs() + tableIsDrop := func(db, tid uint64) bool { + dbEntry, getErr := c.GetDatabaseByID(db) + if getErr != nil { + return true + } + dbEntry.GetDeleteAtLocked() + tableEntry, getErr := dbEntry.GetTableEntryByID(tid) + if getErr != nil { + return true + } + dropTS := tableEntry.GetDeleteAtLocked() + return !dropTS.IsEmpty() + } + logutil.Infof("GetTableIDs count is %d", len(tables)) + return func( + ctx context.Context, + bm *bitmap.Bitmap, + bat *batch.Batch, + mp *mpool.MPool, + ) (err error) { + createTSs := vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[1]) + dropTSs := vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[2]) + dbs := vector.MustFixedColNoTypeCheck[uint64](bat.Vecs[3]) + tableIDs := vector.MustFixedColNoTypeCheck[uint64](bat.Vecs[4]) + for i := 0; i < bat.Vecs[0].Length(); i++ { + createTS := createTSs[i] + dropTS := dropTSs[i] + buf := bat.Vecs[0].GetRawBytesAt(i) + stats := (objectio.ObjectStats)(buf) + name := stats.ObjectName().UnsafeString() + dropTSIsEmpty := dropTS.IsEmpty() + if dropTSIsEmpty { + if (transObjects)[name] == nil { + object := NewObjectEntry() + object.stats = &stats + object.createTS = createTS + object.dropTS = dropTS + object.db = dbs[i] + object.table = tableIDs[i] + (transObjects)[name] = object + } + table := tables[tableIDs[i]] + if table != nil && !table.IsDrop() { + continue + } + + if table == nil { + if !tableIsDrop(dbs[i], tableIDs[i]) 
{ + continue + } + } + } + + if _, ok := filterTable[tableIDs[i]]; ok { + continue + } + + if catalog.IsSystemTable(tableIDs[i]) { + continue + } + bm.Add(uint64(i)) + if (transObjects)[name] != nil { + (transObjects)[name].dropTS = dropTS + continue + } + } + return nil + + }, nil +} diff --git a/pkg/vm/engine/tae/db/gc/v3/filter_test.go b/pkg/vm/engine/tae/db/gc/v3/filter_test.go new file mode 100644 index 0000000000000..e7309abd192c1 --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/filter_test.go @@ -0,0 +1,165 @@ +// Copyright 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import ( + "context" + "testing" + + "github.com/matrixorigin/matrixone/pkg/common/bitmap" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" + "github.com/stretchr/testify/require" +) + +// TestFilterWithEmptySnapshots tests filter behavior with empty snapshots +func TestFilterWithEmptySnapshots(t *testing.T) { + ctx := context.Background() + + ts := types.BuildTS(100, 0) + accountSnapshots := make(map[uint32][]types.TS) + pitrs := &logtail.PitrInfo{} + snapshotMeta := &logtail.SnapshotMeta{} + job, transObjects := getJob(ctx, t, nil) + + filter, err := MakeSnapshotAndPitrFineFilter( + &ts, + accountSnapshots, + pitrs, + snapshotMeta, + transObjects, + ) + require.NoError(t, err) + require.NotNil(t, filter) + + job.filterProvider = &MockFilterProvider{ + fineFilterFn: filter, + coarseFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + return nil + }, + } + + err = job.Execute(ctx) + require.NoError(t, err) +} + +// TestFilterWithOverlappingTimestamps tests filter behavior with overlapping timestamps +func TestFilterWithOverlappingTimestamps(t *testing.T) { + ctx := context.Background() + + ts := types.BuildTS(100, 0) + accountSnapshots := map[uint32][]types.TS{ + 1: { + types.BuildTS(45, 0), + types.BuildTS(55, 0), + types.BuildTS(65, 0), + }, + } + + job, transObjects := getJob(ctx, t, nil) + + filter, err := MakeSnapshotAndPitrFineFilter( + &ts, + accountSnapshots, + &logtail.PitrInfo{}, + &logtail.SnapshotMeta{}, + transObjects, + ) + require.NoError(t, err) + + job.filterProvider = &MockFilterProvider{ + fineFilterFn: filter, + coarseFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + return nil + }, + } + + err = job.Execute(ctx) + require.NoError(t, err) +} + +// TestFilterWithEdgeCases tests filter behavior with edge cases +func TestFilterWithEdgeCases(t *testing.T) { + ctx := context.Background() + + ts := types.BuildTS(100, 0) + accountSnapshots := map[uint32][]types.TS{ + 1: {types.BuildTS(50, 0)}, + } + + // Test edge cases + testCases := []struct { + name string + createTS types.TS + dropTS types.TS + }{ + { + name: "same_timestamp", + 
createTS: types.BuildTS(50, 0), + dropTS: types.BuildTS(50, 0), + }, + { + name: "boundary_before", + createTS: types.BuildTS(49, 0), + dropTS: types.BuildTS(50, 0), + }, + { + name: "boundary_after", + createTS: types.BuildTS(50, 0), + dropTS: types.BuildTS(51, 0), + }, + { + name: "far_future", + createTS: types.BuildTS(200, 0), + dropTS: types.BuildTS(300, 0), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + transObjects := map[string]*ObjectEntry{ + tc.name: { + stats: &objectio.ObjectStats{}, + createTS: tc.createTS, + dropTS: tc.dropTS, + }, + } + + job, _ := getJob(ctx, t, transObjects) + + filter, err := MakeSnapshotAndPitrFineFilter( + &ts, + accountSnapshots, + &logtail.PitrInfo{}, + &logtail.SnapshotMeta{}, + transObjects, + ) + require.NoError(t, err) + + job.filterProvider = &MockFilterProvider{ + fineFilterFn: filter, + coarseFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + return nil + }, + } + + err = job.Execute(ctx) + require.NoError(t, err) + }) + } +} diff --git a/pkg/vm/engine/tae/db/gc/v3/gc_test.go b/pkg/vm/engine/tae/db/gc/v3/gc_test.go new file mode 100644 index 0000000000000..2779e1f986d0d --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/gc_test.go @@ -0,0 +1,188 @@ +// Copyright 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package gc + +import ( + "context" + "fmt" + "github.com/matrixorigin/matrixone/pkg/common/bitmap" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +// TestComplexGCScenario tests a more complex GC scenario with multiple objects +func TestComplexGCScenario(t *testing.T) { + ctx := context.Background() + + // Create test objects with different timestamps + objects := map[string]*ObjectEntry{ + "obj1": { + stats: &objectio.ObjectStats{}, + createTS: types.BuildTS(1, 0), + dropTS: types.BuildTS(5, 0), + }, + "obj2": { + stats: &objectio.ObjectStats{}, + createTS: types.BuildTS(2, 0), + dropTS: types.BuildTS(6, 0), + }, + "obj3": { + stats: &objectio.ObjectStats{}, + createTS: types.BuildTS(3, 0), + dropTS: types.BuildTS(7, 0), + }, + } + + job, _ := getJob(ctx, t, objects) + // Setup mock filter provider + mockProvider := &MockFilterProvider{ + coarseFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + // Simulate coarse filtering + return nil + }, + fineFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + // Simulate fine filtering + return nil + }, + } + job.filterProvider = mockProvider + + // Execute GC + err := job.Execute(ctx) + require.NoError(t, err) + + // Verify results + assert.NotNil(t, job.result.filesToGC) + assert.NotNil(t, job.result.filesNotGC) +} + +// TestConcurrentGCOperations tests GC operations under concurrent scenarios +func TestConcurrentGCOperations(t *testing.T) { + ctx := context.Background() + + // Create multiple GC jobs + jobs := make([]*BaseCheckpointGCJob, 3) + now := time.Now().UTC() + for i := 0; i < 3; i++ { + + // Add some test objects + transObjects := make(map[string]*ObjectEntry) + transObjects[fmt.Sprintf("obj%d", i)] = &ObjectEntry{ + stats: &objectio.ObjectStats{}, + createTS: types.BuildTS(now.UnixNano()+1+int64(i), 0), + dropTS: types.BuildTS(now.UnixNano()+5+int64(i), 0), + } + jobs[i], _ = getJob(ctx, t, transObjects) + + // Setup mock filter provider + jobs[i].filterProvider = &MockFilterProvider{ + coarseFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + return nil + }, + fineFilterFn: func(ctx context.Context, bm *bitmap.Bitmap, bat *batch.Batch, mp *mpool.MPool) error { + return nil + }, + } + } + + // Run jobs concurrently + errChan := make(chan error, len(jobs)) + for _, job := range jobs { + go func(j *BaseCheckpointGCJob) { + errChan <- j.Execute(ctx) + }(job) + } + + // Wait for all jobs to complete + for i := 0; i < len(jobs); i++ { + err := <-errChan + require.NoError(t, err) + } + + // Verify results + for _, job := range jobs { + assert.NotNil(t, job.result.filesToGC) + assert.NotNil(t, job.result.filesNotGC) + } +} + +// TestGCWithSnapshots tests GC behavior with different snapshot configurations +func TestGCWithSnapshots(t *testing.T) { + ctx := context.Background() + ts := types.BuildTS(100, 0) + + // Create test snapshots + accountSnapshots := map[uint32][]types.TS{ + 1: {types.BuildTS(50, 0), types.BuildTS(75, 0)}, + 2: {types.BuildTS(60, 0)}, + } + + snapshotMeta := 
&logtail.SnapshotMeta{} + pitrs := &logtail.PitrInfo{} + + // Create test objects + noid := objectio.NewObjectid() + stats := objectio.NewObjectStatsWithObjectID(&noid, false, false, false) + noid2 := objectio.NewObjectid() + stats2 := objectio.NewObjectStatsWithObjectID(&noid2, false, false, false) + transObjects := map[string]*ObjectEntry{ + "obj1": { + stats: stats, + createTS: types.BuildTS(40, 0), + dropTS: types.BuildTS(80, 0), + }, + "obj2": { + stats: stats2, + createTS: types.BuildTS(70, 0), + dropTS: types.BuildTS(90, 0), + }, + } + + // Create and test fine filter + filter, err := MakeSnapshotAndPitrFineFilter( + &ts, + accountSnapshots, + pitrs, + snapshotMeta, + transObjects, + ) + require.NoError(t, err) + require.NotNil(t, filter) + mp, err := mpool.NewMPool("GC", 0, mpool.NoFixed) + require.NoError(t, err) + + bat := createTestBatch() + for _, obj := range transObjects { + bat.Vecs[0].Append(obj.stats[:], false) + bat.Vecs[1].Append(obj.createTS, false) + bat.Vecs[2].Append(obj.dropTS, false) + bat.Vecs[3].Append(uint64(1), false) + bat.Vecs[4].Append(uint64(1), false) + } + + // Test filter + bm := bitmap.Bitmap{} + bm.TryExpandWithSize(len(transObjects)) + err = filter(ctx, &bm, containers.ToCNBatch(bat), mp) + require.NoError(t, err) +} diff --git a/pkg/vm/engine/tae/db/gc/v3/merge.go b/pkg/vm/engine/tae/db/gc/v3/merge.go index 09b72158227f1..cac66e628cfa8 100644 --- a/pkg/vm/engine/tae/db/gc/v3/merge.go +++ b/pkg/vm/engine/tae/db/gc/v3/merge.go @@ -47,10 +47,16 @@ func MergeCheckpoint( client checkpoint.Runner, pool *mpool.MPool, fs fileservice.FileService, -) (deleteFiles, newFiles []string, checkpointEntry *checkpoint.CheckpointEntry, ckpData *batch.Batch, err error) { +) (deleteFiles, + newFiles []string, + checkpointEntry *checkpoint.CheckpointEntry, + ckpData *batch.Batch, + err error) { + if len(ckpEntries) == 0 { + return + } ckpData = ckputil.NewObjectListBatch() - datas := make([]*logtail.CKPReader, 0) - deleteFiles = make([]string, 0) + deleteSet := make(map[string]struct{}) for _, ckpEntry := range ckpEntries { select { case <-ctx.Done(): @@ -63,78 +69,17 @@ func MergeCheckpoint( zap.String("task", taskName), zap.String("entry", ckpEntry.String()), ) - var data *logtail.CKPReader - var locations map[string]objectio.Location - if _, data, err = logtail.LoadCheckpointEntriesFromKey( - ctx, - sid, - fs, - ckpEntry.GetLocation(), - ckpEntry.GetVersion(), - nil, - &types.TS{}, - ); err != nil { - return - } - datas = append(datas, data) - var nameMeta string - if ckpEntry.GetType() == checkpoint.ET_Compacted { - nameMeta = ioutil.EncodeCompactCKPMetadataFullName( - ckpEntry.GetStart(), ckpEntry.GetEnd(), - ) - } else { - nameMeta = ioutil.EncodeCKPMetadataFullName( - ckpEntry.GetStart(), ckpEntry.GetEnd(), - ) - } - - // add checkpoint metafile(ckp/mete_ts-ts.ckp...) 
to deleteFiles - deleteFiles = append(deleteFiles, nameMeta) - // add checkpoint idx file to deleteFiles - deleteFiles = append(deleteFiles, ckpEntry.GetLocation().Name().String()) - locations, err = logtail.LoadCheckpointLocations( - ctx, sid, data, - ) + err = processCheckpointEntry(ctx, taskName, ckpEntry, sid, fs, bf, ckpData, pool, deleteSet) if err != nil { - if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) { - deleteFiles = append(deleteFiles, nameMeta) - continue - } return } - - for name := range locations { - deleteFiles = append(deleteFiles, name) - } } - if len(datas) == 0 { - return - } - - newFiles = make([]string, 0) - // merge objects referenced by sansphot and pitr - for _, data := range datas { - select { - case <-ctx.Done(): - err = context.Cause(ctx) - return - default: - } - var objectBatch *batch.Batch - if objectBatch, err = data.GetCheckpointData(ctx); err != nil { - return - } - defer objectBatch.Clean(common.CheckpointAllocator) - statsVec := objectBatch.Vecs[ckputil.TableObjectsAttr_ID_Idx] - bf.Test(statsVec, - func(exists bool, i int) { - if !exists { - return - } - appendValToBatchForObjectListBatch(objectBatch, ckpData, i, pool) - }) + deleteFiles = make([]string, 0, len(deleteSet)) + for k := range deleteSet { + deleteFiles = append(deleteFiles, k) } + newFiles = make([]string, 0) sinker := ckputil.NewDataSinker(pool, fs) if err = sinker.Write(ctx, ckpData); err != nil { @@ -184,6 +129,81 @@ return } +func processCheckpointEntry( + ctx context.Context, + taskName string, + ckpEntry *checkpoint.CheckpointEntry, + sid string, + fs fileservice.FileService, + bf *bloomfilter.BloomFilter, + ckpData *batch.Batch, + pool *mpool.MPool, + deleteSet map[string]struct{}, +) error { + var nameMeta string + if ckpEntry.GetType() == checkpoint.ET_Compacted { + nameMeta = ioutil.EncodeCompactCKPMetadataFullName( + ckpEntry.GetStart(), ckpEntry.GetEnd(), + ) + } else { + nameMeta = ioutil.EncodeCKPMetadataFullName( + ckpEntry.GetStart(), ckpEntry.GetEnd(), + ) + } + // add checkpoint metafile(ckp/meta_ts-ts.ckp...) 
to deleteFiles + deleteSet[nameMeta] = struct{}{} + // add checkpoint idx file to deleteFiles + deleteSet[ckpEntry.GetLocation().Name().String()] = struct{}{} + + var data *logtail.CKPReader + var err error + if _, data, err = logtail.LoadCheckpointEntriesFromKey( + ctx, + sid, + fs, + ckpEntry.GetLocation(), + ckpEntry.GetVersion(), + nil, + &types.TS{}, + ); err != nil { + return err + } + + locations, err := logtail.LoadCheckpointLocations( + ctx, sid, data, + ) + if err != nil { + if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) { + logutil.Warn( + "GC-MERGE-CHECKPOINT-FILE-NOTFOUND", + zap.String("task", taskName), + zap.String("filename", nameMeta), + ) + return nil + } + return err + } + for name := range locations { + deleteSet[name] = struct{}{} + } + + var objectBatch *batch.Batch + if objectBatch, err = data.GetCheckpointData(ctx); err != nil { + return err + } + defer objectBatch.Clean(common.CheckpointAllocator) + statsVec := objectBatch.Vecs[ckputil.TableObjectsAttr_ID_Idx] + bf.Test(statsVec, + func(exists bool, i int) { + if !exists { + return + } + appendValToBatchForObjectListBatch(objectBatch, ckpData, i, pool) + }) + + return nil +} + func makeBatchFromSchema(schema *catalog.Schema) *containers.Batch { bat := containers.NewBatch() // Types() is not used, then empty schema can also be handled here diff --git a/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go b/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go index f6addfc35b962..b65262c836c8d 100644 --- a/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go +++ b/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go @@ -16,6 +16,7 @@ package gc import ( "context" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -65,7 +66,7 @@ func (c *MockCleaner) Replay(ctx context.Context) error { return nil } -func (c *MockCleaner) Process(ctx context.Context) error { +func (c *MockCleaner) Process(ctx context.Context, _ tasks.JobType, _ *types.TS) error { if c.processFunc != nil { return c.processFunc(ctx) } @@ -114,7 +115,7 @@ func (c *MockCleaner) GetMinMerged() *checkpoint.CheckpointEntry { return nil } -func (c *MockCleaner) DoCheck(_ context.Context) error { +func (c *MockCleaner) DoCheck(_ bool) error { return nil } diff --git a/pkg/vm/engine/tae/db/gc/v3/mock_test.go b/pkg/vm/engine/tae/db/gc/v3/mock_test.go new file mode 100644 index 0000000000000..3f28769faf537 --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/mock_test.go @@ -0,0 +1,146 @@ +// Copyright 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
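The mock_test.go file introduced here stubs engine.BaseReader so the refactored GC job can be driven without a real object source. For orientation, a minimal sketch of the pull loop such a reader supports; drainReader and its callbacks are hypothetical helpers, not part of this change, and only the (done bool, err error) contract of MockBaseReader.Read below is assumed:

func drainReader(
    ctx context.Context,
    r engine.BaseReader,
    mp *mpool.MPool,
    newBatch func() *batch.Batch,
    consume func(*batch.Batch) error,
) error {
    for {
        bat := newBatch()
        // attrs and expr are elided here; the mock below ignores them anyway.
        done, err := r.Read(ctx, nil, nil, mp, bat)
        if err != nil {
            return err
        }
        if done {
            // done == true means the reader is exhausted.
            return nil
        }
        if err := consume(bat); err != nil {
            return err
        }
    }
}

A reader that only has to honor this contract can be satisfied by replaying a fixed slice of batches, which is all the mock does.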
+ +package gc + +import ( + "context" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "path" + "testing" +) + +// MockBaseReader implements engine.BaseReader interface for testing +type MockBaseReader struct { + batches []*batch.Batch + current int + err error +} + +func NewMockBaseReader(batches []*batch.Batch) *MockBaseReader { + return &MockBaseReader{ + batches: batches, + current: 0, + } +} + +func (m *MockBaseReader) Read(ctx context.Context, attrs []string, expr *plan.Expr, mp *mpool.MPool, bat *batch.Batch) (bool, error) { + if m.err != nil { + return false, m.err + } + if m.current >= len(m.batches) { + return true, nil + } + + // Copy data from current batch to output batch + srcBat := m.batches[m.current] + if err := bat.UnionWindow(srcBat, 0, srcBat.RowCount(), mp); err != nil { + return false, err + } + m.current++ + return false, nil +} + +func (m *MockBaseReader) Close() error { + return nil +} + +// MockFilterProvider implements FilterProvider interface for testing +type MockFilterProvider struct { + coarseFilterFn FilterFn + fineFilterFn FilterFn +} + +func (m *MockFilterProvider) CoarseFilter(ctx context.Context) (FilterFn, error) { + return m.coarseFilterFn, nil +} + +func (m *MockFilterProvider) FineFilter(ctx context.Context) (FilterFn, error) { + return m.fineFilterFn, nil +} + +// Helper functions to create test batches +func createTestBatch() *containers.Batch { + opt := containers.Options{} + opt.Capacity = 0 + bat := containers.BuildBatch(ObjectTableAttrs, ObjectTableTypes, opt) + // Add test data here if needed + return bat +} + +func getJob(ctx context.Context, t *testing.T, transObjects map[string]*ObjectEntry) (*BaseCheckpointGCJob, map[string]*ObjectEntry) { + if transObjects == nil { + noid := objectio.NewObjectid() + stats := objectio.NewObjectStatsWithObjectID(&noid, false, false, false) + noid2 := objectio.NewObjectid() + stats2 := objectio.NewObjectStatsWithObjectID(&noid2, false, false, false) + noid3 := objectio.NewObjectid() + stats3 := objectio.NewObjectStatsWithObjectID(&noid3, false, false, false) + transObjects = map[string]*ObjectEntry{ + "obj1": { + stats: stats, + createTS: types.BuildTS(40, 0), + dropTS: types.BuildTS(50, 0), + }, + "obj2": { + stats: stats2, + createTS: types.BuildTS(45, 0), + dropTS: types.BuildTS(60, 0), + }, + "obj3": { + stats: stats3, + createTS: types.BuildTS(55, 0), + dropTS: types.BuildTS(70, 0), + }, + } + } + // Create test batch + bat := createTestBatch() + for _, obj := range transObjects { + bat.Vecs[0].Append(obj.stats[:], false) + bat.Vecs[1].Append(obj.createTS, false) + bat.Vecs[2].Append(obj.dropTS, false) + bat.Vecs[3].Append(uint64(1), false) + bat.Vecs[4].Append(uint64(1), false) + } + dir := testutils.InitTestEnv("GCV3", t) + dir = path.Join(dir, "/local") + c := fileservice.Config{ + Name: defines.LocalFileServiceName, + Backend: "DISK", + DataDir: dir, + } + service, err := fileservice.NewFileService(ctx, c, 
nil) + assert.Nil(t, err) + buffer := containers.NewOneSchemaBatchBuffer(mpool.GB, ObjectTableAttrs, ObjectTableTypes) + require.NotNil(t, buffer) + job := &BaseCheckpointGCJob{ + transObjects: make(map[string]*ObjectEntry), + sourcer: NewMockBaseReader([]*batch.Batch{containers.ToCNBatch(bat)}), + buffer: buffer, + } + job.GCExecutor = *NewGCExecutor(buffer, true, mpool.GB, common.DefaultAllocator, service) + return job, transObjects +} diff --git a/pkg/vm/engine/tae/db/gc/v3/object.go b/pkg/vm/engine/tae/db/gc/v3/object.go new file mode 100644 index 0000000000000..de009f42e2dea --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/object.go @@ -0,0 +1,53 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import ( + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/objectio" + "sync" +) + +type ObjectEntry struct { + stats *objectio.ObjectStats + createTS types.TS + dropTS types.TS + db uint64 + table uint64 +} + +var objectEntryPool = sync.Pool{ + New: func() interface{} { + return &ObjectEntry{} + }, +} + +func NewObjectEntry() *ObjectEntry { + entry, ok := objectEntryPool.Get().(*ObjectEntry) + if !ok { + // Defensive programming: create a new instance when the pool is polluted + return &ObjectEntry{} + } + return entry +} + +func (e *ObjectEntry) Release() { + e.stats = nil + e.createTS = types.TS{} + e.dropTS = types.TS{} + e.db = 0 + e.table = 0 + objectEntryPool.Put(e) +} diff --git a/pkg/vm/engine/tae/db/gc/v3/types.go b/pkg/vm/engine/tae/db/gc/v3/types.go index d9e81048864cc..9b079247a5cb5 100644 --- a/pkg/vm/engine/tae/db/gc/v3/types.go +++ b/pkg/vm/engine/tae/db/gc/v3/types.go @@ -16,6 +16,7 @@ package gc import ( "context" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -28,112 +29,9 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" ) -type BatchType int8 - -const CurrentVersion = uint16(3) - -const ( - ObjectList BatchType = iota - TombstoneList -) - -const ( - CreateBlock BatchType = iota - DeleteBlock - DropTable - DropDB - DeleteFile - Tombstone -) - -const ( - GCAttrObjectName = "name" - GCAttrBlockId = "block_id" - GCAttrTableId = "table_id" - GCAttrDBId = "db_id" - GCAttrCommitTS = "commit_ts" - GCCreateTS = "create_time" - GCDeleteTS = "delete_time" - GCAttrTombstone = "tombstone" - GCAttrVersion = "version" -) - -var ( - BlockSchemaAttr = []string{ - GCAttrObjectName, - GCCreateTS, - GCDeleteTS, - GCAttrCommitTS, - GCAttrTableId, - } - BlockSchemaTypes = []types.Type{ - types.New(types.T_varchar, 5000, 0), - types.New(types.T_TS, types.MaxVarcharLen, 0), - types.New(types.T_TS, types.MaxVarcharLen, 0), - types.New(types.T_TS, types.MaxVarcharLen, 0), - types.New(types.T_uint64, 0, 0), - } - - BlockSchemaAttrV1 = []string{ - GCAttrBlockId, - GCAttrTableId, - GCAttrDBId, - GCAttrObjectName, - } - BlockSchemaTypesV1 = 
[]types.Type{ - types.New(types.T_Blockid, 0, 0), - types.New(types.T_uint64, 0, 0), - types.New(types.T_uint64, 0, 0), - types.New(types.T_varchar, 5000, 0), - } - - TombstoneSchemaAttr = []string{ - GCAttrTombstone, - GCAttrObjectName, - GCAttrCommitTS, - } - - TombstoneSchemaTypes = []types.Type{ - types.New(types.T_varchar, 5000, 0), - types.New(types.T_varchar, 5000, 0), - types.New(types.T_TS, types.MaxVarcharLen, 0), - } - - VersionsSchemaAttr = []string{ - GCAttrVersion, - } - - VersionsSchemaTypes = []types.Type{ - types.New(types.T_uint16, 0, 0), - } - - DropTableSchemaAttr = []string{ - GCAttrTableId, - GCAttrDBId, - } - DropTableSchemaTypes = []types.Type{ - types.New(types.T_uint64, 0, 0), - types.New(types.T_uint64, 0, 0), - } - - DropDBSchemaAtt = []string{ - GCAttrDBId, - } - DropDBSchemaTypes = []types.Type{ - types.New(types.T_uint64, 0, 0), - } - - DeleteFileSchemaAtt = []string{ - GCAttrObjectName, - } - DeleteFileSchemaTypes = []types.Type{ - types.New(types.T_varchar, 5000, 0), - } -) - type Cleaner interface { Replay(context.Context) error - Process(context.Context) error + Process(context.Context, tasks.JobType, *types.TS) error TryGC(context.Context) error AddChecker(checker func(item any) bool, key string) int RemoveChecker(key string) error @@ -142,7 +40,7 @@ type Cleaner interface { GetScannedWindow() *GCWindow Stop() GetMinMerged() *checkpoint.CheckpointEntry - DoCheck(context.Context) error + DoCheck(bool) error GetPITRs() (*logtail.PitrInfo, error) SetTid(tid uint64) EnableGC() @@ -165,9 +63,6 @@ var FSinkerFactory ioutil.FileSinkerFactory const ObjectTablePrimaryKeyIdx = 0 const ObjectTableVersion = 0 -const ( - DefaultInMemoryStagedSize = mpool.MB * 32 -) func init() { ObjectTableAttrs = []string{ @@ -203,16 +98,6 @@ func init() { ) } -func NewObjectTableBatch() *batch.Batch { - ret := batch.New(ObjectTableAttrs) - ret.SetVector(0, vector.NewVec(ObjectTableTypes[0])) - ret.SetVector(1, vector.NewVec(ObjectTableTypes[1])) - ret.SetVector(2, vector.NewVec(ObjectTableTypes[2])) - ret.SetVector(3, vector.NewVec(ObjectTableTypes[3])) - ret.SetVector(4, vector.NewVec(ObjectTableTypes[4])) - return ret -} - func addObjectToBatch( bat *batch.Batch, stats *objectio.ObjectStats, diff --git a/pkg/vm/engine/tae/db/gc/v3/window.go b/pkg/vm/engine/tae/db/gc/v3/window.go index c3604dd55b4bd..6f6ab9a5b6c02 100644 --- a/pkg/vm/engine/tae/db/gc/v3/window.go +++ b/pkg/vm/engine/tae/db/gc/v3/window.go @@ -18,11 +18,11 @@ import ( "bytes" "context" "fmt" - "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" + catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" @@ -41,14 +41,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" ) -type ObjectEntry struct { - stats *objectio.ObjectStats - createTS types.TS - dropTS types.TS - db uint64 - table uint64 -} - type WindowOption func(*GCWindow) func WithWindowDir(dir string) WindowOption { @@ -159,7 +151,56 @@ func (w *GCWindow) ExecuteGlobalCheckpointBasedGC( var metaFile string var err error if metaFile, err = w.writeMetaForRemainings( - ctx, filesNotGC, + ctx, filesNotGC, ioutil.EncodeGCMetadataName, + ); err != nil { + return nil, "", err + } + + w.files = filesNotGC + return filesToGC, metaFile, nil +} + +func (w 
*GCWindow) ExecuteFastBasedGC( + ctx context.Context, + ts *types.TS, + accountSnapshots map[uint32][]types.TS, + pitrs *logtail.PitrInfo, + snapshotMeta *logtail.SnapshotMeta, + buffer *containers.OneSchemaBatchBuffer, + cacheSize int, + estimateRows int, + probility float64, + catalog *catalog2.Catalog, + mp *mpool.MPool, + fs fileservice.FileService, +) ([]string, string, error) { + + sourcer := w.MakeFilesReader(ctx, fs) + + job := NewCheckpointFastGCJob( + ts, + sourcer, + pitrs, + accountSnapshots, + snapshotMeta, + buffer, + false, + catalog, + mp, + fs, + WithGCJobCoarseConfig(estimateRows, probility, cacheSize), + ) + defer job.Close() + + if err := job.Execute(ctx); err != nil { + return nil, "", err + } + + filesToGC, filesNotGC := job.Result() + var metaFile string + var err error + if metaFile, err = w.writeMetaForRemainings( + ctx, filesNotGC, ioutil.EncodeGCFastMetadataName, ); err != nil { return nil, "", err } @@ -241,7 +282,7 @@ func (w *GCWindow) ScanCheckpoints( w.tsRange.end = end newFiles, _ := sinker.GetResult() if metaFile, err = w.writeMetaForRemainings( - ctx, newFiles, + ctx, newFiles, ioutil.EncodeGCScanMetadataName, ); err != nil { return } @@ -268,13 +309,14 @@ func (w *GCWindow) getSinker( func (w *GCWindow) writeMetaForRemainings( ctx context.Context, stats []objectio.ObjectStats, + encode func(types.TS, types.TS) string, ) (string, error) { select { case <-ctx.Done(): return "", context.Cause(ctx) default: } - name := ioutil.EncodeGCMetadataName(w.tsRange.start, w.tsRange.end) + name := encode(w.tsRange.start, w.tsRange.end) ret := batch.NewWithSchema( false, ObjectTableMetaAttrs, ObjectTableMetaTypes, ) diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index bc24c82a4fa3d..07e506a214801 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -6719,7 +6719,7 @@ func TestAppendAndGC(t *testing.T) { } logutil.Infof("start gc") assert.Equal(t, uint64(0), db.Runtime.Scheduler.GetPenddingLSNCnt()) - err = db.DiskCleaner.GetCleaner().DoCheck(ctx) + err = db.DiskCleaner.GetCleaner().DoCheck(true) assert.Nil(t, err) testutils.WaitExpect(10000, func() bool { return db.DiskCleaner.GetCleaner().GetMinMerged() != nil @@ -6830,7 +6830,9 @@ func TestAppendAndGC2(t *testing.T) { // check gc meta files var gcFile bool for file := range files { - if strings.Contains(file, "/gc_") && strings.Contains(file, ".ckp") { + if strings.Contains(file, "/gc_") && (strings.Contains(file, ".ckp") || + strings.Contains(file, ".fast") || + strings.Contains(file, ".scan")) { gcFile = true break } @@ -7009,7 +7011,7 @@ func TestSnapshotGC(t *testing.T) { return } assert.NotNil(t, minMerged) - err = db.DiskCleaner.GetCleaner().DoCheck(ctx) + err = db.DiskCleaner.GetCleaner().DoCheck(true) assert.Nil(t, err) tae.RestartDisableGC(ctx) db = tae.DB @@ -7024,7 +7026,7 @@ func TestSnapshotGC(t *testing.T) { end := db.DiskCleaner.GetCleaner().GetScanWaterMark().GetEnd() minEnd := minMerged.GetEnd() assert.True(t, end.GE(&minEnd)) - err = db.DiskCleaner.GetCleaner().DoCheck(ctx) + err = db.DiskCleaner.GetCleaner().DoCheck(true) assert.Nil(t, err) tbl := rele2.GetMeta().(*catalog.TableEntry) db2, err := db.Catalog.GetDatabaseByID(tbl.GetDB().ID) @@ -7235,7 +7237,7 @@ func TestSnapshotMeta(t *testing.T) { for _, snap := range snaps { assert.Equal(t, len(snapshots), snap.Length()) } - err = db.DiskCleaner.GetCleaner().DoCheck(ctx) + err = db.DiskCleaner.GetCleaner().DoCheck(true) assert.Nil(t, err) tae.RestartDisableGC(ctx) 
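The window.go hunks above make the metadata file name an injected strategy: writeMetaForRemainings now takes an encode func(types.TS, types.TS) string, so the scan path emits ".scan" files, the fast path ".fast" files, and the global-checkpoint path keeps the pre-existing gc metadata name. A compact sketch of that selection; gcKind and pickEncoder are illustrative names, only the three ioutil encoders are real:

type gcKind int

const (
    gcScan gcKind = iota
    gcFast
    gcGlobal
)

// pickEncoder maps a GC flavor to the metadata-name encoder that
// writeMetaForRemainings should receive.
func pickEncoder(k gcKind) func(start, end types.TS) string {
    switch k {
    case gcScan:
        return ioutil.EncodeGCScanMetadataName // "gc_..." name ending in .scan
    case gcFast:
        return ioutil.EncodeGCFastMetadataName // "gc_..." name ending in .fast
    default:
        return ioutil.EncodeGCMetadataName // pre-existing gc metadata name
    }
}

This keeps all three call sites on one write path while letting the gc-meta check in TestAppendAndGC2 above distinguish .ckp, .fast, and .scan files.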
db = tae.DB @@ -7260,7 +7262,7 @@ func TestSnapshotMeta(t *testing.T) { for _, snap := range snaps { assert.Equal(t, len(snapshots), snap.Length()) } - err = db.DiskCleaner.GetCleaner().DoCheck(ctx) + err = db.DiskCleaner.GetCleaner().DoCheck(true) assert.Nil(t, err) } @@ -7489,7 +7491,7 @@ func TestPitrMeta(t *testing.T) { } } - err = db.DiskCleaner.GetCleaner().DoCheck(ctx) + err = db.DiskCleaner.GetCleaner().DoCheck(true) assert.Nil(t, err) assert.NotNil(t, minMerged) pitr, err := db.DiskCleaner.GetCleaner().GetPITRs() @@ -7508,7 +7510,7 @@ func TestPitrMeta(t *testing.T) { end := db.DiskCleaner.GetCleaner().GetScanWaterMark().GetEnd() minEnd = minMerged.GetEnd() assert.True(t, end.GE(&minEnd)) - err = db.DiskCleaner.GetCleaner().DoCheck(ctx) + err = db.DiskCleaner.GetCleaner().DoCheck(true) assert.Nil(t, err) db.BGCheckpointRunner.EnableCheckpoint(cfg) txn, _ = db.StartTxn(nil) @@ -7782,6 +7784,73 @@ func TestCkpLeak(t *testing.T) { } +func TestFastGC(t *testing.T) { + defer testutils.AfterTest(t)() + testutils.EnsureNoLeak(t) + ctx := context.Background() + + opts := config.WithQuickScanAndCKPOpts(nil) + opts.GCCfg.GCTTL = 1 * time.Hour + opts.GCCfg.ScanGCInterval = 1 * time.Hour + options.WithDisableGCCheckpoint()(opts) + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer tae.Close() + db := tae.DB + db.MergeScheduler.PauseAll() + schema1 := catalog.MockSchemaAll(13, 2) + schema1.Extra.BlockMaxRows = 10 + schema1.Extra.ObjectMaxBlocks = 2 + + schema2 := catalog.MockSchemaAll(13, 2) + schema2.Extra.BlockMaxRows = 10 + schema2.Extra.ObjectMaxBlocks = 2 + { + txn, _ := db.StartTxn(nil) + database, err := testutil.CreateDatabase2(ctx, txn, "db") + assert.Nil(t, err) + _, err = testutil.CreateRelation2(ctx, txn, database, schema1) + assert.Nil(t, err) + _, err = testutil.CreateRelation2(ctx, txn, database, schema2) + assert.Nil(t, err) + assert.Nil(t, txn.Commit(context.Background())) + } + bat := catalog.MockBatch(schema1, int(schema1.Extra.BlockMaxRows*10-1)) + defer bat.Close() + bats := bat.Split(bat.Length()) + + pool, err := ants.NewPool(20) + assert.Nil(t, err) + defer pool.Release() + var wg sync.WaitGroup + + for _, data := range bats { + wg.Add(2) + err = pool.Submit(testutil.AppendClosure(t, data, schema1.Name, db, &wg)) + assert.Nil(t, err) + err = pool.Submit(testutil.AppendClosure(t, data, schema2.Name, db, &wg)) + assert.Nil(t, err) + } + wg.Wait() + testutils.WaitExpect(10000, func() bool { + return db.Runtime.Scheduler.GetPenddingLSNCnt() == 0 + }) + t.Log(tae.Catalog.SimplePPString(common.PPL1)) + if db.Runtime.Scheduler.GetPenddingLSNCnt() != 0 { + return + } + tae.ForceCheckpoint() + logutil.Infof("start gc") + minTS := tae.TxnMgr.Now() + err = db.DiskCleaner.FastGC(ctx, &minTS) + assert.Nil(t, err) + assert.Equal(t, uint64(0), db.Runtime.Scheduler.GetPenddingLSNCnt()) + testutils.WaitExpect(10000, func() bool { + return db.DiskCleaner.GetCleaner().GetScanWaterMark() != nil + }) + err = db.DiskCleaner.GetCleaner().DoCheck(false) + assert.Nil(t, err) +} + func TestGlobalCheckpoint2(t *testing.T) { defer testutils.AfterTest(t)() testutils.EnsureNoLeak(t) diff --git a/pkg/vm/engine/tae/logtail/snapshot.go b/pkg/vm/engine/tae/logtail/snapshot.go index a4d6c5dcf9291..d59e911097d34 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -170,6 +170,10 @@ type tableInfo struct { pk string } +func (t *tableInfo) IsDrop() bool { + return !t.deleteAt.IsEmpty() +} + type PitrInfo struct { cluster types.TS account 
map[uint32]types.TS @@ -331,6 +335,19 @@ func IsMoTable(tid uint64) bool { return tid == catalog2.MO_TABLES_ID } +func (sm *SnapshotMeta) GetSnapshotTableIDs() map[uint64]struct{} { + return sm.snapshotTableIDs +} +func (sm *SnapshotMeta) GetTableIDs() map[uint64]*tableInfo { + sm.RLock() + defer sm.RUnlock() + tables := make(map[uint64]*tableInfo) + for id, table := range sm.tableIDIndex { + tables[id] = table + } + return tables +} + type tombstone struct { rowid types.Rowid pk types.Tuple @@ -364,7 +381,6 @@ func (sm *SnapshotMeta) updateTableInfo( } id := stats.ObjectName().SegmentId() moTable := (*objects)[tid] - // dropped object will overwrite the created object, updating the deleteAt obj := moTable[id] if obj == nil { diff --git a/pkg/vm/engine/tae/rpc/handle_debug.go b/pkg/vm/engine/tae/rpc/handle_debug.go index 8aebc0a17dd7d..8afa3f92114ab 100644 --- a/pkg/vm/engine/tae/rpc/handle_debug.go +++ b/pkg/vm/engine/tae/rpc/handle_debug.go @@ -613,6 +613,10 @@ func (h *Handle) HandleDiskCleaner( return !end.GE(&ts) }, cmd_util.CheckerKeyMinTS) return + case cmd_util.FastGC: + minTS := types.StringToTS(value) + err = h.db.DiskCleaner.FastGC(ctx, &minTS) + return default: return nil, moerr.NewInvalidArgNoCtx(key, value) } diff --git a/test/distributed/cases/dml/checkpoint/checkpoint.result b/test/distributed/cases/dml/checkpoint/checkpoint.result index 40e52f6c6a5ae..7d9ffce735b1d 100644 --- a/test/distributed/cases/dml/checkpoint/checkpoint.result +++ b/test/distributed/cases/dml/checkpoint/checkpoint.result @@ -72,4 +72,7 @@ a 3 4 5 +select mo_ctl('dn','DiskCleaner','execute_gc.fast'); +mo_ctl(dn, DiskCleaner, execute_gc.fast) +{\n "method": "DiskCleaner",\n "result": [\n {\n "returnStr": "OK"\n }\n ]\n}\n drop database if exists db1; diff --git a/test/distributed/cases/dml/checkpoint/checkpoint.test b/test/distributed/cases/dml/checkpoint/checkpoint.test index 589e16274cc25..e2bdd594a5b9e 100644 --- a/test/distributed/cases/dml/checkpoint/checkpoint.test +++ b/test/distributed/cases/dml/checkpoint/checkpoint.test @@ -43,4 +43,7 @@ select mo_ctl('dn','globalcheckpoint',''); select mo_ctl('dn','checkpoint',''); select * from t; +-- @separator:table +select mo_ctl('dn','DiskCleaner','execute_gc.fast'); + drop database if exists db1;
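One last note on the surface area: the new checkpoint.test case drives fast GC through mo_ctl, and handle_debug.go is the TN endpoint that serves it. A condensed sketch of that endpoint's decode-and-dispatch step; handleFastGC is a hypothetical wrapper, while types.StringToTS, DiskCleaner.FastGC, and the mo_ctl spelling are taken from the hunks above:

func handleFastGC(ctx context.Context, tdb *db.DB, value string) error {
    // The ctl value carries a serialized types.TS, so StringToTS recovers
    // the fast-GC watermark before the fast pass runs.
    minTS := types.StringToTS(value)
    return tdb.DiskCleaner.FastGC(ctx, &minTS)
}

From SQL the same path is reachable via select mo_ctl('dn','DiskCleaner','execute_gc.fast'); and TestFastGC above exercises the identical entry point directly, with a watermark taken from tae.TxnMgr.Now().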