Skip to content

Commit 63401ed

Browse files
authored
Avoid double compaction by cleaning partition files in 2 cycles (#7130)
* Avoid double compaction by cleaning partition files in 2 cycles Signed-off-by: Anna Tran <[email protected]> * Clean partition group blocks as first step in cleanUser Signed-off-by: Anna Tran <[email protected]> * Update CHANGELOG Signed-off-by: Anna Tran <[email protected]> --------- Signed-off-by: Anna Tran <[email protected]>
1 parent 13e7c40 commit 63401ed

File tree

4 files changed

+59
-20
lines changed

4 files changed

+59
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
1010
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
1111
* [ENHANCEMENT] Ruler: Add DecodingConcurrency config flag for Thanos Engine. #7118
12+
* [ENHANCEMENT] Compactor: Avoid double compaction by cleaning partition files in 2 cycles. #7129
1213
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
1314
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
1415
* [BUGFIX] Distributor: Fix panic on health check failure when using stream push. #7116

pkg/compactor/blocks_cleaner.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
609609
c.tenantCleanDuration.WithLabelValues(userID).Set(time.Since(startTime).Seconds())
610610
}()
611611

612+
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
613+
begin := time.Now()
614+
c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID)
615+
level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
616+
}
617+
612618
// Migrate block deletion marks to the global markers location. This operation is a best-effort.
613619
if firstRun && c.cfg.BlockDeletionMarksMigrationEnabled {
614620
if err := bucketindex.MigrateBlockDeletionMarksToGlobalLocation(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
@@ -753,12 +759,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
753759
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
754760
}
755761
c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction))
756-
757-
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
758-
begin = time.Now()
759-
c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID)
760-
level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
761-
}
762762
return nil
763763
}
764764

@@ -787,28 +787,35 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
787787
}
788788

789789
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
790+
isPartitionGroupInfoDeleted := false
790791
partitionedGroupInfoFile := extraInfo.path
791792

792793
if extraInfo.status.CanDelete {
793794
if extraInfo.status.IsCompleted {
794795
// Try to remove all blocks included in partitioned group info
795-
if err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID); err != nil {
796+
deletedBlocksCount, err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID)
797+
if err != nil {
796798
level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
797799
// if one block can not be marked for deletion, we should
798800
// skip delete this partitioned group. next iteration
799801
// would try it again.
800802
continue
801803
}
802-
}
803-
804-
if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
805-
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
806-
} else {
807-
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
804+
if deletedBlocksCount > 0 {
805+
level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
806+
} else {
807+
level.Info(userLogger).Log("msg", "deleting partition group now that all associated blocks have been deleted", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
808+
if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
809+
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
810+
} else {
811+
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
812+
isPartitionGroupInfoDeleted = true
813+
}
814+
}
808815
}
809816
}
810817

811-
if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker {
818+
if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) {
812819
// Remove partition visit markers
813820
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil {
814821
level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err)

pkg/compactor/blocks_cleaner_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,24 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
925925
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
926926

927927
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
928-
928+
idx := &bucketindex.Index{
929+
Blocks: bucketindex.Blocks{
930+
{
931+
ID: block1,
932+
MinTime: startTime,
933+
MaxTime: endTime,
934+
Parquet: &parquet.ConverterMarkMeta{},
935+
},
936+
{
937+
ID: block2,
938+
MinTime: startTime,
939+
MaxTime: endTime,
940+
Parquet: &parquet.ConverterMarkMeta{},
941+
},
942+
},
943+
}
944+
err = bucketindex.WriteIndex(ctx, bucketClient, userID, nil, idx)
945+
require.NoError(t, err)
929946
userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)
930947

931948
partitionedGroupInfo := PartitionedGroupInfo{
@@ -955,11 +972,17 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
955972
err = visitMarkerManager.updateVisitMarker(ctx)
956973
require.NoError(t, err)
957974

958-
cleaner.cleanPartitionedGroupInfo(ctx, userBucket, logger, userID)
975+
// first cleaning cycle deletes only the blocks
976+
err = cleaner.cleanUser(ctx, logger, userBucket, userID, false)
977+
require.NoError(t, err)
978+
979+
idx, err = bucketindex.ReadIndex(ctx, bucketClient, userID, cfgProvider, logger)
980+
require.NoError(t, err)
981+
require.Equal(t, []ulid.ULID{block1}, idx.BlockDeletionMarks.GetULIDs())
959982

960983
partitionedGroupFileExists, err := userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID))
961984
require.NoError(t, err)
962-
require.False(t, partitionedGroupFileExists)
985+
require.True(t, partitionedGroupFileExists)
963986

964987
block1DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block1.String(), metadata.DeletionMarkFilename))
965988
require.NoError(t, err)
@@ -968,6 +991,14 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
968991
block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename))
969992
require.NoError(t, err)
970993
require.False(t, block2DeletionMarkerExists)
994+
995+
// second cleaning cycle deletes the partition group info after all blocks are deleted
996+
err = cleaner.cleanUser(ctx, logger, userBucket, userID, false)
997+
require.NoError(t, err)
998+
999+
partitionedGroupFileExists, err = userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID))
1000+
require.NoError(t, err)
1001+
require.False(t, partitionedGroupFileExists)
9711002
}
9721003

9731004
func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {

pkg/compactor/partitioned_group_info.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket
234234
return noCompactMarkerExists
235235
}
236236

237-
func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) error {
237+
func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) (int, error) {
238238
blocks := p.getAllBlocks()
239239
deleteBlocksCount := 0
240240
defer func() {
@@ -244,13 +244,13 @@ func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, use
244244
if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) {
245245
if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
246246
level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
247-
return err
247+
return deleteBlocksCount, err
248248
}
249249
deleteBlocksCount++
250250
level.Debug(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
251251
}
252252
}
253-
return nil
253+
return deleteBlocksCount, nil
254254
}
255255

256256
func (p *PartitionedGroupInfo) String() string {

0 commit comments

Comments
 (0)