Skip to content

Commit e04913a

Browse files
authored
Merge pull request prometheus#15778 from machine424/reuse-pools
feat(tsdb/(head|agent)): reuse pools across segments to reduce garbage during WL replay
2 parents 77a5698 + d644324 commit e04913a

File tree

4 files changed

+72
-59
lines changed

4 files changed

+72
-59
lines changed

tsdb/agent/db.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,12 @@ type DB struct {
235235
appenderPool sync.Pool
236236
bufPool sync.Pool
237237

238+
// These pools are used during WAL replay.
239+
walReplaySeriesPool zeropool.Pool[[]record.RefSeries]
240+
walReplaySamplesPool zeropool.Pool[[]record.RefSample]
241+
walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample]
242+
walReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
243+
238244
nextRef *atomic.Uint64
239245
series *stripeSeries
240246
// deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
@@ -426,11 +432,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
426432

427433
decoded = make(chan interface{}, 10)
428434
errCh = make(chan error, 1)
429-
430-
seriesPool zeropool.Pool[[]record.RefSeries]
431-
samplesPool zeropool.Pool[[]record.RefSample]
432-
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
433-
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
434435
)
435436

436437
go func() {
@@ -440,7 +441,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
440441
rec := r.Record()
441442
switch dec.Type(rec) {
442443
case record.Series:
443-
series := seriesPool.Get()[:0]
444+
series := db.walReplaySeriesPool.Get()[:0]
444445
series, err = dec.Series(rec, series)
445446
if err != nil {
446447
errCh <- &wlog.CorruptionErr{
@@ -452,7 +453,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
452453
}
453454
decoded <- series
454455
case record.Samples:
455-
samples := samplesPool.Get()[:0]
456+
samples := db.walReplaySamplesPool.Get()[:0]
456457
samples, err = dec.Samples(rec, samples)
457458
if err != nil {
458459
errCh <- &wlog.CorruptionErr{
@@ -464,7 +465,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
464465
}
465466
decoded <- samples
466467
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
467-
histograms := histogramsPool.Get()[:0]
468+
histograms := db.walReplayHistogramsPool.Get()[:0]
468469
histograms, err = dec.HistogramSamples(rec, histograms)
469470
if err != nil {
470471
errCh <- &wlog.CorruptionErr{
@@ -476,7 +477,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
476477
}
477478
decoded <- histograms
478479
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
479-
floatHistograms := floatHistogramsPool.Get()[:0]
480+
floatHistograms := db.walReplayFloatHistogramsPool.Get()[:0]
480481
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
481482
if err != nil {
482483
errCh <- &wlog.CorruptionErr{
@@ -521,7 +522,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
521522
}
522523
}
523524
}
524-
seriesPool.Put(v)
525+
db.walReplaySeriesPool.Put(v)
525526
case []record.RefSample:
526527
for _, entry := range v {
527528
// Update the lastTs for the series based
@@ -535,7 +536,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
535536
series.lastTs = entry.T
536537
}
537538
}
538-
samplesPool.Put(v)
539+
db.walReplaySamplesPool.Put(v)
539540
case []record.RefHistogramSample:
540541
for _, entry := range v {
541542
// Update the lastTs for the series based
@@ -549,7 +550,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
549550
series.lastTs = entry.T
550551
}
551552
}
552-
histogramsPool.Put(v)
553+
db.walReplayHistogramsPool.Put(v)
553554
case []record.RefFloatHistogramSample:
554555
for _, entry := range v {
555556
// Update the lastTs for the series based
@@ -563,7 +564,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
563564
series.lastTs = entry.T
564565
}
565566
}
566-
floatHistogramsPool.Put(v)
567+
db.walReplayFloatHistogramsPool.Put(v)
567568
default:
568569
panic(fmt.Errorf("unexpected decoded type: %T", d))
569570
}

