diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index fe1ad01d028..58a9360b304 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -7,7 +7,6 @@ import ( "hash/fnv" "math/rand" "os" - "path" "path/filepath" "strings" "time" @@ -832,7 +831,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter), c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed), c.blocksCompactor, - path.Join(c.compactorCfg.DataDir, "compact"), + c.compactDirForUser(userID), bucket, c.compactorCfg.CompactionConcurrency, c.compactorCfg.SkipBlocksWithOutOfOrderChunksEnabled, @@ -845,6 +844,13 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { return errors.Wrap(err, "compaction") } + // Remove all files on the compact root dir + // We do this only if there is no error because potentially on the next run we would not have to download + // everything again. + if err := os.RemoveAll(c.compactRootDir()); err != nil { + level.Error(c.logger).Log("msg", "failed to remove compaction work directory", "path", c.compactRootDir(), "err", err) + } + return nil } @@ -941,6 +947,16 @@ func (c *Compactor) metaSyncDirForUser(userID string) string { return filepath.Join(c.compactorCfg.DataDir, compactorMetaPrefix+userID) } +// compactDirForUser returns the directory to be used to download and compact the blocks for a user +func (c *Compactor) compactDirForUser(userID string) string { + return filepath.Join(c.compactRootDir(), userID) +} + +// compactRootDir returns the root directory to be used to download and compact blocks +func (c *Compactor) compactRootDir() string { + return filepath.Join(c.compactorCfg.DataDir, "compact") +} + // This function returns tenants with meta sync directories found on local disk. On error, it returns nil map. func (c *Compactor) listTenantsWithMetaSyncDirectories() map[string]struct{} { result := map[string]struct{}{} diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 99c5753e064..3e880418f4f 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -539,6 +539,47 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( )) } +func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) + bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", "", nil) + bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/visit-mark.json", "", nil) + bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) + bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil) + bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) + + c, _, tsdbPlanner, _, _ := prepare(t, prepareConfig(), bucketClient, nil) + + // Make sure the user folder is created and is being used + // This will be called during compaction + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + _, err := os.Stat(c.compactDirForUser("user-1")) + require.NoError(t, err) + }).Return([]*metadata.Meta{}, nil) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + + // Wait until a run has completed. + cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsCompleted) + }) + + _, err := os.Stat(c.compactDirForUser("user-1")) + require.True(t, os.IsNotExist(err)) +} + func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { t.Parallel()