Skip to content

Commit 2350d6b

Browse files
committed
db: improve metrics and logs to measure all filesystem ops
Enhance the log writer to instrument all filesystem operations done for writing the log file in the latency histogram (create, write, sync, etc.).
1 parent 3d22cee commit 2350d6b

File tree

12 files changed

+165
-70
lines changed

12 files changed

+165
-70
lines changed

commit_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/cockroachdb/pebble/internal/invariants"
2121
"github.com/cockroachdb/pebble/record"
2222
"github.com/cockroachdb/pebble/vfs"
23-
"github.com/prometheus/client_golang/prometheus"
2423
"github.com/stretchr/testify/require"
2524
)
2625

@@ -253,7 +252,6 @@ func TestCommitPipelineWALClose(t *testing.T) {
253252
}
254253
p := newCommitPipeline(testEnv)
255254
wal = record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{
256-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
257255
QueueSemChan: p.logSyncQSem,
258256
WriteWALSyncOffsets: func() bool { return false },
259257
})
@@ -395,7 +393,6 @@ func BenchmarkCommitPipeline(b *testing.B) {
395393
p := newCommitPipeline(nullCommitEnv)
396394
wal = record.NewLogWriter(io.Discard, 0, /* logNum */
397395
record.LogWriterConfig{
398-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
399396
QueueSemChan: p.logSyncQSem,
400397
WriteWALSyncOffsets: func() bool { return false },
401398
})

db.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,13 @@ import (
3131
"github.com/cockroachdb/pebble/objstorage"
3232
"github.com/cockroachdb/pebble/objstorage/remote"
3333
"github.com/cockroachdb/pebble/rangekey"
34-
"github.com/cockroachdb/pebble/record"
3534
"github.com/cockroachdb/pebble/sstable"
3635
"github.com/cockroachdb/pebble/sstable/blob"
3736
"github.com/cockroachdb/pebble/sstable/block"
3837
"github.com/cockroachdb/pebble/vfs"
3938
"github.com/cockroachdb/pebble/vfs/atomicfs"
4039
"github.com/cockroachdb/pebble/wal"
4140
"github.com/cockroachdb/tokenbucket"
42-
"github.com/prometheus/client_golang/prometheus"
4341
)
4442

4543
const (
@@ -376,13 +374,7 @@ type DB struct {
376374
// commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
377375
// (i.e. makeRoomForWrite). Can be nil.
378376
writer wal.Writer
379-
metrics struct {
380-
// fsyncLatency has its own internal synchronization, and is not
381-
// protected by mu.
382-
fsyncLatency prometheus.Histogram
383-
// Updated whenever a wal.Writer is closed.
384-
record.LogWriterMetrics
385-
}
377+
metrics WALMetrics
386378
}
387379

388380
mem struct {
@@ -1962,8 +1954,7 @@ func (d *DB) Metrics() *Metrics {
19621954
metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
19631955
metrics.BlobFiles.ReferencedBackingValueSize = blobStats.ReferencedBackingValueSize
19641956

1965-
metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
1966-
if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
1957+
if err := metrics.WALMetrics.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
19671958
d.opts.Logger.Errorf("metrics error: %s", err)
19681959
}
19691960
metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput

metrics.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -481,10 +481,7 @@ type Metrics struct {
481481
Failover wal.FailoverStats
482482
}
483483

484-
LogWriter struct {
485-
FsyncLatency prometheus.Histogram
486-
record.LogWriterMetrics
487-
}
484+
WALMetrics WALMetrics
488485

489486
CategoryStats []block.CategoryStatsAggregate
490487

@@ -539,6 +536,28 @@ func (cm *CompressionMetrics) MergeWith(o *CompressionMetrics) {
539536
cm.Zstd.Add(o.Zstd)
540537
}
541538

539+
// DirectoryContext identifies which WAL directory an operation targets
540+
type DirectoryContext int
541+
542+
const (
543+
// DirectoryPrimary indicates operation targets the primary WAL directory
544+
DirectoryPrimary DirectoryContext = iota
545+
// DirectorySecondary indicates operation targets the secondary WAL directory
546+
DirectorySecondary
547+
// DirectoryUnknown indicates directory context is unknown or not applicable
548+
DirectoryUnknown
549+
)
550+
551+
// WALMetrics contains directory-specific latency histograms for all WAL operations
552+
type WALMetrics struct {
553+
// PrimaryFileOpLatency tracks all file operations for the primary directory
554+
PrimaryFileOpLatency prometheus.Histogram
555+
// SecondaryFileOpLatency tracks all file operations for the secondary directory
556+
SecondaryFileOpLatency prometheus.Histogram
557+
// Embedded runtime metrics from record.LogWriterMetrics
558+
record.LogWriterMetrics
559+
}
560+
542561
var (
543562
// FsyncLatencyBuckets are prometheus histogram buckets suitable for a histogram
544563
// that records latencies for fsyncs.
@@ -547,6 +566,12 @@ var (
547566
prometheus.ExponentialBucketsRange(float64(time.Millisecond*5), float64(10*time.Second), 50)...,
548567
)
549568

569+
// WALFileOpLatencyBuckets are histogram buckets for all WAL file operations.
570+
WALFileOpLatencyBuckets = append(
571+
prometheus.LinearBuckets(0.0, 0.01, 50), // 0 to 0.5ms in 10μs increments
572+
prometheus.ExponentialBucketsRange(1.0, 10000.0, 50)..., // 1ms to 10s
573+
)
574+
550575
// SecondaryCacheIOBuckets exported to enable exporting from package pebble to
551576
// enable exporting metrics with below buckets in CRDB.
552577
SecondaryCacheIOBuckets = sharedcache.IOBuckets

open.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,10 +324,19 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
324324
d.mu.mem.queue = append(d.mu.mem.queue, entry)
325325
}
326326

327-
d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
328-
Buckets: FsyncLatencyBuckets,
327+
// Create WAL file operation histogram for filesystem operation tracking
328+
walFileOpHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
329+
Name: "pebble_wal_file_op_duration_milliseconds",
330+
Help: "Histogram of WAL file operation latencies in milliseconds",
331+
Buckets: WALFileOpLatencyBuckets,
329332
})
330333

334+
// Initialize WAL metrics structure
335+
d.mu.log.metrics = WALMetrics{
336+
PrimaryFileOpLatency: walFileOpHistogram,
337+
// SecondaryFileOpLatency could be populated for failover scenarios
338+
}
339+
331340
walOpts := wal.Options{
332341
Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
333342
Secondary: wal.Dir{},
@@ -337,7 +346,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
337346
BytesPerSync: opts.WALBytesPerSync,
338347
PreallocateSize: d.walPreallocateSize,
339348
MinSyncInterval: opts.WALMinSyncInterval,
340-
FsyncLatency: d.mu.log.metrics.fsyncLatency,
349+
WALFileOpHistogram: walFileOpHistogram,
341350
QueueSemChan: d.commit.logSyncQSem,
342351
Logger: opts.Logger,
343352
EventListener: walEventListenerAdaptor{l: opts.EventListener},

record/log_writer.go

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -455,8 +455,12 @@ type LogWriter struct {
455455
err error
456456
// minSyncInterval is the minimum duration between syncs.
457457
minSyncInterval durationFunc
458-
fsyncLatency prometheus.Histogram
459-
pending []*block
458+
459+
// WAL file operation latency histogram
460+
walFileOpHistogram WALFileOpHistogram
461+
// Directory type for metrics recording
462+
directoryType DirectoryType
463+
pending []*block
460464
// Pushing and popping from pendingSyncs does not require flusher mutex to
461465
// be held.
462466
pendingSyncs pendingSyncs
@@ -486,10 +490,46 @@ type LogWriter struct {
486490
emitFragment func(n int, p []byte) (remainingP []byte)
487491
}
488492

493+
// DirectoryType identifies which WAL directory an operation targets
494+
type DirectoryType int
495+
496+
const (
497+
// DirectoryPrimary indicates operation targets the primary WAL directory
498+
DirectoryPrimary DirectoryType = iota
499+
// DirectorySecondary indicates operation targets the secondary WAL directory
500+
DirectorySecondary
501+
// DirectoryUnknown indicates directory context is unknown or not applicable
502+
DirectoryUnknown
503+
)
504+
505+
// String returns the string representation of DirectoryType.
506+
func (d DirectoryType) String() string {
507+
switch d {
508+
case DirectoryPrimary:
509+
return "Primary"
510+
case DirectorySecondary:
511+
return "Secondary"
512+
case DirectoryUnknown:
513+
return "Unknown"
514+
default:
515+
return "Invalid"
516+
}
517+
}
518+
519+
// WALFileOpHistogram is a prometheus histogram for tracking WAL file operation latencies
520+
// (create, write, fsync, close, stat, opendir).
521+
type WALFileOpHistogram = prometheus.Histogram
522+
489523
// LogWriterConfig is a struct used for configuring new LogWriters
490524
type LogWriterConfig struct {
491525
WALMinSyncInterval durationFunc
492-
WALFsyncLatency prometheus.Histogram
526+
527+
// Directory type for this LogWriter (Primary, Secondary, or Unknown)
528+
DirectoryType DirectoryType
529+
530+
// WAL file operation latency histogram for this directory
531+
WALFileOpHistogram WALFileOpHistogram
532+
493533
// QueueSemChan is an optional channel to pop from when popping from
494534
// LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
495535
// the syncQueue from overflowing (which will cause a panic). All production
@@ -581,7 +621,8 @@ func NewLogWriter(
581621

582622
f := &r.flusher
583623
f.minSyncInterval = logWriterConfig.WALMinSyncInterval
584-
f.fsyncLatency = logWriterConfig.WALFsyncLatency
624+
f.walFileOpHistogram = logWriterConfig.WALFileOpHistogram
625+
f.directoryType = logWriterConfig.DirectoryType
585626

586627
go func() {
587628
pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
@@ -725,9 +766,9 @@ func (w *LogWriter) flushLoop(context.Context) {
725766
writtenOffset += uint64(len(data))
726767
synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
727768
f.Lock()
728-
if synced && f.fsyncLatency != nil {
769+
if synced && f.walFileOpHistogram != nil {
729770
w.syncedOffset.Store(writtenOffset)
730-
f.fsyncLatency.Observe(float64(syncLatency))
771+
f.walFileOpHistogram.Observe(syncLatency.Seconds() * 1000) // Convert to milliseconds
731772
}
732773
f.err = err
733774
if f.err != nil {
@@ -784,7 +825,12 @@ func (w *LogWriter) flushPending(
784825
}
785826
if n := len(data); err == nil && n > 0 {
786827
bytesWritten += int64(n)
828+
// Measure write latency
829+
writeStart := crtime.NowMono()
787830
_, err = w.w.Write(data)
831+
if writeLatency := writeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
832+
w.flusher.walFileOpHistogram.Observe(writeLatency.Seconds() * 1000) // Convert to milliseconds
833+
}
788834
}
789835

790836
synced = !snap.empty()
@@ -811,7 +857,13 @@ func (w *LogWriter) syncWithLatency() (time.Duration, error) {
811857
}
812858

813859
func (w *LogWriter) flushBlock(b *block) error {
814-
if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
860+
// Measure write latency for block flush
861+
writeStart := crtime.NowMono()
862+
_, err := w.w.Write(b.buf[b.flushed:])
863+
if writeLatency := writeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
864+
w.flusher.walFileOpHistogram.Observe(writeLatency.Seconds() * 1000) // Convert to milliseconds
865+
}
866+
if err != nil {
815867
return err
816868
}
817869
b.written.Store(0)
@@ -885,8 +937,8 @@ func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
885937
syncLatency, err = w.syncWithLatency()
886938
}
887939
f.Lock()
888-
if err == nil && f.fsyncLatency != nil {
889-
f.fsyncLatency.Observe(float64(syncLatency))
940+
if err == nil && f.walFileOpHistogram != nil {
941+
f.walFileOpHistogram.Observe(syncLatency.Seconds() * 1000) // Convert to milliseconds
890942
}
891943
free := w.free.blocks
892944
f.Unlock()
@@ -897,7 +949,12 @@ func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
897949
w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
898950
}
899951
if w.c != nil {
952+
// Measure close latency
953+
closeStart := crtime.NowMono()
900954
cerr := w.c.Close()
955+
if closeLatency := closeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
956+
w.flusher.walFileOpHistogram.Observe(closeLatency.Seconds() * 1000) // Convert to milliseconds
957+
}
901958
w.c = nil
902959
err = firstError(err, cerr)
903960
}

record/log_writer_test.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ func TestSyncError(t *testing.T) {
150150

151151
injectedErr := errors.New("injected error")
152152
w := NewLogWriter(syncErrorFile{f, injectedErr}, 0, LogWriterConfig{
153-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
154153
WriteWALSyncOffsets: func() bool { return false },
155154
})
156155

@@ -193,7 +192,6 @@ func (f *syncFile) Sync() error {
193192
func TestSyncRecord(t *testing.T) {
194193
f := &syncFile{}
195194
w := NewLogWriter(f, 0, LogWriterConfig{
196-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
197195
WriteWALSyncOffsets: func() bool { return false },
198196
})
199197

@@ -221,7 +219,6 @@ func TestSyncRecordWithSignalChan(t *testing.T) {
221219
semChan <- struct{}{}
222220
}
223221
w := NewLogWriter(f, 0, LogWriterConfig{
224-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
225222
QueueSemChan: semChan,
226223
WriteWALSyncOffsets: func() bool { return false },
227224
})
@@ -273,7 +270,6 @@ func TestMinSyncInterval(t *testing.T) {
273270
WALMinSyncInterval: func() time.Duration {
274271
return minSyncInterval
275272
},
276-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
277273
WriteWALSyncOffsets: func() bool { return false },
278274
})
279275

@@ -346,7 +342,6 @@ func TestMinSyncIntervalClose(t *testing.T) {
346342
WALMinSyncInterval: func() time.Duration {
347343
return minSyncInterval
348344
},
349-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
350345
WriteWALSyncOffsets: func() bool { return false },
351346
})
352347

@@ -398,7 +393,6 @@ func TestMetricsWithoutSync(t *testing.T) {
398393
f := &syncFileWithWait{}
399394
f.writeWG.Add(1)
400395
w := NewLogWriter(f, 0, LogWriterConfig{
401-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
402396
WriteWALSyncOffsets: func() bool { return false },
403397
})
404398
offset, err := w.SyncRecord([]byte("hello"), nil, nil)
@@ -444,10 +438,9 @@ func TestMetricsWithSync(t *testing.T) {
444438
})
445439

446440
w := NewLogWriter(f, 0, LogWriterConfig{
447-
WALFsyncLatency: syncLatencyMicros,
448441
WriteWALSyncOffsets: func() bool { return false },
449-
},
450-
)
442+
WALFileOpHistogram: syncLatencyMicros,
443+
})
451444
var wg sync.WaitGroup
452445
wg.Add(100)
453446
for i := 0; i < 100; i++ {
@@ -551,7 +544,6 @@ func TestQueueWALBlocks(t *testing.T) {
551544
return nil
552545
}))
553546
w := NewLogWriter(f, 0, LogWriterConfig{
554-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
555547
WriteWALSyncOffsets: func() bool { return false },
556548
})
557549
const numBlocks = 1024
@@ -662,7 +654,6 @@ func TestSyncRecordGeneralized(t *testing.T) {
662654
lastSync := int64(-1)
663655
cbChan := make(chan struct{}, 2)
664656
w := NewLogWriter(f, 0, LogWriterConfig{
665-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
666657
ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
667658
require.NoError(t, err)
668659
require.Equal(t, lastSync+1, doneSync.Index)
@@ -714,7 +705,6 @@ func TestSyncRecordGeneralizedWithCloseError(t *testing.T) {
714705
lastSync := int64(-1)
715706
cbChan := make(chan struct{}, 2)
716707
w := NewLogWriter(f, 0, LogWriterConfig{
717-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
718708
ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
719709
if doneSync.Index == 1 {
720710
require.Error(t, err)
@@ -755,7 +745,6 @@ func TestSyncRecordGeneralizedWithCloseError(t *testing.T) {
755745
func writeWALSyncRecords(t *testing.T, numRecords int, recordSizes []int) *syncFile {
756746
f := &syncFile{}
757747
w := NewLogWriter(f, 1, LogWriterConfig{
758-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
759748
WriteWALSyncOffsets: func() bool { return true },
760749
})
761750
var syncErr error
@@ -864,7 +853,6 @@ func BenchmarkQueueWALBlocks(b *testing.B) {
864853
return nil
865854
}))
866855
w := NewLogWriter(f, 0, LogWriterConfig{
867-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
868856
WriteWALSyncOffsets: func() bool { return false },
869857
})
870858

@@ -897,7 +885,6 @@ func BenchmarkWriteWALBlocksAllocs(b *testing.B) {
897885
b.StopTimer()
898886
f := vfstest.DiscardFile
899887
w := NewLogWriter(f, 0, LogWriterConfig{
900-
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
901888
ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {},
902889
WriteWALSyncOffsets: func() bool { return false },
903890
})

0 commit comments

Comments
 (0)