Skip to content

Commit 7179065

Browse files
committed
[ES-1365579] add visibility into compact progress
Signed-off-by: Yi Jin <[email protected]>
1 parent cdd22dc commit 7179065

File tree

4 files changed

+145
-20
lines changed

4 files changed

+145
-20
lines changed

cmd/thanos/compact.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ func runCompact(
176176
) (rerr error) {
177177
deleteDelay := time.Duration(conf.deleteDelay)
178178
compactMetrics := newCompactMetrics(reg, deleteDelay)
179+
progressRegistry := compact.NewProgressRegistry(reg, logger)
179180
downsampleMetrics := newDownsampleMetrics(reg)
180181

181182
httpProbe := prober.NewHTTP()
@@ -444,14 +445,16 @@ func runCompact(
444445

445446
var cleanMtx sync.Mutex
446447
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
447-
cleanPartialMarked := func() error {
448+
cleanPartialMarked := func(progress *compact.Progress) error {
448449
cleanMtx.Lock()
449450
defer cleanMtx.Unlock()
450-
451+
defer progress.Idle()
452+
progress.Set(compact.SyncMeta)
451453
if err := sy.SyncMetas(ctx); err != nil {
452454
return errors.Wrap(err, "syncing metas")
453455
}
454456

457+
progress.Set(compact.CleanBlocks)
455458
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
456459
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
457460
return errors.Wrap(err, "cleaning marked blocks")
@@ -461,19 +464,23 @@ func runCompact(
461464
return nil
462465
}
463466

464-
compactMainFn := func() error {
467+
compactMainFn := func(progress *compact.Progress) error {
468+
defer progress.Idle()
465469
// this should happen before any compaction to remove unnecessary process on backlogs beyond retention.
466470
if len(retentionByTenant) != 0 && len(sy.Metas()) == 0 {
467471
level.Info(logger).Log("msg", "sync before tenant retention due to no blocks")
472+
progress.Set(compact.SyncMeta)
468473
if err := sy.SyncMetas(ctx); err != nil {
469474
return errors.Wrap(err, "sync before tenant retention")
470475
}
471476
}
477+
478+
progress.Set(compact.ApplyRetention)
472479
if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil {
473480
return errors.Wrap(err, "retention by tenant failed")
474481
}
475482

476-
if err := compactor.Compact(ctx); err != nil {
483+
if err := compactor.Compact(ctx, progress); err != nil {
477484
return errors.Wrap(err, "whole compaction error")
478485
}
479486

@@ -482,10 +489,11 @@ func runCompact(
482489
// We run two passes of this to ensure that the 1h downsampling is generated
483490
// for 5m downsamplings created in the first run.
484491
level.Info(logger).Log("msg", "start first pass of downsampling")
492+
progress.Set(compact.SyncMeta)
485493
if err := sy.SyncMetas(ctx); err != nil {
486494
return errors.Wrap(err, "sync before first pass of downsampling")
487495
}
488-
496+
progress.Set(compact.DownSampling)
489497
filteredMetas := sy.Metas()
490498
noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks()
491499
for ul := range noDownsampleBlocks {
@@ -514,6 +522,7 @@ func runCompact(
514522
}
515523

516524
level.Info(logger).Log("msg", "start second pass of downsampling")
525+
progress.Set(compact.SyncMeta)
517526
if err := sy.SyncMetas(ctx); err != nil {
518527
return errors.Wrap(err, "sync before second pass of downsampling")
519528
}
@@ -547,27 +556,29 @@ func runCompact(
547556
}
548557

549558
// TODO(bwplotka): Find a way to avoid syncing if no op was done.
559+
progress.Set(compact.SyncMeta)
550560
if err := sy.SyncMetas(ctx); err != nil {
551561
return errors.Wrap(err, "sync before retention")
552562
}
553563

564+
progress.Set(compact.ApplyRetention)
554565
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
555566
return errors.Wrap(err, "retention failed")
556567
}
557568

558-
return cleanPartialMarked()
569+
return cleanPartialMarked(progress)
559570
}
560571

561572
g.Add(func() error {
562573
defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")
563574

564575
if !conf.wait {
565-
return compactMainFn()
576+
return compactMainFn(progressRegistry.Get(compact.Main))
566577
}
567578

568579
// --wait=true is specified.
569580
return runutil.Repeat(conf.waitInterval, ctx.Done(), func() error {
570-
err := compactMainFn()
581+
err := compactMainFn(progressRegistry.Get(compact.Main))
571582
if err == nil {
572583
compactMetrics.iterations.Inc()
573584
return nil
@@ -633,9 +644,11 @@ func runCompact(
633644
// For /global state make sure to fetch periodically.
634645
return runutil.Repeat(conf.blockViewerSyncBlockInterval, ctx.Done(), func() error {
635646
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
647+
progress := progressRegistry.Get(compact.Web)
648+
defer progress.Idle()
636649
iterCtx, iterCancel := context.WithTimeout(ctx, conf.blockViewerSyncBlockTimeout)
637650
defer iterCancel()
638-
651+
progress.Set(compact.SyncMeta)
639652
_, _, err := f.Fetch(iterCtx)
640653
return err
641654
})
@@ -650,7 +663,7 @@ func runCompact(
650663
if conf.cleanupBlocksInterval > 0 {
651664
g.Add(func() error {
652665
return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), func() error {
653-
err := cleanPartialMarked()
666+
err := cleanPartialMarked(progressRegistry.Get(compact.Cleanup))
654667
if err != nil && compact.IsRetryError(err) {
655668
// The RetryError signals that we hit an retriable error (transient error, no connection).
656669
// You should alert on this being triggered too frequently.
@@ -678,7 +691,9 @@ func runCompact(
678691
}
679692

680693
return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error {
681-
694+
progress := progressRegistry.Get(compact.Calculate)
695+
defer progress.Idle()
696+
progress.Set(compact.SyncMeta)
682697
if err := sy.SyncMetas(ctx); err != nil {
683698
// The RetryError signals that we hit an retriable error (transient error, no connection).
684699
// You should alert on this being triggered too frequently.
@@ -693,29 +708,34 @@ func runCompact(
693708
}
694709

695710
metas := sy.Metas()
711+
progress.Set(compact.Grouping)
696712
groups, err := grouper.Groups(metas)
697713
if err != nil {
698714
return errors.Wrapf(err, "could not group metadata for compaction")
699715
}
700-
716+
progress.Set(compact.CalculateProgress)
701717
if err = ps.ProgressCalculate(ctx, groups); err != nil {
702718
return errors.Wrapf(err, "could not calculate compaction progress")
703719
}
704720

721+
progress.Set(compact.Grouping)
705722
retGroups, err := grouper.Groups(metas)
706723
if err != nil {
707724
return errors.Wrapf(err, "could not group metadata for retention")
708725
}
709726

727+
progress.Set(compact.CalculateProgress)
710728
if err = rs.ProgressCalculate(ctx, retGroups); err != nil {
711729
return errors.Wrapf(err, "could not calculate retention progress")
712730
}
713731

714732
if !conf.disableDownsampling {
733+
progress.Set(compact.Grouping)
715734
groups, err = grouper.Groups(metas)
716735
if err != nil {
717736
return errors.Wrapf(err, "could not group metadata into downsample groups")
718737
}
738+
progress.Set(compact.CalculateProgress)
719739
if err := ds.ProgressCalculate(ctx, groups); err != nil {
720740
return errors.Wrapf(err, "could not calculate downsampling progress")
721741
}

pkg/compact/compact.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ type SyncerMetrics struct {
7575
GarbageCollectionFailures prometheus.Counter
7676
GarbageCollectionDuration prometheus.Observer
7777
BlocksMarkedForDeletion prometheus.Counter
78+
SyncMetas prometheus.Counter
79+
SyncMetasDuration prometheus.Observer
7880
}
7981

8082
func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) *SyncerMetrics {
@@ -96,10 +98,22 @@ func NewSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag
9698
})
9799

98100
m.BlocksMarkedForDeletion = blocksMarkedForDeletion
99-
101+
m.SyncMetas = promauto.With(reg).NewCounter(prometheus.CounterOpts{
102+
Name: "thanos_compact_sync_metas_total",
103+
Help: "Total number of synced block metas.",
104+
})
105+
m.SyncMetasDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
106+
Name: "thanos_compact_sync_metas_duration_seconds",
107+
Help: "Time it took to sync block metas.",
108+
Buckets: []float64{10, 60, 180, 300, 450, 600, 1200, 1800, 3600, 7200, 10800, 21600},
109+
})
100110
return &m
101111
}
102112

113+
func measureFuncDuration(duration prometheus.Observer, beginTime time.Time) {
114+
duration.Observe(time.Since(beginTime).Seconds())
115+
}
116+
103117
// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
104118
// Blocks must be at least as old as the sync delay for being considered.
105119
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter, syncMetasTimeout time.Duration) (*Syncer, error) {
@@ -147,6 +161,8 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {
147161

148162
// SyncMetas synchronizes local state of block metas with what we have in the bucket.
149163
func (s *Syncer) SyncMetas(ctx context.Context) error {
164+
s.metrics.SyncMetas.Inc()
165+
defer measureFuncDuration(s.metrics.SyncMetasDuration, time.Now())
150166
var cancel func() = func() {}
151167
if s.syncMetasTimeout > 0 {
152168
ctx, cancel = context.WithTimeout(ctx, s.syncMetasTimeout)
@@ -197,7 +213,8 @@ func (s *Syncer) Metas() map[ulid.ULID]*metadata.Meta {
197213
// block with a higher compaction level.
198214
// Call to SyncMetas function is required to populate duplicateIDs in duplicateBlocksFilter.
199215
func (s *Syncer) GarbageCollect(ctx context.Context) error {
200-
begin := time.Now()
216+
s.metrics.GarbageCollections.Inc()
217+
defer measureFuncDuration(s.metrics.GarbageCollectionDuration, time.Now())
201218

202219
// Ignore filter exists before deduplicate filter.
203220
deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks()
@@ -236,8 +253,6 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {
236253
s.mtx.Unlock()
237254
s.metrics.GarbageCollectedBlocks.Inc()
238255
}
239-
s.metrics.GarbageCollections.Inc()
240-
s.metrics.GarbageCollectionDuration.Observe(time.Since(begin).Seconds())
241256
return nil
242257
}
243258

@@ -1443,7 +1458,7 @@ func NewBucketCompactorWithCheckerAndCallback(
14431458
}
14441459

14451460
// Compact runs compaction over bucket.
1446-
func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
1461+
func (c *BucketCompactor) Compact(ctx context.Context, progress *Progress) (rerr error) {
14471462
if c.concurrency == 0 {
14481463
level.Warn(c.logger).Log("msg", "compactor is disabled")
14491464
return nil
@@ -1462,6 +1477,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
14621477

14631478
// Loop over bucket and compact until there's no work left.
14641479
for {
1480+
SetProgress(progress, Initializing)
14651481
var (
14661482
wg sync.WaitGroup
14671483
workCtx, workCtxCancel = context.WithCancel(ctx)
@@ -1521,17 +1537,20 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
15211537
}
15221538

15231539
level.Info(c.logger).Log("msg", "start sync of metas")
1540+
SetProgress(progress, SyncMeta)
15241541
if err := c.sy.SyncMetas(ctx); err != nil {
15251542
return errors.Wrap(err, "sync")
15261543
}
15271544

15281545
level.Info(c.logger).Log("msg", "start of GC")
1546+
SetProgress(progress, GarbageCollect)
15291547
// Blocks that were compacted are garbage collected after each Compaction.
15301548
// However if compactor crashes we need to resolve those on startup.
15311549
if err := c.sy.GarbageCollect(ctx); err != nil {
15321550
return errors.Wrap(err, "garbage")
15331551
}
15341552

1553+
SetProgress(progress, Grouping)
15351554
groups, err := c.grouper.Groups(c.sy.Metas())
15361555
if err != nil {
15371556
return errors.Wrap(err, "build compaction groups")
@@ -1544,12 +1563,13 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
15441563
}
15451564
}
15461565

1566+
SetProgress(progress, CleanBlocks)
15471567
if err := runutil.DeleteAll(c.compactDir, ignoreDirs...); err != nil {
15481568
level.Warn(c.logger).Log("msg", "failed deleting non-compaction group directories/files, some disk space usage might have leaked. Continuing", "err", err, "dir", c.compactDir)
15491569
}
15501570

15511571
level.Info(c.logger).Log("msg", "start of compactions")
1552-
1572+
SetProgress(progress, Compacting)
15531573
// Send all groups found during this pass to the compaction workers.
15541574
var groupErrs errutil.MultiError
15551575
groupLoop:

pkg/compact/compact_e2e_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
221221
testutil.Ok(t, err)
222222

223223
// Compaction on empty should not fail.
224-
testutil.Ok(t, bComp.Compact(ctx))
224+
testutil.Ok(t, bComp.Compact(ctx, nil))
225225
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.GarbageCollectedBlocks))
226226
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.BlocksMarkedForDeletion))
227227
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.GarbageCollectionFailures))
@@ -314,7 +314,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
314314
groupKey1 := metas[0].Thanos.GroupKey()
315315
groupKey2 := metas[6].Thanos.GroupKey()
316316

317-
testutil.Ok(t, bComp.Compact(ctx))
317+
testutil.Ok(t, bComp.Compact(ctx, nil))
318318
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.GarbageCollectedBlocks))
319319
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.BlocksMarkedForDeletion))
320320
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.GarbageCollectionFailures))

pkg/compact/progress.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package compact
5+
6+
import (
7+
"github.com/go-kit/log"
8+
"github.com/go-kit/log/level"
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promauto"
11+
)
12+
13+
// CompactorState enumerates the phases a compactor goroutine can be in.
// The numeric value is exported verbatim through a gauge metric, so the
// numbering below is part of the metric's contract: append new states at
// the end only — inserting one in the middle would silently renumber every
// later state and break existing dashboards/alerts.
type CompactorState int64

const (
	Initializing      CompactorState = 0 // starting up or beginning a new pass
	Idle              CompactorState = 1 // waiting between passes
	SyncMeta          CompactorState = 2 // syncing block metadata from the bucket
	ApplyRetention    CompactorState = 3 // applying retention policies
	Compacting        CompactorState = 4 // running block compaction
	GarbageCollect    CompactorState = 5 // garbage-collecting compacted blocks
	DownSampling      CompactorState = 6 // downsampling blocks
	CleanBlocks       CompactorState = 7 // deleting partial/marked blocks
	Grouping          CompactorState = 8 // grouping metas into compaction groups
	CalculateProgress CompactorState = 9 // computing planned-work progress metrics
)
29+
30+
// Names of the long-running compactor goroutines ("threads") tracked by the
// progress gauge. Each value becomes the "thread" label of the metric, so
// keep them lowercase and stable.
const (
	Main      = "main"      // main compaction goroutine
	Web       = "web"       // web/block-viewer meta-sync goroutine
	Cleanup   = "cleanup"   // partial/marked block cleanup goroutine
	Calculate = "calculate" // compaction-progress calculation goroutine (lowercased for label consistency)
)
37+
38+
type Progress struct {
39+
state prometheus.Gauge
40+
logger log.Logger
41+
}
42+
43+
func (p *Progress) Set(state CompactorState) {
44+
level.Info(p.logger).Log("msg", "Setting compactor progress state", "state", state)
45+
p.state.Set(float64(state))
46+
}
47+
48+
func (p *Progress) Idle() {
49+
p.Set(Idle)
50+
}
51+
52+
type ProgressRegistry struct {
53+
*prometheus.GaugeVec
54+
logger log.Logger
55+
}
56+
57+
func NewProgressRegistry(reg *prometheus.Registry, logger log.Logger) *ProgressRegistry {
58+
registry := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
59+
Name: "thanos_compactor_progress_state",
60+
Help: "The compaction progress state of the compactor.",
61+
}, []string{"thread"})
62+
level.Info(logger).Log("msg", "Registering compactor progress state gauge")
63+
registry.WithLabelValues(Main).Set(float64(Initializing))
64+
registry.WithLabelValues(Web).Set(float64(Initializing))
65+
registry.WithLabelValues(Cleanup).Set(float64(Initializing))
66+
return &ProgressRegistry{
67+
registry,
68+
logger,
69+
}
70+
}
71+
72+
func (pr *ProgressRegistry) Get(thread string) *Progress {
73+
return &Progress{
74+
state: pr.WithLabelValues(thread),
75+
logger: pr.logger,
76+
}
77+
}
78+
79+
// SetProgress is a helper function to set the progress state of a compactor and can be no-op if the progress is nil.
80+
func SetProgress(progress *Progress, state CompactorState) {
81+
if progress == nil {
82+
return
83+
}
84+
progress.Set(state)
85+
}

0 commit comments

Comments
 (0)