Commit dc1b95b

Merge branch 'prometheus:main' into fix/stable-cardUI-key
2 parents: 5db9a9d + e04913a

File tree

6 files changed: 109 additions & 79 deletions

rules/group.go

Lines changed: 26 additions & 17 deletions
@@ -1110,36 +1110,53 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 		return dependencies
 	}
 
-	inputs := make(map[string][]Rule, len(rules))
-	outputs := make(map[string][]Rule, len(rules))
-
 	var indeterminate bool
 
 	for _, rule := range rules {
 		if indeterminate {
 			break
 		}
 
-		name := rule.Name()
-		outputs[name] = append(outputs[name], rule)
-
 		parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error {
 			if n, ok := node.(*parser.VectorSelector); ok {
+				// Find the name matcher for the rule.
+				var nameMatcher *labels.Matcher
+				if n.Name != "" {
+					nameMatcher = labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, n.Name)
+				} else {
+					for _, m := range n.LabelMatchers {
+						if m.Name == model.MetricNameLabel {
+							nameMatcher = m
+							break
+						}
+					}
+				}
+
 				// A wildcard metric expression means we cannot reliably determine if this rule depends on any other,
 				// which means we cannot safely run any rules concurrently.
-				if n.Name == "" && len(n.LabelMatchers) > 0 {
+				if nameMatcher == nil {
 					indeterminate = true
 					return nil
 				}
 
 				// Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
 				// if they run concurrently.
-				if n.Name == alertMetricName || n.Name == alertForStateMetricName {
+				if nameMatcher.Matches(alertMetricName) || nameMatcher.Matches(alertForStateMetricName) {
 					indeterminate = true
 					return nil
 				}
 
-				inputs[n.Name] = append(inputs[n.Name], rule)
+				// Find rules which depend on the output of this rule.
+				for _, other := range rules {
+					if other == rule {
+						continue
+					}
+
+					otherName := other.Name()
+					if nameMatcher.Matches(otherName) {
+						dependencies[other] = append(dependencies[other], rule)
+					}
+				}
 			}
 			return nil
 		})
@@ -1149,13 +1166,5 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 		return nil
 	}
 
-	for output, outRules := range outputs {
-		for _, outRule := range outRules {
-			if inRules, found := inputs[output]; found && len(inRules) > 0 {
-				dependencies[outRule] = append(dependencies[outRule], inRules...)
-			}
-		}
-	}
-
 	return dependencies
 }
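The substantive change here: instead of pairing rules through the removed inputs/outputs string maps (which only handled exact metric-name equality), each vector selector is reduced to a __name__ matcher and tested against every other rule's name directly inside the parser.Inspect walk. That is what lets regex selectors such as {__name__=~"user:requests.+5m"} resolve to their producing rules. A minimal standalone sketch of the matcher behaviour this relies on (not part of the commit; the rule names are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// A plain selector such as `user:requests:rate5m` becomes an equality matcher on __name__.
	eq := labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "user:requests:rate5m")

	// A selector such as {__name__=~"user:requests.+5m"} already carries a regex matcher on __name__.
	re := labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "user:requests.+5m")

	// A rule producing user:requests:rate5m is now detected as a dependency by both selectors.
	fmt.Println(eq.Matches("user:requests:rate5m")) // true
	fmt.Println(re.Matches("user:requests:rate5m")) // true (Prometheus regex matchers are fully anchored)

	// A rule whose name does not match stays unrelated, so it can still run concurrently.
	fmt.Println(re.Matches("user:requests:increase1h")) // false
}
```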

rules/manager_test.go

Lines changed: 11 additions & 3 deletions
@@ -1601,10 +1601,14 @@ func TestDependencyMap(t *testing.T) {
 	require.NoError(t, err)
 	rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{})
 
+	expr, err = parser.ParseExpr(`sum by (user) ({__name__=~"user:requests.+5m"})`)
+	require.NoError(t, err)
+	rule5 := NewRecordingRule("user:requests:sum5m", expr, labels.Labels{})
+
 	group := NewGroup(GroupOptions{
 		Name:     "rule_group",
 		Interval: time.Second,
-		Rules:    []Rule{rule, rule2, rule3, rule4},
+		Rules:    []Rule{rule, rule2, rule3, rule4, rule5},
 		Opts:     opts,
 	})
 
@@ -1619,13 +1623,17 @@
 	require.Equal(t, []Rule{rule}, depMap.dependencies(rule2))
 	require.False(t, depMap.isIndependent(rule2))
 
-	require.Zero(t, depMap.dependents(rule3))
+	require.Equal(t, []Rule{rule5}, depMap.dependents(rule3))
 	require.Zero(t, depMap.dependencies(rule3))
-	require.True(t, depMap.isIndependent(rule3))
+	require.False(t, depMap.isIndependent(rule3))
 
 	require.Zero(t, depMap.dependents(rule4))
 	require.Equal(t, []Rule{rule}, depMap.dependencies(rule4))
 	require.False(t, depMap.isIndependent(rule4))
+
+	require.Zero(t, depMap.dependents(rule5))
+	require.Equal(t, []Rule{rule3}, depMap.dependencies(rule5))
+	require.False(t, depMap.isIndependent(rule5))
 }
 
 func TestNoDependency(t *testing.T) {

tsdb/agent/db.go

Lines changed: 14 additions & 13 deletions
@@ -235,6 +235,12 @@ type DB struct {
 	appenderPool sync.Pool
 	bufPool      sync.Pool
 
+	// These pools are used during WAL replay.
+	walReplaySeriesPool          zeropool.Pool[[]record.RefSeries]
+	walReplaySamplesPool         zeropool.Pool[[]record.RefSample]
+	walReplayHistogramsPool      zeropool.Pool[[]record.RefHistogramSample]
+	walReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
+
 	nextRef *atomic.Uint64
 	series  *stripeSeries
 	// deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
@@ -426,11 +432,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 
 		decoded = make(chan interface{}, 10)
 		errCh   = make(chan error, 1)
-
-		seriesPool          zeropool.Pool[[]record.RefSeries]
-		samplesPool         zeropool.Pool[[]record.RefSample]
-		histogramsPool      zeropool.Pool[[]record.RefHistogramSample]
-		floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
 	)
 
 	go func() {
@@ -440,7 +441,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 			rec := r.Record()
 			switch dec.Type(rec) {
 			case record.Series:
-				series := seriesPool.Get()[:0]
+				series := db.walReplaySeriesPool.Get()[:0]
 				series, err = dec.Series(rec, series)
 				if err != nil {
 					errCh <- &wlog.CorruptionErr{
@@ -452,7 +453,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 				}
 				decoded <- series
 			case record.Samples:
-				samples := samplesPool.Get()[:0]
+				samples := db.walReplaySamplesPool.Get()[:0]
 				samples, err = dec.Samples(rec, samples)
 				if err != nil {
 					errCh <- &wlog.CorruptionErr{
@@ -464,7 +465,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 				}
 				decoded <- samples
 			case record.HistogramSamples, record.CustomBucketsHistogramSamples:
-				histograms := histogramsPool.Get()[:0]
+				histograms := db.walReplayHistogramsPool.Get()[:0]
 				histograms, err = dec.HistogramSamples(rec, histograms)
 				if err != nil {
 					errCh <- &wlog.CorruptionErr{
@@ -476,7 +477,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 				}
 				decoded <- histograms
 			case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
-				floatHistograms := floatHistogramsPool.Get()[:0]
+				floatHistograms := db.walReplayFloatHistogramsPool.Get()[:0]
 				floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
 				if err != nil {
 					errCh <- &wlog.CorruptionErr{
@@ -521,7 +522,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 					}
 				}
 			}
-			seriesPool.Put(v)
+			db.walReplaySeriesPool.Put(v)
 		case []record.RefSample:
 			for _, entry := range v {
 				// Update the lastTs for the series based
@@ -535,7 +536,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 					series.lastTs = entry.T
 				}
 			}
-			samplesPool.Put(v)
+			db.walReplaySamplesPool.Put(v)
 		case []record.RefHistogramSample:
 			for _, entry := range v {
 				// Update the lastTs for the series based
@@ -549,7 +550,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 					series.lastTs = entry.T
 				}
 			}
-			histogramsPool.Put(v)
+			db.walReplayHistogramsPool.Put(v)
 		case []record.RefFloatHistogramSample:
 			for _, entry := range v {
 				// Update the lastTs for the series based
@@ -563,7 +564,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
 					series.lastTs = entry.T
 				}
 			}
-			floatHistogramsPool.Put(v)
+			db.walReplayFloatHistogramsPool.Put(v)
 		default:
 			panic(fmt.Errorf("unexpected decoded type: %T", d))
 		}
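Hoisting the four pools from locals in loadWAL onto the DB struct lets the decoded-record buffers outlive a single loadWAL call (presumably the motivation: reuse across successive calls during replay). The new fields need no constructor because a zeropool.Pool zero value is usable, as the removed var declarations already showed. A small self-contained sketch of the Get / [:0] / Put pattern used above (the sample values are invented):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/util/zeropool"
)

func main() {
	// The zero value is ready to use; Get returns a nil slice until something is Put back.
	var pool zeropool.Pool[[]record.RefSample]

	buf := pool.Get()[:0] // re-slicing a nil slice to [:0] is legal Go
	buf = append(buf, record.RefSample{Ref: 1, T: 1000, V: 42})
	fmt.Println("decoded:", len(buf))

	// Return the slice once its records have been consumed downstream.
	pool.Put(buf)

	// A later Get can reuse the same backing array (best effort, as with sync.Pool),
	// avoiding a fresh allocation per WAL record.
	reused := pool.Get()[:0]
	fmt.Println("reused capacity:", cap(reused))
}
```

Unlike a plain sync.Pool of interface{}, the generic zeropool avoids boxing the slice header on every Put, which is presumably why it is used on these hot replay paths.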

tsdb/head.go

Lines changed: 10 additions & 0 deletions
@@ -94,6 +94,16 @@ type Head struct {
 	bytesPool    zeropool.Pool[[]byte]
 	memChunkPool sync.Pool
 
+	// These pools are used during WAL/WBL replay.
+	wlReplaySeriesPool          zeropool.Pool[[]record.RefSeries]
+	wlReplaySamplesPool         zeropool.Pool[[]record.RefSample]
+	wlReplaytStonesPool         zeropool.Pool[[]tombstones.Stone]
+	wlReplayExemplarsPool       zeropool.Pool[[]record.RefExemplar]
+	wlReplayHistogramsPool      zeropool.Pool[[]record.RefHistogramSample]
+	wlReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
+	wlReplayMetadataPool        zeropool.Pool[[]record.RefMetadata]
+	wlReplayMmapMarkersPool     zeropool.Pool[[]record.RefMmapMarker]
+
 	// All series addressable by their ID or hash.
 	series *stripeSeries
 

tsdb/head_test.go

Lines changed: 25 additions & 10 deletions
@@ -46,6 +46,7 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/fileutil"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/record"
 	"github.com/prometheus/prometheus/tsdb/tombstones"
@@ -440,27 +441,41 @@ func BenchmarkLoadWLs(b *testing.B) {
 
 // BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
 // BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located.
+//
+// Using an absolute path for BENCHMARK_LOAD_REAL_WLS_DIR is recommended.
+//
+// Because WLs loading may alter BENCHMARK_LOAD_REAL_WLS_DIR which can affect benchmark results and to ensure consistency,
+// a copy of BENCHMARK_LOAD_REAL_WLS_DIR is made for each iteration and deleted at the end.
+// Make sure there is sufficient disk space for that.
 func BenchmarkLoadRealWLs(b *testing.B) {
-	dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
-	if dir == "" {
+	srcDir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
+	if srcDir == "" {
 		b.SkipNow()
 	}
 
-	wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
-	require.NoError(b, err)
-	b.Cleanup(func() { wal.Close() })
-
-	wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
-	require.NoError(b, err)
-	b.Cleanup(func() { wbl.Close() })
-
 	// Load the WAL.
 	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		dir := b.TempDir()
+		require.NoError(b, fileutil.CopyDirs(srcDir, dir))
+
+		wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
+		require.NoError(b, err)
+		b.Cleanup(func() { wal.Close() })
+
+		wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
+		require.NoError(b, err)
+		b.Cleanup(func() { wbl.Close() })
+		b.StartTimer()
+
 		opts := DefaultHeadOptions()
 		opts.ChunkDirRoot = dir
 		h, err := NewHead(nil, nil, wal, wbl, opts, nil)
 		require.NoError(b, err)
 		require.NoError(b, h.Init(0))
+
+		b.StopTimer()
+		require.NoError(b, os.RemoveAll(dir))
 	}
 }
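A practical note on running the updated benchmark (the invocation below is ordinary Go tooling, not part of the commit): each iteration now copies the source directory into a fresh b.TempDir() and removes it afterwards, so the disk needs room for roughly one extra copy of the WAL data at a time, and the copy/setup work is kept out of the timings via b.StopTimer()/b.StartTimer(). For example:

BENCHMARK_LOAD_REAL_WLS_DIR=/abs/path/to/data go test -run '^$' -bench BenchmarkLoadRealWLs ./tsdb/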
