
Commit 4f3085e

Merge recent commits from /db_main to /release (#138)
2 parents 4313121 + 2d14106 · commit 4f3085e

24 files changed: +862 −81 lines

.github/workflows/docs.yaml

Lines changed: 2 additions & 2 deletions
@@ -25,14 +25,14 @@ jobs:
       with:
         go-version: 1.23.x
 
-      - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+      - uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
       with:
         path: ~/go/pkg/mod
         key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
         restore-keys: |
           ${{ runner.os }}-go-
 
-      - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+      - uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
       with:
         path: .mdoxcache
         key: ${{ runner.os }}-mdox-${{ hashFiles('docs/**/*.md', 'examples/**/*.md', 'mixin/**/*.md', '*.md') }}

.github/workflows/go.yaml

Lines changed: 4 additions & 4 deletions
@@ -57,7 +57,7 @@ jobs:
       with:
         go-version: 1.23.x
 
-      - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+      - uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
       with:
         path: |
           ~/.cache/go-build
@@ -84,7 +84,7 @@ jobs:
       with:
         go-version: 1.23.x
 
-      - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+      - uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
       with:
         path: |
           ~/.cache/go-build
@@ -111,7 +111,7 @@ jobs:
       with:
         go-version: 1.23.x
 
-      - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+      - uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
       with:
         path: |
           ~/.cache/go-build
@@ -160,7 +160,7 @@ jobs:
       with:
         go-version: 1.23.x
 
-      - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+      - uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
       with:
         path: |
           ~/.cache/go-build

.github/workflows/react.yml

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ jobs:
       with:
         node-version: ${{ matrix.node }}
 
-      - uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+      - uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
       with:
         path: ~/.npm
         key: ${{ runner.os }}-node-${{ hashFiles('**/package-lock.json') }}

cmd/thanos/compact.go

Lines changed: 33 additions & 12 deletions
@@ -176,6 +176,7 @@ func runCompact(
 ) (rerr error) {
     deleteDelay := time.Duration(conf.deleteDelay)
     compactMetrics := newCompactMetrics(reg, deleteDelay)
+    progressRegistry := compact.NewProgressRegistry(reg, logger)
     downsampleMetrics := newDownsampleMetrics(reg)
 
     httpProbe := prober.NewHTTP()
@@ -325,6 +326,7 @@ func runCompact(
 
     ctx, cancel := context.WithCancel(context.Background())
     ctx = tracing.ContextWithTracer(ctx, tracer)
+    ctx = objstoretracing.ContextWithTracer(ctx, tracer) // objstore tracing uses a different tracer key in context.
 
     defer func() {
         if rerr != nil {
@@ -444,14 +446,16 @@
 
     var cleanMtx sync.Mutex
     // TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
-    cleanPartialMarked := func() error {
+    cleanPartialMarked := func(progress *compact.Progress) error {
         cleanMtx.Lock()
         defer cleanMtx.Unlock()
-
+        defer progress.Idle()
+        progress.Set(compact.SyncMeta)
         if err := sy.SyncMetas(ctx); err != nil {
             return errors.Wrap(err, "syncing metas")
         }
 
+        progress.Set(compact.CleanBlocks)
         compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
         if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
             return errors.Wrap(err, "cleaning marked blocks")
@@ -461,19 +465,23 @@
         return nil
     }
 
-    compactMainFn := func() error {
+    compactMainFn := func(progress *compact.Progress) error {
+        defer progress.Idle()
         // this should happen before any compaction to remove unnecessary process on backlogs beyond retention.
         if len(retentionByTenant) != 0 && len(sy.Metas()) == 0 {
             level.Info(logger).Log("msg", "sync before tenant retention due to no blocks")
+            progress.Set(compact.SyncMeta)
             if err := sy.SyncMetas(ctx); err != nil {
                 return errors.Wrap(err, "sync before tenant retention")
             }
         }
+
+        progress.Set(compact.ApplyRetention)
         if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil {
             return errors.Wrap(err, "retention by tenant failed")
         }
 
-        if err := compactor.Compact(ctx); err != nil {
+        if err := compactor.Compact(ctx, progress); err != nil {
             return errors.Wrap(err, "whole compaction error")
         }
 
@@ -482,10 +490,11 @@
         // We run two passes of this to ensure that the 1h downsampling is generated
         // for 5m downsamplings created in the first run.
         level.Info(logger).Log("msg", "start first pass of downsampling")
+        progress.Set(compact.SyncMeta)
         if err := sy.SyncMetas(ctx); err != nil {
             return errors.Wrap(err, "sync before first pass of downsampling")
         }
-
+        progress.Set(compact.DownSampling)
         filteredMetas := sy.Metas()
         noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks()
         for ul := range noDownsampleBlocks {
@@ -514,6 +523,7 @@
         }
 
         level.Info(logger).Log("msg", "start second pass of downsampling")
+        progress.Set(compact.SyncMeta)
         if err := sy.SyncMetas(ctx); err != nil {
             return errors.Wrap(err, "sync before second pass of downsampling")
         }
@@ -547,27 +557,29 @@
         }
 
         // TODO(bwplotka): Find a way to avoid syncing if no op was done.
+        progress.Set(compact.SyncMeta)
         if err := sy.SyncMetas(ctx); err != nil {
             return errors.Wrap(err, "sync before retention")
         }
 
+        progress.Set(compact.ApplyRetention)
        if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
             return errors.Wrap(err, "retention failed")
         }
 
-        return cleanPartialMarked()
+        return cleanPartialMarked(progress)
     }
 
     g.Add(func() error {
         defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")
 
         if !conf.wait {
-            return compactMainFn()
+            return compactMainFn(progressRegistry.Get(compact.Main))
         }
 
         // --wait=true is specified.
         return runutil.Repeat(conf.waitInterval, ctx.Done(), func() error {
-            err := compactMainFn()
+            err := compactMainFn(progressRegistry.Get(compact.Main))
             if err == nil {
                 compactMetrics.iterations.Inc()
                 return nil
@@ -633,9 +645,11 @@
         // For /global state make sure to fetch periodically.
         return runutil.Repeat(conf.blockViewerSyncBlockInterval, ctx.Done(), func() error {
             return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
+                progress := progressRegistry.Get(compact.Web)
+                defer progress.Idle()
                 iterCtx, iterCancel := context.WithTimeout(ctx, conf.blockViewerSyncBlockTimeout)
                 defer iterCancel()
-
+                progress.Set(compact.SyncMeta)
                 _, _, err := f.Fetch(iterCtx)
                 return err
             })
@@ -650,7 +664,7 @@
     if conf.cleanupBlocksInterval > 0 {
         g.Add(func() error {
             return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), func() error {
-                err := cleanPartialMarked()
+                err := cleanPartialMarked(progressRegistry.Get(compact.Cleanup))
                 if err != nil && compact.IsRetryError(err) {
                     // The RetryError signals that we hit an retriable error (transient error, no connection).
                     // You should alert on this being triggered too frequently.
@@ -678,7 +692,9 @@
         }
 
         return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error {
-
+            progress := progressRegistry.Get(compact.Calculate)
+            defer progress.Idle()
+            progress.Set(compact.SyncMeta)
             if err := sy.SyncMetas(ctx); err != nil {
                 // The RetryError signals that we hit an retriable error (transient error, no connection).
                 // You should alert on this being triggered too frequently.
@@ -693,29 +709,34 @@
             }
 
             metas := sy.Metas()
+            progress.Set(compact.Grouping)
             groups, err := grouper.Groups(metas)
             if err != nil {
                 return errors.Wrapf(err, "could not group metadata for compaction")
             }
-
+            progress.Set(compact.CalculateProgress)
             if err = ps.ProgressCalculate(ctx, groups); err != nil {
                 return errors.Wrapf(err, "could not calculate compaction progress")
             }
 
+            progress.Set(compact.Grouping)
             retGroups, err := grouper.Groups(metas)
             if err != nil {
                 return errors.Wrapf(err, "could not group metadata for retention")
            }
 
+            progress.Set(compact.CalculateProgress)
             if err = rs.ProgressCalculate(ctx, retGroups); err != nil {
                 return errors.Wrapf(err, "could not calculate retention progress")
             }
 
             if !conf.disableDownsampling {
+                progress.Set(compact.Grouping)
                 groups, err = grouper.Groups(metas)
                 if err != nil {
                     return errors.Wrapf(err, "could not group metadata into downsample groups")
                 }
+                progress.Set(compact.CalculateProgress)
                 if err := ds.ProgressCalculate(ctx, groups); err != nil {
                     return errors.Wrapf(err, "could not calculate downsampling progress")
                 }
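
Note: the compactor changes above thread a progress registry through every long-running goroutine. Each loop fetches a named *compact.Progress handle via progressRegistry.Get (Main, Web, Cleanup, Calculate), marks the phase it is executing (SyncMeta, CleanBlocks, ApplyRetention, DownSampling, Grouping, CalculateProgress), and defers Idle() so the handle parks cleanly on every exit path; the commit also propagates the tracer under the objstore-specific context key so bucket operations get traced. The ProgressRegistry implementation itself is not part of this diff. Below is a minimal sketch of one plausible shape, assuming each component exports a per-state gauge; the metric name and state strings are assumptions, only the exported identifiers come from the diff:

package compact

import (
    "sync"

    "github.com/go-kit/log"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// Component names one long-running compactor goroutine.
type Component string

// State names the phase a component is currently executing.
type State string

const (
    Main      Component = "main"
    Web       Component = "web"
    Cleanup   Component = "cleanup"
    Calculate Component = "calculate"
)

const (
    idle              State = "idle"
    SyncMeta          State = "sync_meta"
    CleanBlocks       State = "clean_blocks"
    ApplyRetention    State = "apply_retention"
    DownSampling      State = "downsampling"
    Grouping          State = "grouping"
    CalculateProgress State = "calculate_progress"
)

// Progress reports the current state of one component; exactly one state
// series is 1 at any time, all others are 0.
type Progress struct {
    mtx   sync.Mutex
    cur   State
    gauge *prometheus.GaugeVec
}

// Set clears the previous state series and raises the new one.
func (p *Progress) Set(s State) {
    p.mtx.Lock()
    defer p.mtx.Unlock()
    p.gauge.WithLabelValues(string(p.cur)).Set(0)
    p.gauge.WithLabelValues(string(s)).Set(1)
    p.cur = s
}

// Idle is deferred by callers so a component always parks in "idle".
func (p *Progress) Idle() { p.Set(idle) }

// ProgressRegistry lazily creates one Progress per component.
type ProgressRegistry struct {
    mtx        sync.Mutex
    reg        prometheus.Registerer
    logger     log.Logger
    components map[Component]*Progress
}

func NewProgressRegistry(reg prometheus.Registerer, logger log.Logger) *ProgressRegistry {
    return &ProgressRegistry{reg: reg, logger: logger, components: map[Component]*Progress{}}
}

func (r *ProgressRegistry) Get(c Component) *Progress {
    r.mtx.Lock()
    defer r.mtx.Unlock()
    if p, ok := r.components[c]; ok {
        return p
    }
    p := &Progress{
        cur: idle,
        gauge: promauto.With(r.reg).NewGaugeVec(prometheus.GaugeOpts{
            Name:        "thanos_compact_component_state", // assumed metric name, not from the diff
            Help:        "Current state of a compactor component (1 = active).",
            ConstLabels: prometheus.Labels{"component": string(c)},
        }, []string{"state"}),
    }
    p.gauge.WithLabelValues(string(idle)).Set(1)
    r.components[c] = p
    return p
}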

cmd/thanos/query.go

Lines changed: 5 additions & 0 deletions
@@ -245,6 +245,8 @@ func registerQuery(app *extkingpin.App) {
     enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool()
     tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String()
 
+    rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String()
+
     var storeRateLimits store.SeriesSelectLimits
     storeRateLimits.RegisterFlags(cmd)
 
@@ -384,6 +386,7 @@
             *enforceTenancy,
             *tenantLabel,
             *enableGroupReplicaPartialStrategy,
+            *rewriteAggregationLabelTo,
         )
     })
 }
@@ -468,6 +471,7 @@ func runQuery(
     enforceTenancy bool,
     tenantLabel string,
     groupReplicaPartialResponseStrategy bool,
+    rewriteAggregationLabelTo string,
 ) error {
     comp := component.Query
     if alertQueryURL == "" {
@@ -597,6 +601,7 @@
     opts := query.Options{
         GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
         DeduplicationFunc:                   queryDeduplicationFunc,
+        RewriteAggregationLabelTo:           rewriteAggregationLabelTo,
     }
     level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
     queryableCreator = query.NewQueryableCreatorWithOptions(
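
Note: the new --query.aggregation-label-value-override flag is only registered and threaded into query.Options here; the code that consumes RewriteAggregationLabelTo is not in this diff. Going by the flag's help text, the intended effect is that selects against aggregated metrics carry a __rollup__=<override> equality matcher. A hypothetical sketch of that rewrite (the helper name and call site are assumptions, not from this commit):

package query

import "github.com/prometheus/prometheus/model/labels"

// rollupLabel is the reserved label carrying the aggregation level of a
// series; the name comes from the flag's help text.
const rollupLabel = "__rollup__"

// rewriteAggregationMatchers is a hypothetical helper: when the override is
// non-empty, any existing __rollup__ matcher is replaced by an equality
// matcher on the override value.
func rewriteAggregationMatchers(ms []*labels.Matcher, overrideTo string) []*labels.Matcher {
    if overrideTo == "" {
        return ms // empty value disables the behavior, per the flag default
    }
    out := make([]*labels.Matcher, 0, len(ms)+1)
    for _, m := range ms {
        if m.Name != rollupLabel {
            out = append(out, m)
        }
    }
    return append(out, labels.MustNewMatcher(labels.MatchEqual, rollupLabel, overrideTo))
}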

cmd/thanos/receive.go

Lines changed: 44 additions & 29 deletions
@@ -27,12 +27,10 @@ import (
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/model/labels"
-    "github.com/prometheus/prometheus/model/relabel"
     "github.com/prometheus/prometheus/tsdb"
     "github.com/prometheus/prometheus/tsdb/wlog"
     "github.com/thanos-io/thanos/pkg/store/storepb"
     "google.golang.org/grpc"
-    "gopkg.in/yaml.v2"
 
     "github.com/thanos-io/objstore"
     "github.com/thanos-io/objstore/client"
@@ -231,14 +229,11 @@ func runReceive(
         return errors.Wrapf(err, "migrate legacy storage in %v to default tenant %v", conf.dataDir, conf.defaultTenantID)
     }
 
-    relabelContentYaml, err := conf.relabelConfigPath.Content()
+    relabeller, err := receive.NewRelabeller(conf.relabelConfigPath, reg, logger, conf.relabelConfigReloadTimer)
+
     if err != nil {
         return errors.Wrap(err, "get content of relabel configuration")
     }
-    var relabelConfig []*relabel.Config
-    if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil {
-        return errors.Wrap(err, "parse relabel configuration")
-    }
 
     dbs := receive.NewMultiTSDB(
         conf.dataDir,
@@ -286,30 +281,47 @@ func runReceive(
     }
 
     webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
-        Writer:               writer,
-        ListenAddress:        conf.rwAddress,
-        Registry:             reg,
-        Endpoint:             conf.endpoint,
-        TenantHeader:         conf.tenantHeader,
-        TenantField:          conf.tenantField,
-        DefaultTenantID:      conf.defaultTenantID,
-        ReplicaHeader:        conf.replicaHeader,
-        ReplicationFactor:    conf.replicationFactor,
-        RelabelConfigs:       relabelConfig,
-        ReceiverMode:         receiveMode,
-        Tracer:               tracer,
-        TLSConfig:            rwTLSConfig,
-        SplitTenantLabelName: conf.splitTenantLabelName,
-        DialOpts:             dialOpts,
-        ForwardTimeout:       time.Duration(*conf.forwardTimeout),
-        MaxBackoff:           time.Duration(*conf.maxBackoff),
-        TSDBStats:            dbs,
-        Limiter:              limiter,
-
+        Writer:                  writer,
+        ListenAddress:           conf.rwAddress,
+        Registry:                reg,
+        Endpoint:                conf.endpoint,
+        TenantHeader:            conf.tenantHeader,
+        TenantField:             conf.tenantField,
+        DefaultTenantID:         conf.defaultTenantID,
+        ReplicaHeader:           conf.replicaHeader,
+        ReplicationFactor:       conf.replicationFactor,
+        Relabeller:              relabeller,
+        ReceiverMode:            receiveMode,
+        Tracer:                  tracer,
+        TLSConfig:               rwTLSConfig,
+        SplitTenantLabelName:    conf.splitTenantLabelName,
+        DialOpts:                dialOpts,
+        ForwardTimeout:          time.Duration(*conf.forwardTimeout),
+        MaxBackoff:              time.Duration(*conf.maxBackoff),
+        TSDBStats:               dbs,
+        Limiter:                 limiter,
         AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
         ReplicationProtocol:     receive.ReplicationProtocol(conf.replicationProtocol),
     })
 
+    {
+        if relabeller.CanReload() {
+            ctx, cancel := context.WithCancel(context.Background())
+            g.Add(func() error {
+                level.Debug(logger).Log("msg", "relabel config initialized with file watcher.")
+                if err := relabeller.StartConfigReloader(ctx); err != nil {
+                    level.Error(logger).Log("msg", "initializing relabel config reloading.", "err", err)
+                    return err
+                }
+                level.Info(logger).Log("msg", "relabel config reloading initialized.")
+                <-ctx.Done()
+                return nil
+            }, func(error) {
+                cancel()
+            })
+        }
+    }
+
     grpcProbe := prober.NewGRPC()
     httpProbe := prober.NewHTTP()
     statusProber := prober.Combine(
@@ -974,8 +986,9 @@ type receiveConfig struct {
     ignoreBlockSize       bool
     allowOutOfOrderUpload bool
 
-    reqLogConfig      *extflag.PathOrContent
-    relabelConfigPath *extflag.PathOrContent
+    reqLogConfig             *extflag.PathOrContent
+    relabelConfigPath        *extflag.PathOrContent
+    relabelConfigReloadTimer time.Duration
 
     writeLimitsConfig *extflag.PathOrContent
     storeRateLimits   store.SeriesSelectLimits
@@ -1073,6 +1086,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
     rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden())
 
     rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution())
+    cmd.Flag("receive.relabel-config-reload-timer", "Minimum amount of time to pass for the relabel configuration to be reloaded. Helps to avoid excessive reloads.").
+        Default("0s").Hidden().DurationVar(&rc.relabelConfigReloadTimer)
 
     rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
 
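Note: the receiver changes replace one-shot relabel-config parsing with a receive.Relabeller that owns the parsed configuration and can reload it at runtime, rate-limited by the hidden --receive.relabel-config-reload-timer flag; runReceive wires the reloader into the run group only when relabeller.CanReload() reports reloading applies. The Relabeller itself is not in this diff. A minimal sketch consistent with the call sites, assuming a poll-based reloader built on PathOrContent.Content(); the "file watcher" log line suggests the real implementation may watch the file instead, and CanReload's actual condition is likewise an assumption:

package receive

import (
    "context"
    "sync"
    "time"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/prometheus/model/relabel"
    "github.com/thanos-io/thanos/pkg/extflag"
    "gopkg.in/yaml.v2"
)

// Relabeller owns the parsed relabel configuration and swaps it atomically
// on reload. Metrics registration is omitted from this sketch.
type Relabeller struct {
    mtx         sync.RWMutex
    cfgs        []*relabel.Config
    path        *extflag.PathOrContent
    logger      log.Logger
    minInterval time.Duration
}

func NewRelabeller(path *extflag.PathOrContent, _ prometheus.Registerer, logger log.Logger, reloadTimer time.Duration) (*Relabeller, error) {
    r := &Relabeller{path: path, logger: logger, minInterval: reloadTimer}
    content, err := path.Content()
    if err != nil {
        return nil, err
    }
    return r, r.load(content)
}

func (r *Relabeller) load(content []byte) error {
    var cfgs []*relabel.Config
    if err := yaml.Unmarshal(content, &cfgs); err != nil {
        return err
    }
    r.mtx.Lock()
    r.cfgs = cfgs
    r.mtx.Unlock()
    return nil
}

// Configs returns the current snapshot for use by the write path.
func (r *Relabeller) Configs() []*relabel.Config {
    r.mtx.RLock()
    defer r.mtx.RUnlock()
    return r.cfgs
}

// CanReload reports whether runtime reloading applies. The real condition is
// not visible in the diff; here we assume a non-zero reload interval.
func (r *Relabeller) CanReload() bool { return r.minInterval > 0 }

// StartConfigReloader re-reads and re-parses the configuration in the
// background until ctx is cancelled, returning once the loop is started
// (the caller blocks on ctx.Done() itself, as the diff shows).
func (r *Relabeller) StartConfigReloader(ctx context.Context) error {
    if !r.CanReload() {
        return nil
    }
    go func() {
        t := time.NewTicker(r.minInterval)
        defer t.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-t.C:
                content, err := r.path.Content()
                if err != nil {
                    level.Warn(r.logger).Log("msg", "re-reading relabel config", "err", err)
                    continue
                }
                if err := r.load(content); err != nil {
                    level.Warn(r.logger).Log("msg", "re-parsing relabel config", "err", err)
                }
            }
        }
    }()
    return nil
}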