tsdb/head.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,16 @@ type Head struct {
9494
bytesPool zeropool.Pool[[]byte]
9595
memChunkPool sync.Pool
9696

97+
// These pools are used during WAL/WBL replay.
98+
wlReplaySeriesPool zeropool.Pool[[]record.RefSeries]
99+
wlReplaySamplesPool zeropool.Pool[[]record.RefSample]
100+
wlReplaytStonesPool zeropool.Pool[[]tombstones.Stone]
101+
wlReplayExemplarsPool zeropool.Pool[[]record.RefExemplar]
102+
wlReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample]
103+
wlReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
104+
wlReplayMetadataPool zeropool.Pool[[]record.RefMetadata]
105+
wlReplayMmapMarkersPool zeropool.Pool[[]record.RefMmapMarker]
106+
97107
// All series addressable by their ID or hash.
98108
series *stripeSeries
99109

tsdb/head_test.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/prometheus/prometheus/storage"
4747
"github.com/prometheus/prometheus/tsdb/chunkenc"
4848
"github.com/prometheus/prometheus/tsdb/chunks"
49+
"github.com/prometheus/prometheus/tsdb/fileutil"
4950
"github.com/prometheus/prometheus/tsdb/index"
5051
"github.com/prometheus/prometheus/tsdb/record"
5152
"github.com/prometheus/prometheus/tsdb/tombstones"
@@ -440,27 +441,41 @@ func BenchmarkLoadWLs(b *testing.B) {
440441

441442
// BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
442443
// BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located.
444+
//
445+
// Using an absolute path for BENCHMARK_LOAD_REAL_WLS_DIR is recommended.
446+
//
447+
// Because WLs loading may alter BENCHMARK_LOAD_REAL_WLS_DIR which can affect benchmark results and to ensure consistency,
448+
// a copy of BENCHMARK_LOAD_REAL_WLS_DIR is made for each iteration and deleted at the end.
449+
// Make sure there is sufficient disk space for that.
443450
func BenchmarkLoadRealWLs(b *testing.B) {
444-
dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
445-
if dir == "" {
451+
srcDir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
452+
if srcDir == "" {
446453
b.SkipNow()
447454
}
448455

449-
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
450-
require.NoError(b, err)
451-
b.Cleanup(func() { wal.Close() })
452-
453-
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
454-
require.NoError(b, err)
455-
b.Cleanup(func() { wbl.Close() })
456-
457456
// Load the WAL.
458457
for i := 0; i < b.N; i++ {
458+
b.StopTimer()
459+
dir := b.TempDir()
460+
require.NoError(b, fileutil.CopyDirs(srcDir, dir))
461+
462+
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
463+
require.NoError(b, err)
464+
b.Cleanup(func() { wal.Close() })
465+
466+
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
467+
require.NoError(b, err)
468+
b.Cleanup(func() { wbl.Close() })
469+
b.StartTimer()
470+
459471
opts := DefaultHeadOptions()
460472
opts.ChunkDirRoot = dir
461473
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
462474
require.NoError(b, err)
463475
require.NoError(b, h.Init(0))
476+
477+
b.StopTimer()
478+
require.NoError(b, os.RemoveAll(dir))
464479
}
465480
}
466481

tsdb/head_wal.go

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"github.com/prometheus/prometheus/tsdb/record"
4040
"github.com/prometheus/prometheus/tsdb/tombstones"
4141
"github.com/prometheus/prometheus/tsdb/wlog"
42-
"github.com/prometheus/prometheus/util/zeropool"
4342
)
4443

4544
// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
@@ -73,14 +72,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
7372

7473
decoded = make(chan interface{}, 10)
7574
decodeErr, seriesCreationErr error
76-
77-
seriesPool zeropool.Pool[[]record.RefSeries]
78-
samplesPool zeropool.Pool[[]record.RefSample]
79-
tstonesPool zeropool.Pool[[]tombstones.Stone]
80-
exemplarsPool zeropool.Pool[[]record.RefExemplar]
81-
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
82-
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
83-
metadataPool zeropool.Pool[[]record.RefMetadata]
8475
)
8576

