
Commit 4f01c47

db: improve metrics and logs to measure all filesystem ops
Enhance the log writer to instrument all filesystem operations performed while writing the log file (create, write, sync, etc.) and record them in the latency histogram.
1 parent 0f13c8c commit 4f01c47
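
The pattern the commit applies is the same around every filesystem call: take a timestamp, perform the operation, then observe the elapsed time in a per-directory prometheus histogram, skipping the observation when no histogram was configured. A minimal standalone sketch of that pattern follows; the histogram name, buckets, file name, and the observeOp helper are illustrative only (the commit itself times operations with crtime.NowMono/Elapsed rather than the standard library clock).

// Illustrative sketch, not part of the commit: wrap filesystem calls so their
// latencies land in a prometheus histogram, mirroring how the diff instruments
// create/write/sync/close in the WAL writer.
package main

import (
	"os"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// observeOp times fn and records the elapsed nanoseconds, guarding against a
// nil histogram the same way the LogWriter does.
func observeOp(h prometheus.Histogram, fn func() error) error {
	start := time.Now()
	err := fn()
	if h != nil {
		h.Observe(float64(time.Since(start)))
	}
	return err
}

func main() {
	// Placeholder name and buckets; Pebble's own code uses FsyncLatencyBuckets.
	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "wal_file_op_latency_nanos",
		Buckets: prometheus.ExponentialBuckets(1e3, 2, 20),
	})
	f, err := os.Create("example.wal")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	_ = observeOp(h, func() error { _, werr := f.Write([]byte("hello")); return werr })
	_ = observeOp(h, func() error { return f.Sync() })
}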

12 files changed: +173 −73 lines changed


commit_test.go

Lines changed: 0 additions & 3 deletions
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/pebble/internal/invariants"
 	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/vfs"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 )

@@ -253,7 +252,6 @@ func TestCommitPipelineWALClose(t *testing.T) {
 	}
 	p := newCommitPipeline(testEnv)
 	wal = record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		QueueSemChan:        p.logSyncQSem,
 		WriteWALSyncOffsets: func() bool { return false },
 	})
@@ -395,7 +393,6 @@ func BenchmarkCommitPipeline(b *testing.B) {
 	p := newCommitPipeline(nullCommitEnv)
 	wal = record.NewLogWriter(io.Discard, 0, /* logNum */
 		record.LogWriterConfig{
-			WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 			QueueSemChan:        p.logSyncQSem,
 			WriteWALSyncOffsets: func() bool { return false },
 		})

db.go

Lines changed: 4 additions & 11 deletions
@@ -33,14 +33,12 @@ import (
 	"github.com/cockroachdb/pebble/objstorage"
 	"github.com/cockroachdb/pebble/objstorage/remote"
 	"github.com/cockroachdb/pebble/rangekey"
-	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/sstable/blob"
 	"github.com/cockroachdb/pebble/sstable/block"
 	"github.com/cockroachdb/pebble/vfs/atomicfs"
 	"github.com/cockroachdb/pebble/wal"
 	"github.com/cockroachdb/tokenbucket"
-	"github.com/prometheus/client_golang/prometheus"
 )

 const (
@@ -376,13 +374,7 @@ type DB struct {
 		// commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
 		// (i.e. makeRoomForWrite). Can be nil.
 		writer wal.Writer
-		metrics struct {
-			// fsyncLatency has its own internal synchronization, and is not
-			// protected by mu.
-			fsyncLatency prometheus.Histogram
-			// Updated whenever a wal.Writer is closed.
-			record.LogWriterMetrics
-		}
+		metrics WALMetrics
 	}

 	mem struct {
@@ -1972,8 +1964,9 @@ func (d *DB) Metrics() *Metrics {
 	metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
 	metrics.BlobFiles.ReferencedBackingValueSize = blobStats.ReferencedBackingValueSize

-	metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
-	if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
+	metrics.WALMetrics.PrimaryFileOpLatency = d.mu.log.metrics.PrimaryFileOpLatency
+	metrics.WALMetrics.SecondaryFileOpLatency = d.mu.log.metrics.SecondaryFileOpLatency
+	if err := metrics.WALMetrics.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
 		d.opts.Logger.Errorf("metrics error: %s", err)
 	}
 	metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
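
Since Metrics() now publishes the histograms themselves, a caller can read the aggregated counts back out through the standard client_golang machinery. A hedged read-side sketch, assuming the usual dto alias for github.com/prometheus/client_model/go; the package and function names here are hypothetical:

package walmetricsdump

import (
	"fmt"

	"github.com/cockroachdb/pebble"
	dto "github.com/prometheus/client_model/go"
)

// dumpPrimaryWALFileOpLatency prints the sample count and sum accumulated in
// the primary-directory histogram exposed via the new Metrics.WALMetrics field.
// Only the Metrics().WALMetrics access comes from the diff; the rest is glue.
func dumpPrimaryWALFileOpLatency(db *pebble.DB) {
	m := db.Metrics()
	if h := m.WALMetrics.PrimaryFileOpLatency; h != nil {
		var pb dto.Metric
		if err := h.Write(&pb); err == nil {
			fmt.Printf("primary WAL file ops: count=%d sum=%.0fns\n",
				pb.GetHistogram().GetSampleCount(), pb.GetHistogram().GetSampleSum())
		}
	}
}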

metrics.go

Lines changed: 23 additions & 4 deletions
@@ -486,10 +486,7 @@ type Metrics struct {
 	// deletion. These can be relevant if free disk space is unexplainably low.
 	DeletePacer deletepacer.Metrics

-	LogWriter struct {
-		FsyncLatency prometheus.Histogram
-		record.LogWriterMetrics
-	}
+	WALMetrics WALMetrics

 	CategoryStats []block.CategoryStatsAggregate

@@ -544,6 +541,28 @@ func (cm *CompressionMetrics) MergeWith(o *CompressionMetrics) {
 	cm.Zstd.Add(o.Zstd)
 }

+// DirectoryContext identifies which WAL directory an operation targets
+type DirectoryContext int
+
+const (
+	// DirectoryPrimary indicates operation targets the primary WAL directory
+	DirectoryPrimary DirectoryContext = iota
+	// DirectorySecondary indicates operation targets the secondary WAL directory
+	DirectorySecondary
+	// DirectoryUnknown indicates directory context is unknown or not applicable
+	DirectoryUnknown
+)
+
+// WALMetrics contains directory-specific latency histograms for all WAL operations
+type WALMetrics struct {
+	// PrimaryFileOpLatency tracks all file operations for the primary directory
+	PrimaryFileOpLatency prometheus.Histogram
+	// SecondaryFileOpLatency tracks all file operations for the secondary directory
+	SecondaryFileOpLatency prometheus.Histogram
+	// Updated whenever a wal.Writer is closed
+	record.LogWriterMetrics
+}
+
 var (
 	// FsyncLatencyBuckets are prometheus histogram buckets suitable for a histogram
 	// that records latencies for fsyncs.
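
DirectoryContext is declared here but not yet consumed in the hunks shown. One plausible use, sketched below as a hypothetical helper that would sit alongside WALMetrics in this package (it is not part of the commit), is routing an observation to the histogram that matches the directory:

// histogramFor is hypothetical and not in the diff: it returns the histogram
// matching a DirectoryContext, or nil for DirectoryUnknown, mirroring the nil
// checks the LogWriter already performs before calling Observe.
func (m *WALMetrics) histogramFor(ctx DirectoryContext) prometheus.Histogram {
	switch ctx {
	case DirectoryPrimary:
		return m.PrimaryFileOpLatency
	case DirectorySecondary:
		return m.SecondaryFileOpLatency
	default:
		return nil
	}
}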

open.go

Lines changed: 21 additions & 6 deletions
@@ -289,9 +289,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 		d.mu.mem.queue = append(d.mu.mem.queue, entry)
 	}

-	d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Buckets: FsyncLatencyBuckets,
-	})
+	// Initialize WAL metrics structure (histograms populated below)
+	d.mu.log.metrics = WALMetrics{}

 	walOpts := wal.Options{
 		Primary: dirs.WALPrimary,
@@ -302,17 +301,33 @@
 		BytesPerSync:    opts.WALBytesPerSync,
 		PreallocateSize: d.walPreallocateSize,
 		MinSyncInterval: opts.WALMinSyncInterval,
-		FsyncLatency:    d.mu.log.metrics.fsyncLatency,
 		QueueSemChan:    d.commit.logSyncQSem,
 		Logger:          opts.Logger,
 		EventListener:   walEventListenerAdaptor{l: opts.EventListener},
 		WriteWALSyncOffsets: func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
 	}
+
+	// Create and assign WAL file operation histograms
+	walPrimaryFileOpHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
+		Buckets: FsyncLatencyBuckets,
+	})
+	d.mu.log.metrics.PrimaryFileOpLatency = walPrimaryFileOpHistogram
+	walOpts.PrimaryFileOpHistogram = walPrimaryFileOpHistogram
+
+	// Configure failover-specific histograms and options
 	if !opts.ReadOnly && opts.WALFailover != nil {
-		walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
-		walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
+		walSecondaryFileOpHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
+			Buckets: FsyncLatencyBuckets,
+		})
+		d.mu.log.metrics.SecondaryFileOpLatency = walSecondaryFileOpHistogram
+		walOpts.SecondaryFileOpHistogram = walSecondaryFileOpHistogram
+
+		walFailoverWriteAndSyncHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
 			Buckets: FsyncLatencyBuckets,
 		})
+		walOpts.FailoverWriteAndSyncLatency = walFailoverWriteAndSyncHistogram
+
+		walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
 	}
 	walDirs := walOpts.Dirs()
 	var recoveryDirLocks base.DirLockSet

record/log_writer.go

Lines changed: 34 additions & 9 deletions
@@ -455,8 +455,10 @@ type LogWriter struct {
 		err error
 		// minSyncInterval is the minimum duration between syncs.
 		minSyncInterval durationFunc
-		fsyncLatency    prometheus.Histogram
-		pending         []*block
+
+		// WAL file operation latency histogram
+		walFileOpHistogram WALFileOpHistogram
+		pending            []*block
 		// Pushing and popping from pendingSyncs does not require flusher mutex to
 		// be held.
 		pendingSyncs pendingSyncs
@@ -486,10 +488,17 @@ type LogWriter struct {
 	emitFragment func(n int, p []byte) (remainingP []byte)
 }

+// WALFileOpHistogram is a prometheus histogram for tracking WAL file operation latencies
+// (create, write, fsync, close, stat, opendir).
+type WALFileOpHistogram = prometheus.Histogram
+
 // LogWriterConfig is a struct used for configuring new LogWriters
 type LogWriterConfig struct {
 	WALMinSyncInterval durationFunc
-	WALFsyncLatency    prometheus.Histogram
+
+	// WAL file operation latency histogram for this directory
+	WALFileOpHistogram WALFileOpHistogram
+
 	// QueueSemChan is an optional channel to pop from when popping from
 	// LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
 	// the syncQueue from overflowing (which will cause a panic). All production
@@ -581,7 +590,7 @@ func NewLogWriter(

 	f := &r.flusher
 	f.minSyncInterval = logWriterConfig.WALMinSyncInterval
-	f.fsyncLatency = logWriterConfig.WALFsyncLatency
+	f.walFileOpHistogram = logWriterConfig.WALFileOpHistogram

 	go func() {
 		pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
@@ -725,9 +734,9 @@ func (w *LogWriter) flushLoop(context.Context) {
 			writtenOffset += uint64(len(data))
 			synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
 			f.Lock()
-			if synced && f.fsyncLatency != nil {
+			if synced && f.walFileOpHistogram != nil {
 				w.syncedOffset.Store(writtenOffset)
-				f.fsyncLatency.Observe(float64(syncLatency))
+				f.walFileOpHistogram.Observe(float64(syncLatency))
 			}
 			f.err = err
 			if f.err != nil {
@@ -784,7 +793,12 @@ func (w *LogWriter) flushPending(
 	}
 	if n := len(data); err == nil && n > 0 {
 		bytesWritten += int64(n)
+		// Measure write latency
+		writeStart := crtime.NowMono()
 		_, err = w.w.Write(data)
+		if writeLatency := writeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
+			w.flusher.walFileOpHistogram.Observe(float64(writeLatency))
+		}
 	}

 	synced = !snap.empty()
@@ -811,7 +825,13 @@ func (w *LogWriter) syncWithLatency() (time.Duration, error) {
 }

 func (w *LogWriter) flushBlock(b *block) error {
-	if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
+	// Measure write latency for block flush
+	writeStart := crtime.NowMono()
+	_, err := w.w.Write(b.buf[b.flushed:])
+	if writeLatency := writeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
+		w.flusher.walFileOpHistogram.Observe(float64(writeLatency))
+	}
+	if err != nil {
 		return err
 	}
 	b.written.Store(0)
@@ -885,8 +905,8 @@ func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
 		syncLatency, err = w.syncWithLatency()
 	}
 	f.Lock()
-	if err == nil && f.fsyncLatency != nil {
-		f.fsyncLatency.Observe(float64(syncLatency))
+	if err == nil && f.walFileOpHistogram != nil {
+		f.walFileOpHistogram.Observe(float64(syncLatency))
 	}
 	free := w.free.blocks
 	f.Unlock()
@@ -897,7 +917,12 @@
 		w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
 	}
 	if w.c != nil {
+		// Measure close latency
+		closeStart := crtime.NowMono()
 		cerr := w.c.Close()
+		if closeLatency := closeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
+			w.flusher.walFileOpHistogram.Observe(float64(closeLatency))
+		}
 		w.c = nil
 		err = firstError(err, cerr)
 	}
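
For callers, the API change is that LogWriterConfig now takes WALFileOpHistogram where it previously took WALFsyncLatency, as the updated test file below shows. A small runnable sketch, assuming io.Discard as a stand-in WAL destination and default histogram buckets:

package main

import (
	"io"

	"github.com/cockroachdb/pebble/record"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// The destination and bucket layout are placeholders; only the config shape
	// (WALFileOpHistogram replacing the old WALFsyncLatency field) is from the diff.
	h := prometheus.NewHistogram(prometheus.HistogramOpts{})
	w := record.NewLogWriter(io.Discard, 0 /* logNum */, record.LogWriterConfig{
		WALFileOpHistogram:  h,
		WriteWALSyncOffsets: func() bool { return false },
	})
	if _, err := w.WriteRecord([]byte("hello")); err != nil {
		panic(err)
	}
	_ = w.Close()
}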

record/log_writer_test.go

Lines changed: 2 additions & 15 deletions
@@ -150,7 +150,6 @@ func TestSyncError(t *testing.T) {

 	injectedErr := errors.New("injected error")
 	w := NewLogWriter(syncErrorFile{f, injectedErr}, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})

@@ -193,7 +192,6 @@ func (f *syncFile) Sync() error {
 func TestSyncRecord(t *testing.T) {
 	f := &syncFile{}
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})

@@ -221,7 +219,6 @@ func TestSyncRecordWithSignalChan(t *testing.T) {
 		semChan <- struct{}{}
 	}
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		QueueSemChan:        semChan,
 		WriteWALSyncOffsets: func() bool { return false },
 	})
@@ -273,7 +270,6 @@ func TestMinSyncInterval(t *testing.T) {
 		WALMinSyncInterval: func() time.Duration {
 			return minSyncInterval
 		},
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})

@@ -346,7 +342,6 @@ func TestMinSyncIntervalClose(t *testing.T) {
 		WALMinSyncInterval: func() time.Duration {
 			return minSyncInterval
 		},
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})

@@ -398,7 +393,6 @@ func TestMetricsWithoutSync(t *testing.T) {
 	f := &syncFileWithWait{}
 	f.writeWG.Add(1)
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	offset, err := w.SyncRecord([]byte("hello"), nil, nil)
@@ -444,10 +438,9 @@
 	})

 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     syncLatencyMicros,
 		WriteWALSyncOffsets: func() bool { return false },
-	},
-	)
+		WALFileOpHistogram:  syncLatencyMicros,
+	})
 	var wg sync.WaitGroup
 	wg.Add(100)
 	for i := 0; i < 100; i++ {
@@ -551,7 +544,6 @@ func TestQueueWALBlocks(t *testing.T) {
 		return nil
 	}))
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	const numBlocks = 1024
@@ -662,7 +654,6 @@ func TestSyncRecordGeneralized(t *testing.T) {
 	lastSync := int64(-1)
 	cbChan := make(chan struct{}, 2)
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
 			require.NoError(t, err)
 			require.Equal(t, lastSync+1, doneSync.Index)
@@ -714,7 +705,6 @@ func TestSyncRecordGeneralizedWithCloseError(t *testing.T) {
 	lastSync := int64(-1)
 	cbChan := make(chan struct{}, 2)
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
 			if doneSync.Index == 1 {
 				require.Error(t, err)
@@ -755,7 +745,6 @@ func TestSyncRecordGeneralizedWithCloseError(t *testing.T) {
 func writeWALSyncRecords(t *testing.T, numRecords int, recordSizes []int) *syncFile {
 	f := &syncFile{}
 	w := NewLogWriter(f, 1, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return true },
 	})
 	var syncErr error
@@ -864,7 +853,6 @@ func BenchmarkQueueWALBlocks(b *testing.B) {
 		return nil
 	}))
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})

@@ -897,7 +885,6 @@ func BenchmarkWriteWALBlocksAllocs(b *testing.B) {
 	b.StopTimer()
 	f := vfstest.DiscardFile
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {},
 		WriteWALSyncOffsets:       func() bool { return false },
 	})
