Skip to content

Commit f9055cc

Browse files
authored
Add cleaner logic to clean partition compaction blocks and related files (#6507)
* Add cleaner logic to clean partition compaction blocks and related files Signed-off-by: Alex Le <[email protected]> * refactored metrics Signed-off-by: Alex Le <[email protected]> * refactor Signed-off-by: Alex Le <[email protected]> * update logs Signed-off-by: Alex Le <[email protected]> --------- Signed-off-by: Alex Le <[email protected]>
1 parent 0bc6d62 commit f9055cc

File tree

4 files changed

+248
-8
lines changed

4 files changed

+248
-8
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package compactor
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"sync"
78
"time"
89

@@ -39,6 +40,8 @@ type BlocksCleanerConfig struct {
3940
CleanupConcurrency int
4041
BlockDeletionMarksMigrationEnabled bool // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
4142
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
43+
ShardingStrategy string
44+
CompactionStrategy string
4245
}
4346

4447
type BlocksCleaner struct {
@@ -57,6 +60,7 @@ type BlocksCleaner struct {
5760

5861
cleanerVisitMarkerTimeout time.Duration
5962
cleanerVisitMarkerFileUpdateInterval time.Duration
63+
compactionVisitMarkerTimeout time.Duration
6064

6165
// Metrics.
6266
runsStarted *prometheus.CounterVec
@@ -73,24 +77,44 @@ type BlocksCleaner struct {
7377
tenantBucketIndexLastUpdate *prometheus.GaugeVec
7478
tenantBlocksCleanedTotal *prometheus.CounterVec
7579
tenantCleanDuration *prometheus.GaugeVec
80+
remainingPlannedCompactions *prometheus.GaugeVec
81+
inProgressCompactions *prometheus.GaugeVec
82+
oldestPartitionGroupOffset *prometheus.GaugeVec
7683
}
7784

7885
func NewBlocksCleaner(
7986
cfg BlocksCleanerConfig,
8087
bucketClient objstore.InstrumentedBucket,
8188
usersScanner *cortex_tsdb.UsersScanner,
89+
compactionVisitMarkerTimeout time.Duration,
8290
cfgProvider ConfigProvider,
8391
logger log.Logger,
8492
ringLifecyclerID string,
8593
reg prometheus.Registerer,
8694
cleanerVisitMarkerTimeout time.Duration,
8795
cleanerVisitMarkerFileUpdateInterval time.Duration,
8896
blocksMarkedForDeletion *prometheus.CounterVec,
97+
remainingPlannedCompactions *prometheus.GaugeVec,
8998
) *BlocksCleaner {
99+
100+
var inProgressCompactions *prometheus.GaugeVec
101+
var oldestPartitionGroupOffset *prometheus.GaugeVec
102+
if cfg.ShardingStrategy == util.ShardingStrategyShuffle && cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
103+
inProgressCompactions = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
104+
Name: "cortex_compactor_in_progress_compactions",
105+
Help: "Total number of in progress compactions. Only available with shuffle-sharding strategy and partitioning compaction strategy",
106+
}, commonLabels)
107+
oldestPartitionGroupOffset = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
108+
Name: "cortex_compactor_oldest_partition_offset",
109+
Help: "Time in seconds between now and the oldest created partition group not completed. Only available with shuffle-sharding strategy and partitioning compaction strategy",
110+
}, commonLabels)
111+
}
112+
90113
c := &BlocksCleaner{
91114
cfg: cfg,
92115
bucketClient: bucketClient,
93116
usersScanner: usersScanner,
117+
compactionVisitMarkerTimeout: compactionVisitMarkerTimeout,
94118
cfgProvider: cfgProvider,
95119
logger: log.With(logger, "component", "cleaner"),
96120
ringLifecyclerID: ringLifecyclerID,
@@ -153,6 +177,9 @@ func NewBlocksCleaner(
153177
Name: "cortex_bucket_clean_duration_seconds",
154178
Help: "Duration of cleaner runtime for a tenant in seconds",
155179
}, commonLabels),
180+
remainingPlannedCompactions: remainingPlannedCompactions,
181+
inProgressCompactions: inProgressCompactions,
182+
oldestPartitionGroupOffset: oldestPartitionGroupOffset,
156183
}
157184