8677
defer func() {
@@ -140,7 +131,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
140131
rec := r.Record()
141132
switch dec.Type(rec) {
142133
case record.Series:
143-
series := seriesPool.Get()[:0]
134+
series := h.wlReplaySeriesPool.Get()[:0]
144135
series, err = dec.Series(rec, series)
145136
if err != nil {
146137
decodeErr = &wlog.CorruptionErr{
@@ -152,7 +143,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
152143
}
153144
decoded <- series
154145
case record.Samples:
155-
samples := samplesPool.Get()[:0]
146+
samples := h.wlReplaySamplesPool.Get()[:0]
156147
samples, err = dec.Samples(rec, samples)
157148
if err != nil {
158149
decodeErr = &wlog.CorruptionErr{
@@ -164,7 +155,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
164155
}
165156
decoded <- samples
166157
case record.Tombstones:
167-
tstones := tstonesPool.Get()[:0]
158+
tstones := h.wlReplaytStonesPool.Get()[:0]
168159
tstones, err = dec.Tombstones(rec, tstones)
169160
if err != nil {
170161
decodeErr = &wlog.CorruptionErr{
@@ -176,7 +167,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
176167
}
177168
decoded <- tstones
178169
case record.Exemplars:
179-
exemplars := exemplarsPool.Get()[:0]
170+
exemplars := h.wlReplayExemplarsPool.Get()[:0]
180171
exemplars, err = dec.Exemplars(rec, exemplars)
181172
if err != nil {
182173
decodeErr = &wlog.CorruptionErr{
@@ -188,7 +179,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
188179
}
189180
decoded <- exemplars
190181
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
191-
hists := histogramsPool.Get()[:0]
182+
hists := h.wlReplayHistogramsPool.Get()[:0]
192183
hists, err = dec.HistogramSamples(rec, hists)
193184
if err != nil {
194185
decodeErr = &wlog.CorruptionErr{
@@ -200,7 +191,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
200191
}
201192
decoded <- hists
202193
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
203-
hists := floatHistogramsPool.Get()[:0]
194+
hists := h.wlReplayFloatHistogramsPool.Get()[:0]
204195
hists, err = dec.FloatHistogramSamples(rec, hists)
205196
if err != nil {
206197
decodeErr = &wlog.CorruptionErr{
@@ -212,7 +203,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
212203
}
213204
decoded <- hists
214205
case record.Metadata:
215-
meta := metadataPool.Get()[:0]
206+
meta := h.wlReplayMetadataPool.Get()[:0]
216207
meta, err := dec.Metadata(rec, meta)
217208
if err != nil {
218209
decodeErr = &wlog.CorruptionErr{
@@ -251,7 +242,7 @@ Outer:
251242
idx := uint64(mSeries.ref) % uint64(concurrency)
252243
processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
253244
}
254-
seriesPool.Put(v)
245+
h.wlReplaySeriesPool.Put(v)
255246
case []record.RefSample:
256247
samples := v
257248
minValidTime := h.minValidTime.Load()
@@ -287,7 +278,7 @@ Outer:
287278
}
288279
samples = samples[m:]
289280
}
290-
samplesPool.Put(v)
281+
h.wlReplaySamplesPool.Put(v)
291282
case []tombstones.Stone:
292283
for _, s := range v {
293284
for _, itv := range s.Intervals {
@@ -301,12 +292,12 @@ Outer:
301292
h.tombstones.AddInterval(s.Ref, itv)
302293
}
303294
}
304-
tstonesPool.Put(v)
295+
h.wlReplaytStonesPool.Put(v)
305296
case []record.RefExemplar:
306297
for _, e := range v {
307298
exemplarsInput <- e
308299
}
309-
exemplarsPool.Put(v)
300+
h.wlReplayExemplarsPool.Put(v)
310301
case []record.RefHistogramSample:
311302
samples := v
312303
minValidTime := h.minValidTime.Load()
@@ -342,7 +333,7 @@ Outer:
342333
}
343334
samples = samples[m:]
344335
}
345-
histogramsPool.Put(v)
336+
h.wlReplayHistogramsPool.Put(v)
346337
case []record.RefFloatHistogramSample:
347338
samples := v
348339
minValidTime := h.minValidTime.Load()
@@ -378,7 +369,7 @@ Outer:
378369
}
379370
samples = samples[m:]
380371
}
381-
floatHistogramsPool.Put(v)
372+
h.wlReplayFloatHistogramsPool.Put(v)
382373
case []record.RefMetadata:
383374
for _, m := range v {
384375
s := h.series.getByID(m.Ref)
@@ -392,7 +383,7 @@ Outer:
392383
Help: m.Help,
393384
}
394385
}
395-
metadataPool.Put(v)
386+
h.wlReplayMetadataPool.Put(v)
396387
default:
397388
panic(fmt.Errorf("unexpected decoded type: %T", d))
398389
}
@@ -659,12 +650,8 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
659650
shards = make([][]record.RefSample, concurrency)
660651
histogramShards = make([][]histogramRecord, concurrency)
661652

662-
decodedCh = make(chan interface{}, 10)
663-
decodeErr error
664-
samplesPool zeropool.Pool[[]record.RefSample]
665-
markersPool zeropool.Pool[[]record.RefMmapMarker]
666-
histogramSamplesPool zeropool.Pool[[]record.RefHistogramSample]
667-
floatHistogramSamplesPool zeropool.Pool[[]record.RefFloatHistogramSample]
653+
decodedCh = make(chan interface{}, 10)
654+
decodeErr error
668655
)
669656

