Skip to content

Commit 85c58c8

Browse files
authored
do direct deletion instead of delayed deletion for tenant retention policy to clean backlogs (#121)
e2e tests are flaky
2 parents 0b0ec9a + 16132be commit 85c58c8

File tree

3 files changed

+20
-7
lines changed

3 files changed

+20
-7
lines changed

cmd/thanos/compact.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,12 @@ func runCompact(
463463

464464
compactMainFn := func() error {
465465
// This must happen before any compaction, so that backlogged blocks already beyond retention are not processed unnecessarily.
466+
if len(retentionByTenant) != 0 && len(sy.Metas()) == 0 {
467+
level.Info(logger).Log("msg", "sync before tenant retention due to no blocks")
468+
if err := sy.SyncMetas(ctx); err != nil {
469+
return errors.Wrap(err, "sync before tenant retention")
470+
}
471+
}
466472
if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil {
467473
return errors.Wrap(err, "retention by tenant failed")
468474
}

pkg/compact/retention.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,20 +114,28 @@ func ApplyRetentionPolicyByTenant(
114114
level.Info(logger).Log("msg", "tenant retention is disabled due to no policy")
115115
return nil
116116
}
117-
level.Info(logger).Log("msg", "start tenant retention")
117+
level.Info(logger).Log("msg", "start tenant retention", "total", len(metas))
118+
deleted, skipped, notExpired := 0, 0, 0
118119
for id, m := range metas {
119120
policy, ok := retentionByTenant[m.Thanos.GetTenant()]
120121
if !ok {
122+
skipped++
121123
continue
122124
}
123125
maxTime := time.Unix(m.MaxTime/1000, 0)
124126
if policy.isExpired(maxTime) {
125127
level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
126-
if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", policy), blocksMarkedForDeletion); err != nil {
127-
return errors.Wrap(err, "delete block")
128+
if err := block.Delete(ctx, logger, bkt, id); err != nil {
129+
level.Error(logger).Log("msg", "failed to delete block", "id", id, "err", err)
130+
continue // continue to next block to clean up backlogs
131+
} else {
132+
blocksMarkedForDeletion.Inc()
133+
deleted++
128134
}
135+
} else {
136+
notExpired++
129137
}
130138
}
131-
level.Info(logger).Log("msg", "tenant retention apply done")
139+
level.Info(logger).Log("msg", "tenant retention apply done", "deleted", deleted, "skipped", skipped, "notExpired", notExpired)
132140
return nil
133141
}

pkg/compact/retention_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,6 @@ func TestApplyRetentionPolicyByTenant(t *testing.T) {
525525
}
526526

527527
got := []string{}
528-
gotMarkedBlocksCount := 0.0
529528
testutil.Ok(t, bkt.Iter(context.TODO(), "", func(name string) error {
530529
exists, err := bkt.Exists(ctx, filepath.Join(name, metadata.DeletionMarkFilename))
531530
if err != nil {
@@ -535,12 +534,12 @@ func TestApplyRetentionPolicyByTenant(t *testing.T) {
535534
got = append(got, name)
536535
return nil
537536
}
538-
gotMarkedBlocksCount += 1.0
539537
return nil
540538
}))
539+
deleted := float64(len(tt.blocks) - len(got))
541540

542541
testutil.Equals(t, got, tt.want)
543-
testutil.Equals(t, gotMarkedBlocksCount, promtest.ToFloat64(blocksMarkedForDeletion))
542+
testutil.Equals(t, deleted, promtest.ToFloat64(blocksMarkedForDeletion))
544543
})
545544
}
546545
}

0 commit comments

Comments
 (0)