158185
c.Service = services.NewBasicService(c.starting, c.loop, nil)
@@ -327,6 +354,13 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
327354
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
328355
c.tenantPartialBlocks.DeleteLabelValues(userID)
329356
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
357+
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
358+
c.remainingPlannedCompactions.DeleteLabelValues(userID)
359+
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
360+
c.inProgressCompactions.DeleteLabelValues(userID)
361+
c.oldestPartitionGroupOffset.DeleteLabelValues(userID)
362+
}
363+
}
330364
}
331365
}
332366
c.lastOwnedUsers = allUsers
@@ -447,6 +481,15 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
447481
level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted)
448482
}
449483

484+
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
485+
// Clean up partitioned group info files
486+
if deleted, err := bucket.DeletePrefix(ctx, userBucket, PartitionedGroupDirectory, userLogger); err != nil {
487+
return errors.Wrap(err, "failed to delete "+PartitionedGroupDirectory)
488+
} else if deleted > 0 {
489+
level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted)
490+
}
491+
}
492+
450493
if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil {
451494
return errors.Wrap(err, "failed to delete marker files")
452495
} else if deleted > 0 {
@@ -592,6 +635,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
592635
}
593636
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
594637

638+
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
639+
begin = time.Now()
640+
c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID)
641+
level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
642+
}
643+
595644
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
596645
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
597646
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
@@ -600,6 +649,90 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
600649
return nil
601650
}
602651

652+
func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) {
653+
existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
654+
path string
655+
status PartitionedGroupStatus
656+
})
657+
err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
658+
if strings.Contains(file, PartitionVisitMarkerDirectory) {
659+
return nil
660+
}
661+
partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
662+
if err != nil {
663+
level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
664+
return nil
665+
}
666+
667+
status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
668+
level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
669+
existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
670+
path string
671+
status PartitionedGroupStatus
672+
}{
673+
path: file,
674+
status: status,
675+
}
676+
return nil
677+
})
678+
679+
if err != nil {
680+
level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
681+
}
682+
683+
remainingCompactions := 0
684+
inProgressCompactions := 0
685+
var oldestPartitionGroup *PartitionedGroupInfo
686+
defer func() {
687+
c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
688+
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
689+
if c.oldestPartitionGroupOffset != nil {
690+
if oldestPartitionGroup != nil {
691+
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
692+
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
693+
} else {
694+
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
695+
}
696+
}
697+
}()
698+
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
699+
partitionedGroupInfoFile := extraInfo.path
700+
701+
remainingCompactions += extraInfo.status.PendingPartitions
702+
inProgressCompactions += extraInfo.status.InProgressPartitions
703+
if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
704+
oldestPartitionGroup = partitionedGroupInfo
705+
}
706+
if extraInfo.status.CanDelete {
707+
if extraInfo.status.IsCompleted {
708+
// Try to remove all blocks included in partitioned group info
709+
if err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID); err != nil {
710+
level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
711+
// if one block can not be marked for deletion, we should
712+
// skip delete this partitioned group. next iteration
713+
// would try it again.
714+
continue
715+
}
716+
}
717+
718+
if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
719+
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
720+
} else {
721+
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
722+
}
723+
}
724+
725+
if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker {
726+
// Remove partition visit markers
727+
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger); err != nil {
728+
level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
729+
} else {
730+
level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile)
731+
}
732+
}
733+
}
734+
}
735+
603736
// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
604737
// and index are updated accordingly.
605738
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {

pkg/compactor/blocks_cleaner_test.go

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/cortexproject/cortex/pkg/storage/tsdb"
2525
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2626
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
27+
"github.com/cortexproject/cortex/pkg/util"
2728
util_log "github.com/cortexproject/cortex/pkg/util/log"
2829
"github.com/cortexproject/cortex/pkg/util/services"
2930
)
@@ -86,8 +87,9 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
8687
Name: blocksMarkedForDeletionName,
8788
Help: blocksMarkedForDeletionHelp,
8889
}, append(commonLabels, reasonLabelName))
90+
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
8991

90-
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
92+
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
9193

9294
// Clean User with no error
9395
cleaner.bucketClient = bkt
@@ -193,8 +195,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
193195
Name: blocksMarkedForDeletionName,
194196
Help: blocksMarkedForDeletionHelp,
195197
}, append(commonLabels, reasonLabelName))
198+
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
196199

197-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
200+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
198201
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
199202
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck
200203

@@ -354,8 +357,9 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
354357
Name: blocksMarkedForDeletionName,
355358
Help: blocksMarkedForDeletionHelp,
356359
}, append(commonLabels, reasonLabelName))
360+
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
357361