670657
defer func() {
@@ -700,7 +687,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
700687
rec := r.Record()
701688
switch dec.Type(rec) {
702689
case record.Samples:
703-
samples := samplesPool.Get()[:0]
690+
samples := h.wlReplaySamplesPool.Get()[:0]
704691
samples, err = dec.Samples(rec, samples)
705692
if err != nil {
706693
decodeErr = &wlog.CorruptionErr{
@@ -712,7 +699,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
712699
}
713700
decodedCh <- samples
714701
case record.MmapMarkers:
715-
markers := markersPool.Get()[:0]
702+
markers := h.wlReplayMmapMarkersPool.Get()[:0]
716703
markers, err = dec.MmapMarkers(rec, markers)
717704
if err != nil {
718705
decodeErr = &wlog.CorruptionErr{
@@ -724,7 +711,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
724711
}
725712
decodedCh <- markers
726713
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
727-
hists := histogramSamplesPool.Get()[:0]
714+
hists := h.wlReplayHistogramsPool.Get()[:0]
728715
hists, err = dec.HistogramSamples(rec, hists)
729716
if err != nil {
730717
decodeErr = &wlog.CorruptionErr{
@@ -736,7 +723,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
736723
}
737724
decodedCh <- hists
738725
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
739-
hists := floatHistogramSamplesPool.Get()[:0]
726+
hists := h.wlReplayFloatHistogramsPool.Get()[:0]
740727
hists, err = dec.FloatHistogramSamples(rec, hists)
741728
if err != nil {
742729
decodeErr = &wlog.CorruptionErr{
@@ -787,7 +774,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
787774
}
788775
samples = samples[m:]
789776
}
790-
samplesPool.Put(v)
777+
h.wlReplaySamplesPool.Put(v)
791778
case []record.RefMmapMarker:
792779
markers := v
793780
for _, rm := range markers {
@@ -842,7 +829,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
842829
}
843830
samples = samples[m:]
844831
}
845-
histogramSamplesPool.Put(v)
832+
h.wlReplayHistogramsPool.Put(v)
846833
case []record.RefFloatHistogramSample:
847834
samples := v
848835
// We split up the samples into chunks of 5000 samples or less.
@@ -874,7 +861,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
874861
}
875862
samples = samples[m:]
876863
}
877-
floatHistogramSamplesPool.Put(v)
864+
h.wlReplayFloatHistogramsPool.Put(v)
878865
default:
879866
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
880867
}

0 commit comments

Comments
 (0)