Skip to content

Commit fc6c40d

Browse files
authored
Cleaner should remove debug and partitioned-groups directories if bucket store is empty for the tenant (#6604)
Signed-off-by: Alex Le <[email protected]>
1 parent cfca43b commit fc6c40d

File tree

2 files changed

+50
-10
lines changed

2 files changed

+50
-10
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,23 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
475475
}
476476
level.Info(userLogger).Log("msg", "cleaning up remaining blocks data for tenant marked for deletion")
477477
// Let's do final cleanup of tenant.
478+
err = c.deleteNonDataFiles(ctx, userLogger, userBucket)
479+
if err != nil {
480+
return err
481+
}
482+
483+
if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil {
484+
return errors.Wrap(err, "failed to delete marker files")
485+
} else if deleted > 0 {
486+
level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted)
487+
}
488+
if err := cortex_tsdb.DeleteTenantDeletionMark(ctx, c.bucketClient, userID); err != nil {
489+
return errors.Wrap(err, "failed to delete tenant deletion mark")
490+
}
491+
return nil
492+
}
493+
494+
func (c *BlocksCleaner) deleteNonDataFiles(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) error {
478495
if deleted, err := bucket.DeletePrefix(ctx, userBucket, block.DebugMetas, userLogger); err != nil {
479496
return errors.Wrap(err, "failed to delete "+block.DebugMetas)
480497
} else if deleted > 0 {
@@ -489,15 +506,6 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
489506
level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted)
490507
}
491508
}
492-
493-
if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil {
494-
return errors.Wrap(err, "failed to delete marker files")
495-
} else if deleted > 0 {
496-
level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted)
497-
}
498-
if err := cortex_tsdb.DeleteTenantDeletionMark(ctx, c.bucketClient, userID); err != nil {
499-
return errors.Wrap(err, "failed to delete tenant deletion mark")
500-
}
501509
return nil
502510
}
503511

@@ -547,6 +555,9 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
547555
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
548556
level.Warn(userLogger).Log("msg", "error deleting index sync status when index is empty", "err", err)
549557
}
558+
if err := c.deleteNonDataFiles(ctx, userLogger, userBucket); err != nil {
559+
level.Warn(userLogger).Log("msg", "error deleting non-data files", "err", err)
560+
}
550561
} else {
551562
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
552563
}

pkg/compactor/blocks_cleaner_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,8 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
912912
DeletionDelay: time.Hour,
913913
CleanupInterval: time.Minute,
914914
CleanupConcurrency: 1,
915+
ShardingStrategy: util.ShardingStrategyShuffle,
916+
CompactionStrategy: util.CompactionStrategyPartitioning,
915917
}
916918

917919
ctx := context.Background()
@@ -929,14 +931,41 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
929931

930932
userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)
931933

932-
err := cleaner.cleanUser(ctx, logger, userBucket, userID, false)
934+
debugMetaFile := path.Join(block.DebugMetas, "meta.json")
935+
require.NoError(t, userBucket.Upload(context.Background(), debugMetaFile, strings.NewReader("some random content here")))
936+
937+
partitionedGroupInfo := PartitionedGroupInfo{
938+
PartitionedGroupID: 1234,
939+
PartitionCount: 1,
940+
Partitions: []Partition{
941+
{
942+
PartitionID: 0,
943+
Blocks: []ulid.ULID{},
944+
},
945+
},
946+
RangeStart: 0,
947+
RangeEnd: 2,
948+
CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
949+
Version: PartitionedGroupInfoVersion1,
950+
}
951+
_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
952+
require.NoError(t, err)
953+
partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupInfo.PartitionedGroupID)
954+
955+
err = cleaner.cleanUser(ctx, logger, userBucket, userID, false)
933956
require.NoError(t, err)
934957

935958
_, err = bucketindex.ReadIndex(ctx, bucketClient, userID, cfgProvider, logger)
936959
require.ErrorIs(t, err, bucketindex.ErrIndexNotFound)
937960

938961
_, err = userBucket.WithExpectedErrs(userBucket.IsObjNotFoundErr).Get(ctx, bucketindex.SyncStatusFile)
939962
require.True(t, userBucket.IsObjNotFoundErr(err))
963+
964+
_, err = userBucket.WithExpectedErrs(userBucket.IsObjNotFoundErr).Get(ctx, debugMetaFile)
965+
require.True(t, userBucket.IsObjNotFoundErr(err))
966+
967+
_, err = userBucket.WithExpectedErrs(userBucket.IsObjNotFoundErr).Get(ctx, partitionedGroupFile)
968+
require.True(t, userBucket.IsObjNotFoundErr(err))
940969
}
941970

942971
type mockConfigProvider struct {

0 commit comments

Comments
 (0)