358-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
362+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
359363
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
360364
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck
361365

@@ -418,8 +422,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
418422
Name: blocksMarkedForDeletionName,
419423
Help: blocksMarkedForDeletionHelp,
420424
}, append(commonLabels, reasonLabelName))
425+
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
421426

422-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
427+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
423428
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
424429
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck
425430

@@ -476,8 +481,9 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
476481
Name: blocksMarkedForDeletionName,
477482
Help: blocksMarkedForDeletionHelp,
478483
}, append(commonLabels, reasonLabelName))
484+
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
479485

480-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
486+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
481487
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
482488
require.NoError(t, err)
483489
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
@@ -617,8 +623,9 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
617623
Name: blocksMarkedForDeletionName,
618624
Help: blocksMarkedForDeletionHelp,
619625
}, append(commonLabels, reasonLabelName))
626+
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
620627

621-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
628+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
622629

623630
assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
624631
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
@@ -811,6 +818,83 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
811818
}
812819
}
813820

821+
func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
822+
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
823+
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
824+
825+
ts := func(hours int) int64 {
826+
return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000
827+
}
828+
829+
userID := "user-1"
830+
partitionedGroupID := uint32(123)
831+
partitionCount := 1
832+
startTime := ts(-10)
833+
endTime := ts(-8)
834+
block1 := createTSDBBlock(t, bucketClient, userID, startTime, endTime, nil)
835+
836+
cfg := BlocksCleanerConfig{
837+
DeletionDelay: time.Hour,
838+
CleanupInterval: time.Minute,
839+
CleanupConcurrency: 1,
840+
ShardingStrategy: util.ShardingStrategyShuffle,
841+
CompactionStrategy: util.CompactionStrategyPartitioning,
842+
}
843+
844+
ctx := context.Background()
845+
logger := log.NewNopLogger()
846+
reg := prometheus.NewPedanticRegistry()
847+
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
848+
cfgProvider := newMockConfigProvider()
849+
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
850+
Name: blocksMarkedForDeletionName,
851+
Help: blocksMarkedForDeletionHelp,
852+
}, append(commonLabels, reasonLabelName))
853+
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
854+
855+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
856+
857+
userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)
858+
859+
partitionedGroupInfo := PartitionedGroupInfo{
860+
PartitionedGroupID: partitionedGroupID,
861+
PartitionCount: partitionCount,
862+
Partitions: []Partition{
863+
{
864+
PartitionID: 0,
865+
Blocks: []ulid.ULID{block1},
866+
},
867+
},
868+
RangeStart: startTime,
869+
RangeEnd: endTime,
870+
CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
871+
Version: PartitionedGroupInfoVersion1,
872+
}
873+
_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
874+
require.NoError(t, err)
875+
876+
visitMarker := &partitionVisitMarker{
877+
PartitionedGroupID: partitionedGroupID,
878+
PartitionID: 0,
879+
Status: Completed,
880+
VisitTime: time.Now().Add(-2 * time.Minute).Unix(),
881+
}
882+
visitMarkerManager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", visitMarker)
883+
err = visitMarkerManager.updateVisitMarker(ctx)
884+
require.NoError(t, err)
885+
886+
cleaner.cleanPartitionedGroupInfo(ctx, userBucket, logger, userID)
887+
888+
partitionedGroupFileExists, err := userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID))
889+
require.NoError(t, err)
890+
require.False(t, partitionedGroupFileExists)
891+
892+
block1DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block1.String(), metadata.DeletionMarkFilename))
893+
require.NoError(t, err)
894+
require.True(t, block1DeletionMarkerExists)
895+
896+
}
897+
814898
type mockConfigProvider struct {
815899
userRetentionPeriods map[string]time.Duration
816900
}

pkg/compactor/compactor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -657,8 +657,10 @@ func (c *Compactor) starting(ctx context.Context) error {
657657
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
658658
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
659659
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
660-
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
661-
c.compactorMetrics.syncerBlocksMarkedForDeletion)
660+
ShardingStrategy: c.compactorCfg.ShardingStrategy,
661+
CompactionStrategy: c.compactorCfg.CompactionStrategy,
662+
}, c.bucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
663+
c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)
662664

663665
// Ensure an initial cleanup occurred before starting the compactor.
664666
if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {

0 commit comments

Comments
 (0)