diff --git a/commit_test.go b/commit_test.go
index 617a059d8e..baca9e1d5d 100644
--- a/commit_test.go
+++ b/commit_test.go
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/pebble/internal/invariants"
 	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/vfs"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 )
@@ -253,7 +252,6 @@ func TestCommitPipelineWALClose(t *testing.T) {
 	}
 	p := newCommitPipeline(testEnv)
 	wal = record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		QueueSemChan:        p.logSyncQSem,
 		WriteWALSyncOffsets: func() bool { return false },
 	})
@@ -395,7 +393,6 @@ func BenchmarkCommitPipeline(b *testing.B) {
 			p := newCommitPipeline(nullCommitEnv)
 			wal = record.NewLogWriter(io.Discard, 0, /* logNum */ record.LogWriterConfig{
-				WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 				QueueSemChan:        p.logSyncQSem,
 				WriteWALSyncOffsets: func() bool { return false },
 			})
diff --git a/db.go b/db.go
index 411b7a01bb..63b5860bcb 100644
--- a/db.go
+++ b/db.go
@@ -33,14 +33,12 @@ import (
 	"github.com/cockroachdb/pebble/objstorage"
 	"github.com/cockroachdb/pebble/objstorage/remote"
 	"github.com/cockroachdb/pebble/rangekey"
-	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/sstable/blob"
 	"github.com/cockroachdb/pebble/sstable/block"
 	"github.com/cockroachdb/pebble/vfs/atomicfs"
 	"github.com/cockroachdb/pebble/wal"
 	"github.com/cockroachdb/tokenbucket"
-	"github.com/prometheus/client_golang/prometheus"
 )

 const (
@@ -376,13 +374,7 @@ type DB struct {
 		// commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
 		// (i.e. makeRoomForWrite). Can be nil.
 		writer wal.Writer
-		metrics struct {
-			// fsyncLatency has its own internal synchronization, and is not
-			// protected by mu.
-			fsyncLatency prometheus.Histogram
-			// Updated whenever a wal.Writer is closed.
-			record.LogWriterMetrics
-		}
+		metrics WALMetrics
 	}

 	mem struct {
@@ -1972,8 +1964,9 @@ func (d *DB) Metrics() *Metrics {
 	metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
 	metrics.BlobFiles.ReferencedBackingValueSize = blobStats.ReferencedBackingValueSize

-	metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
-	if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
+	metrics.WALMetrics.PrimaryFileOpLatency = d.mu.log.metrics.PrimaryFileOpLatency
+	metrics.WALMetrics.SecondaryFileOpLatency = d.mu.log.metrics.SecondaryFileOpLatency
+	if err := metrics.WALMetrics.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
 		d.opts.Logger.Errorf("metrics error: %s", err)
 	}
 	metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
diff --git a/metrics.go b/metrics.go
index 6940f9a793..fb2577661f 100644
--- a/metrics.go
+++ b/metrics.go
@@ -486,10 +486,7 @@ type Metrics struct {
 	// deletion. These can be relevant if free disk space is unexplainably low.
 	DeletePacer deletepacer.Metrics

-	LogWriter struct {
-		FsyncLatency prometheus.Histogram
-		record.LogWriterMetrics
-	}
+	WALMetrics WALMetrics

 	CategoryStats []block.CategoryStatsAggregate
@@ -544,6 +541,28 @@ func (cm *CompressionMetrics) MergeWith(o *CompressionMetrics) {
 	cm.Zstd.Add(o.Zstd)
 }

+// DirectoryContext identifies which WAL directory an operation targets.
+type DirectoryContext int
+
+const (
+	// DirectoryPrimary indicates the operation targets the primary WAL directory.
+	DirectoryPrimary DirectoryContext = iota
+	// DirectorySecondary indicates the operation targets the secondary WAL directory.
+	DirectorySecondary
+	// DirectoryUnknown indicates the directory context is unknown or not applicable.
+	DirectoryUnknown
+)
+
+// WALMetrics contains directory-specific latency histograms for all WAL operations.
+type WALMetrics struct {
+	// PrimaryFileOpLatency tracks latencies of all file operations on the primary directory.
+	PrimaryFileOpLatency prometheus.Histogram
+	// SecondaryFileOpLatency tracks latencies of all file operations on the secondary directory.
+	SecondaryFileOpLatency prometheus.Histogram
+	// Updated whenever a wal.Writer is closed.
+	record.LogWriterMetrics
+}
+
 var (
 	// FsyncLatencyBuckets are prometheus histogram buckets suitable for a histogram
 	// that records latencies for fsyncs.
diff --git a/open.go b/open.go
index 53c4763e09..618b3cd6b8 100644
--- a/open.go
+++ b/open.go
@@ -289,9 +289,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 		d.mu.mem.queue = append(d.mu.mem.queue, entry)
 	}

-	d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Buckets: FsyncLatencyBuckets,
-	})
+	// Initialize the WAL metrics struct; the histograms are populated below.
+	d.mu.log.metrics = WALMetrics{}

 	walOpts := wal.Options{
 		Primary: dirs.WALPrimary,
@@ -302,17 +301,33 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 		BytesPerSync:    opts.WALBytesPerSync,
 		PreallocateSize: d.walPreallocateSize,
 		MinSyncInterval: opts.WALMinSyncInterval,
-		FsyncLatency:    d.mu.log.metrics.fsyncLatency,
 		QueueSemChan:    d.commit.logSyncQSem,
 		Logger:          opts.Logger,
 		EventListener:   walEventListenerAdaptor{l: opts.EventListener},
 		WriteWALSyncOffsets: func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
 	}
+
+	// Create and assign the WAL file operation histograms.
+	walPrimaryFileOpHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
+		Buckets: FsyncLatencyBuckets,
+	})
+	d.mu.log.metrics.PrimaryFileOpLatency = walPrimaryFileOpHistogram
+	walOpts.PrimaryFileOpHistogram = walPrimaryFileOpHistogram
+
+	// Configure failover-specific histograms and options.
 	if !opts.ReadOnly && opts.WALFailover != nil {
-		walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
-		walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
+		walSecondaryFileOpHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
+			Buckets: FsyncLatencyBuckets,
+		})
+		d.mu.log.metrics.SecondaryFileOpLatency = walSecondaryFileOpHistogram
+		walOpts.SecondaryFileOpHistogram = walSecondaryFileOpHistogram
+
+		walFailoverWriteAndSyncHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
 			Buckets: FsyncLatencyBuckets,
 		})
+		walOpts.FailoverWriteAndSyncLatency = walFailoverWriteAndSyncHistogram
+
+		walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
 	}
 	walDirs := walOpts.Dirs()
 	var recoveryDirLocks base.DirLockSet
diff --git a/record/log_writer.go b/record/log_writer.go
index 03ffe70455..6fe4c35d8e 100644
--- a/record/log_writer.go
+++ b/record/log_writer.go
@@ -455,8 +455,10 @@ type LogWriter struct {
 	err error
 	// minSyncInterval is the minimum duration between syncs.
 	minSyncInterval durationFunc
-	fsyncLatency    prometheus.Histogram
-	pending         []*block
+
+	// walFileOpHistogram records latencies of WAL file operations.
+	walFileOpHistogram WALFileOpHistogram
+	pending            []*block
 	// Pushing and popping from pendingSyncs does not require flusher mutex to
 	// be held.
 	pendingSyncs pendingSyncs
@@ -486,10 +488,17 @@ type LogWriter struct {
 	emitFragment func(n int, p []byte) (remainingP []byte)
 }

+// WALFileOpHistogram is a prometheus histogram for tracking WAL file operation
+// latencies (create, write, fsync, close, stat, opendir).
+type WALFileOpHistogram = prometheus.Histogram
+
 // LogWriterConfig is a struct used for configuring new LogWriters
 type LogWriterConfig struct {
 	WALMinSyncInterval durationFunc
-	WALFsyncLatency    prometheus.Histogram
+
+	// WALFileOpHistogram records file operation latencies for this WAL's directory.
+	WALFileOpHistogram WALFileOpHistogram
+
 	// QueueSemChan is an optional channel to pop from when popping from
 	// LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
 	// the syncQueue from overflowing (which will cause a panic). All production
@@ -581,7 +590,7 @@ func NewLogWriter(
 	f := &r.flusher
 	f.minSyncInterval = logWriterConfig.WALMinSyncInterval
-	f.fsyncLatency = logWriterConfig.WALFsyncLatency
+	f.walFileOpHistogram = logWriterConfig.WALFileOpHistogram

 	go func() {
 		pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
@@ -725,9 +734,9 @@ func (w *LogWriter) flushLoop(context.Context) {
 			writtenOffset += uint64(len(data))
 			synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
 			f.Lock()
-			if synced && f.fsyncLatency != nil {
+			if synced && f.walFileOpHistogram != nil {
 				w.syncedOffset.Store(writtenOffset)
-				f.fsyncLatency.Observe(float64(syncLatency))
+				f.walFileOpHistogram.Observe(float64(syncLatency))
 			}
 			f.err = err
 			if f.err != nil {
@@ -784,7 +793,12 @@ func (w *LogWriter) flushPending(
 	}
 	if n := len(data); err == nil && n > 0 {
 		bytesWritten += int64(n)
+		// Measure the write latency.
+		writeStart := crtime.NowMono()
 		_, err = w.w.Write(data)
+		if writeLatency := writeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
+			w.flusher.walFileOpHistogram.Observe(float64(writeLatency))
+		}
 	}

 	synced = !snap.empty()
@@ -811,7 +825,13 @@ func (w *LogWriter) syncWithLatency() (time.Duration, error) {
 }

 func (w *LogWriter) flushBlock(b *block) error {
-	if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
+	// Measure the write latency of the block flush.
+	writeStart := crtime.NowMono()
+	_, err := w.w.Write(b.buf[b.flushed:])
+	if writeLatency := writeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
+		w.flusher.walFileOpHistogram.Observe(float64(writeLatency))
+	}
+	if err != nil {
 		return err
 	}
 	b.written.Store(0)
@@ -885,8 +905,8 @@ func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
 		syncLatency, err = w.syncWithLatency()
 	}
 	f.Lock()
-	if err == nil && f.fsyncLatency != nil {
-		f.fsyncLatency.Observe(float64(syncLatency))
+	if err == nil && f.walFileOpHistogram != nil {
+		f.walFileOpHistogram.Observe(float64(syncLatency))
 	}
 	free := w.free.blocks
 	f.Unlock()
@@ -897,7 +917,12 @@ func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
 		w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
 	}
 	if w.c != nil {
+		// Measure the close latency.
+		closeStart := crtime.NowMono()
 		cerr := w.c.Close()
+		if closeLatency := closeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
+			w.flusher.walFileOpHistogram.Observe(float64(closeLatency))
+		}
 		w.c = nil
 		err = firstError(err, cerr)
 	}
diff --git a/record/log_writer_test.go b/record/log_writer_test.go
index 849bf55b6e..b224ef1e67 100644
--- a/record/log_writer_test.go
+++ b/record/log_writer_test.go
@@ -150,7 +150,6 @@ func TestSyncError(t *testing.T) {
 	injectedErr := errors.New("injected error")
 	w := NewLogWriter(syncErrorFile{f, injectedErr}, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
@@ -193,7 +192,6 @@ func (f *syncFile) Sync() error {
 func TestSyncRecord(t *testing.T) {
 	f := &syncFile{}
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
@@ -221,7 +219,6 @@ func TestSyncRecordWithSignalChan(t *testing.T) {
 		semChan <- struct{}{}
 	}
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		QueueSemChan:        semChan,
 		WriteWALSyncOffsets: func() bool { return false },
 	})
@@ -273,7 +270,6 @@ func TestMinSyncInterval(t *testing.T) {
 		WALMinSyncInterval: func() time.Duration {
 			return minSyncInterval
 		},
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
@@ -346,7 +342,6 @@ func TestMinSyncIntervalClose(t *testing.T) {
 		WALMinSyncInterval: func() time.Duration {
 			return minSyncInterval
 		},
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
@@ -398,7 +393,6 @@ func TestMetricsWithoutSync(t *testing.T) {
 	f := &syncFileWithWait{}
 	f.writeWG.Add(1)
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	offset, err := w.SyncRecord([]byte("hello"), nil, nil)
@@ -444,10 +438,9 @@ func TestMetricsWithSync(t *testing.T) {
 	})

 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     syncLatencyMicros,
 		WriteWALSyncOffsets: func() bool { return false },
-	},
-	)
+		WALFileOpHistogram:  syncLatencyMicros,
+	})
 	var wg sync.WaitGroup
 	wg.Add(100)
 	for i := 0; i < 100; i++ {
@@ -551,7 +544,6 @@ func TestQueueWALBlocks(t *testing.T) {
 		return nil
 	}))
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	const numBlocks = 1024
@@ -662,7 +654,6 @@ func TestSyncRecordGeneralized(t *testing.T) {
 	lastSync := int64(-1)
 	cbChan := make(chan struct{}, 2)
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
 			require.NoError(t, err)
 			require.Equal(t, lastSync+1, doneSync.Index)
@@ -714,7 +705,6 @@ func TestSyncRecordGeneralizedWithCloseError(t *testing.T) {
 	lastSync := int64(-1)
 	cbChan := make(chan struct{}, 2)
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
 			if doneSync.Index == 1 {
 				require.Error(t, err)
@@ -755,7 +745,6 @@ func TestSyncRecordGeneralizedWithCloseError(t *testing.T) {
 func writeWALSyncRecords(t *testing.T, numRecords int, recordSizes []int) *syncFile {
 	f := &syncFile{}
 	w := NewLogWriter(f, 1, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return true },
 	})
 	var syncErr error
@@ -864,7 +853,6 @@ func BenchmarkQueueWALBlocks(b *testing.B) {
 				return nil
 			}))
 			w := NewLogWriter(f, 0, LogWriterConfig{
-				WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 				WriteWALSyncOffsets: func() bool { return false },
 			})
@@ -897,7 +885,6 @@ func BenchmarkWriteWALBlocksAllocs(b *testing.B) {
 	b.StopTimer()
 	f := vfstest.DiscardFile
 	w := NewLogWriter(f, 0, LogWriterConfig{
-		WALFsyncLatency:           prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {},
 		WriteWALSyncOffsets:       func() bool { return false },
 	})
diff --git a/record/record_test.go b/record/record_test.go
index c870a285d1..34ebf16559 100644
--- a/record/record_test.go
+++ b/record/record_test.go
@@ -22,7 +22,6 @@ import (
 	"github.com/cockroachdb/pebble/internal/binfmt"
 	"github.com/cockroachdb/pebble/internal/crc"
 	"github.com/cockroachdb/pebble/internal/treeprinter"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 )
@@ -96,7 +95,6 @@ func testGenerator(t *testing.T, reset func(), gen func() (string, bool)) {
 	t.Run("LogWriter", func(t *testing.T) {
 		testGeneratorWriter(t, reset, gen, func(w io.Writer) recordWriter {
 			return NewLogWriter(w, 0 /* logNum */, LogWriterConfig{
-				WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 				WriteWALSyncOffsets: func() bool { return false },
 			})
 		})
@@ -642,7 +640,6 @@ func TestNoLastRecordOffset(t *testing.T) {
 func TestInvalidLogNum(t *testing.T) {
 	var buf bytes.Buffer
 	w := NewLogWriter(&buf, 1, LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	for i := 0; i < 10; i++ {
@@ -739,7 +736,6 @@ func TestRecycleLog(t *testing.T) {
 		}
 		w := NewLogWriter(limitedBuf, base.DiskFileNum(i), LogWriterConfig{
-			WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 			WriteWALSyncOffsets: func() bool { return false },
 		})
 		sizes := make([]int, 10+rnd.IntN(100))
@@ -785,7 +781,6 @@ func TestRecycleLog(t *testing.T) {
 func TestTruncatedLog(t *testing.T) {
 	backing := make([]byte, 2*blockSize)
 	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(1), LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	// Write a record that spans 2 blocks.
@@ -803,7 +798,6 @@ func TestTruncatedLog(t *testing.T) {
 func TestRecycleLogWithPartialBlock(t *testing.T) {
 	backing := make([]byte, 27)
 	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(1), LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	// Will write a chunk with 11 byte header + 5 byte payload.
@@ -813,7 +807,6 @@ func TestRecycleLogWithPartialBlock(t *testing.T) {
 	require.NoError(t, w.Close())

 	w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(2), LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	// Will write a chunk with 11 byte header + 1 byte payload.
@@ -838,7 +831,6 @@ func TestRecycleLogNumberOverflow(t *testing.T) {
 	backing := make([]byte, 27)
 	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(math.MaxUint32), LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	// Will write a chunk with 11 byte header + 5 byte payload.
@@ -848,7 +840,6 @@ func TestRecycleLogNumberOverflow(t *testing.T) {
 	require.NoError(t, w.Close())

 	w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.DiskFileNum(math.MaxUint32+1), LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	// Will write a chunk with 11 byte header + 1 byte payload.
@@ -872,7 +863,6 @@ func TestRecycleLogWithPartialRecord(t *testing.T) {
 	// Write a record that is larger than the log block size.
 	backing1 := make([]byte, 2*blockSize)
 	w := NewLogWriter(bytes.NewBuffer(backing1[:0]), base.DiskFileNum(1), LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	_, err := w.WriteRecord(bytes.Repeat([]byte("a"), recordSize))
@@ -883,7 +873,6 @@ func TestRecycleLogWithPartialRecord(t *testing.T) {
 	// the block size.
 	backing2 := make([]byte, 2*blockSize)
 	w = NewLogWriter(bytes.NewBuffer(backing2[:0]), base.DiskFileNum(2), LogWriterConfig{
-		WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		WriteWALSyncOffsets: func() bool { return false },
 	})
 	_, err = w.WriteRecord(bytes.Repeat([]byte("b"), recordSize))
@@ -1103,7 +1092,6 @@ func BenchmarkRecordWrite(b *testing.B) {
 	for _, size := range []int{8, 16, 32, 64, 256, 1028, 4096, 65_536} {
 		b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
 			w := NewLogWriter(io.Discard, 0 /* logNum */, LogWriterConfig{
-				WALFsyncLatency:     prometheus.NewHistogram(prometheus.HistogramOpts{}),
 				WriteWALSyncOffsets: func() bool { return false },
 			})
 			defer w.Close()
diff --git a/vfs/disk_health.go b/vfs/disk_health.go
index 4492e5f9b3..f4f3b8fe4c 100644
--- a/vfs/disk_health.go
+++ b/vfs/disk_health.go
@@ -466,15 +466,20 @@ func (i DiskSlowInfo) String() string {

 // SafeFormat implements redact.SafeFormatter.
 func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
+	fullPath, err := filepath.Abs(i.Path)
+	if err != nil {
+		w.Printf("failed to get absolute path %s: %s", redact.Safe(i.Path), err)
+		fullPath = i.Path
+	}
 	switch i.OpType {
 	// Operations for which i.WriteSize is meaningful.
 	case OpTypeWrite, OpTypeSyncTo, OpTypePreallocate:
 		w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
-			redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
+			redact.Safe(i.OpType.String()), redact.Safe(fullPath),
 			redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds()))
 	default:
 		w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
-			redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
+			redact.Safe(i.OpType.String()), redact.Safe(fullPath),
 			redact.Safe(i.Duration.Seconds()))
 	}
 }
diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go
index 10191617b0..3394364677 100644
--- a/vfs/disk_health_test.go
+++ b/vfs/disk_health_test.go
@@ -635,3 +635,24 @@ func TestDiskHealthChecking_Filesystem_Close(t *testing.T) {
 	}
 	wg.Wait()
 }
+
+func TestDiskSlowInfo(t *testing.T) {
+	info := DiskSlowInfo{
+		Path:      "/some/really/deep/path/to/wal/000123.log",
+		OpType:    OpTypeWrite,
+		WriteSize: 123,
+		Duration:  5 * time.Second,
+	}
+
+	result := info.String()
+
+	// The essential information should be present.
+	require.Contains(t, result, "disk slowness detected")
+	require.Contains(t, result, "write")
+	require.Contains(t, result, "/some/really/deep/path/to/wal/000123.log")
+	require.Contains(t, result, "(123 bytes)")
+	require.Contains(t, result, "5.0s")
+
+	// The message should include the full path, not just the bare filename.
+	require.NotContains(t, result, " 000123.log ")
+}
diff --git a/wal/failover_manager.go b/wal/failover_manager.go
index 36c1eb9764..e97a3b35ff 100644
--- a/wal/failover_manager.go
+++ b/wal/failover_manager.go
@@ -14,6 +14,7 @@ import (
 	"sync"
 	"time"

+	"github.com/cockroachdb/crlib/crtime"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/invariants"
@@ -506,10 +507,20 @@ func (wm *failoverManager) init(o Options, initial Logs) error {
 	var dirs [numDirIndices]dirAndFileHandle
 	for i, dir := range []Dir{o.Primary, o.Secondary} {
 		dirs[i].Dir = dir
+
+		// Measure the directory open operation.
+		openStart := crtime.NowMono()
 		f, err := dir.FS.OpenDir(dir.Dirname)
 		if err != nil {
 			return err
 		}
+		openLatency := openStart.Elapsed()
+		if dirIndex(i) == primaryDirIndex && o.PrimaryFileOpHistogram != nil {
+			o.PrimaryFileOpHistogram.Observe(float64(openLatency))
+		} else if dirIndex(i) == secondaryDirIndex && o.SecondaryFileOpHistogram != nil {
+			o.SecondaryFileOpHistogram.Observe(float64(openLatency))
+		}
+
 		dirs[i].File = f
 	}
 	fmOpts := failoverMonitorOptions{
@@ -633,7 +644,10 @@ func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
 		bytesPerSync:    wm.opts.BytesPerSync,
 		preallocateSize: wm.opts.PreallocateSize,
 		minSyncInterval: wm.opts.MinSyncInterval,
-		fsyncLatency:    wm.opts.FsyncLatency,
+		primaryDir:               wm.opts.Primary,
+		secondaryDir:             wm.opts.Secondary,
+		primaryFileOpHistogram:   wm.opts.PrimaryFileOpHistogram,
+		secondaryFileOpHistogram: wm.opts.SecondaryFileOpHistogram,
 		queueSemChan:    wm.opts.QueueSemChan,
 		stopper:         wm.stopper,
 		failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
@@ -798,7 +812,14 @@ func (wm *failoverManager) logCreator(
 		createInfo.RecycledFileNum = recycleLog.FileNum
 		recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
 		r.writeStart()
+		reuseStart := crtime.NowMono()
 		logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
+		reuseLatency := reuseStart.Elapsed()
+		if isPrimary && wm.opts.PrimaryFileOpHistogram != nil {
+			wm.opts.PrimaryFileOpHistogram.Observe(float64(reuseLatency))
+		} else if !isPrimary && wm.opts.SecondaryFileOpHistogram != nil {
+			wm.opts.SecondaryFileOpHistogram.Observe(float64(reuseLatency))
+		}
 		r.writeEnd(err)
 		// TODO(sumeer): should we fatal since primary dir? At some point it is
 		// better to fatal instead of continuing to failover.
@@ -819,7 +840,15 @@ func (wm *failoverManager) logCreator(
 			// indicating whether or not the file was actually reused would allow us
 			// to skip the stat and use recycleLog.FileSize.
 			var finfo os.FileInfo
+			// Instrument the file stat operation.
+			statStart := crtime.NowMono()
 			finfo, err = logFile.Stat()
+			statLatency := statStart.Elapsed()
+			if isPrimary && wm.opts.PrimaryFileOpHistogram != nil {
+				wm.opts.PrimaryFileOpHistogram.Observe(float64(statLatency))
+			} else if !isPrimary && wm.opts.SecondaryFileOpHistogram != nil {
+				wm.opts.SecondaryFileOpHistogram.Observe(float64(statLatency))
+			}
 			if err != nil {
 				logFile.Close()
 				return nil, 0, err
@@ -832,7 +861,14 @@ func (wm *failoverManager) logCreator(
 	//
 	// Create file.
 	r.writeStart()
+	createStart := crtime.NowMono()
 	logFile, err = dir.FS.Create(logFilename, "pebble-wal")
+	createLatency := createStart.Elapsed()
+	if isPrimary && wm.opts.PrimaryFileOpHistogram != nil {
+		wm.opts.PrimaryFileOpHistogram.Observe(float64(createLatency))
+	} else if !isPrimary && wm.opts.SecondaryFileOpHistogram != nil {
+		wm.opts.SecondaryFileOpHistogram.Observe(float64(createLatency))
+	}
 	r.writeEnd(err)
 	return logFile, 0, err
 }
diff --git a/wal/failover_manager_test.go b/wal/failover_manager_test.go
index af5b5ec8c3..cd3b00aa9b 100644
--- a/wal/failover_manager_test.go
+++ b/wal/failover_manager_test.go
@@ -350,7 +350,6 @@ func TestManagerFailover(t *testing.T) {
 				BytesPerSync:    0,
 				PreallocateSize: func() int { return 0 },
 				MinSyncInterval: nil,
-				FsyncLatency:    nil,
 				QueueSemChan:    nil,
 				Logger:          nil,
 				EventListener:   nil,
diff --git a/wal/failover_writer.go b/wal/failover_writer.go
index b611bf398a..1c0563141d 100644
--- a/wal/failover_writer.go
+++ b/wal/failover_writer.go
@@ -455,9 +455,15 @@ type failoverWriterOpts struct {
 	// Options for record.LogWriter.
 	minSyncInterval func() time.Duration
-	fsyncLatency    prometheus.Histogram
-	queueSemChan    chan struct{}
-	stopper         *stopper
+
+	// Directories and per-directory WAL file operation latency histograms.
+	primaryDir               Dir
+	secondaryDir             Dir
+	primaryFileOpHistogram   record.WALFileOpHistogram
+	secondaryFileOpHistogram record.WALFileOpHistogram
+
+	queueSemChan chan struct{}
+	stopper      *stopper
 	failoverWriteAndSyncLatency prometheus.Histogram
 	// writerClosed is a callback invoked by the FailoverWriter when it's
@@ -648,10 +654,22 @@ func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
 	// map to a single NumWAL, a file used for NumWAL n at index m will
 	// never get recycled for NumWAL n at a later index (since recycling
 	// happens when n as a whole is obsolete).
+
+	// Determine which directory we are writing to and select the matching histogram.
+	var histogram record.WALFileOpHistogram
+	switch dir.Dir {
+	case ww.opts.primaryDir:
+		histogram = ww.opts.primaryFileOpHistogram
+	case ww.opts.secondaryDir:
+		histogram = ww.opts.secondaryFileOpHistogram
+	default:
+		histogram = nil
+	}
+
 	w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn), record.LogWriterConfig{
 		WALMinSyncInterval: ww.opts.minSyncInterval,
-		WALFsyncLatency:    ww.opts.fsyncLatency,
+		WALFileOpHistogram: histogram,
 		QueueSemChan:       ww.opts.queueSemChan,
 		ExternalSyncQueueCallback: ww.doneSyncCallback,
 		WriteWALSyncOffsets:       ww.opts.writeWALSyncOffsets,
diff --git a/wal/standalone_manager.go b/wal/standalone_manager.go
index 7396550db4..c2dc4cd06b 100644
--- a/wal/standalone_manager.go
+++ b/wal/standalone_manager.go
@@ -9,6 +9,7 @@ import (
 	"slices"
 	"sync"

+	"github.com/cockroachdb/crlib/crtime"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/vfs"
@@ -53,9 +54,15 @@ func (m *StandaloneManager) init(o Options, initial Logs) error {
 	}
 	var err error
 	var walDir vfs.File
+
+	// Measure the directory open operation.
+	openStart := crtime.NowMono()
 	if walDir, err = o.Primary.FS.OpenDir(o.Primary.Dirname); err != nil {
 		return err
 	}
+	if openLatency := openStart.Elapsed(); o.PrimaryFileOpHistogram != nil {
+		o.PrimaryFileOpHistogram.Observe(float64(openLatency))
+	}
 	*m = StandaloneManager{
 		o:      o,
 		walDir: walDir,
@@ -147,10 +154,20 @@ func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
 	recycleLog, recycleOK = m.recycler.Peek()
 	if recycleOK {
 		recycleLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
+		// Measure the file reuse operation.
+		reuseStart := crtime.NowMono()
 		newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName, "pebble-wal")
+		if reuseLatency := reuseStart.Elapsed(); m.o.PrimaryFileOpHistogram != nil {
+			m.o.PrimaryFileOpHistogram.Observe(float64(reuseLatency))
+		}
 		base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
 	} else {
+		// Measure the file creation operation.
+		createStart := crtime.NowMono()
 		newLogFile, err = m.o.Primary.FS.Create(newLogName, "pebble-wal")
+		if createLatency := createStart.Elapsed(); m.o.PrimaryFileOpHistogram != nil {
+			m.o.PrimaryFileOpHistogram.Observe(float64(createLatency))
+		}
 		base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
 	}
 	createInfo := CreateInfo{
@@ -163,7 +180,9 @@ func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
 	}
 	defer func() {
 		createInfo.Err = err
-		m.o.EventListener.LogCreated(createInfo)
+		if m.o.EventListener != nil {
+			m.o.EventListener.LogCreated(createInfo)
+		}
 	}()

 	if err != nil {
@@ -180,7 +199,12 @@ func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
 		// reused would allow us to skip the stat and use
 		// recycleLog.FileSize.
 		var finfo os.FileInfo
+		// Measure the file stat operation.
+		statStart := crtime.NowMono()
 		finfo, err = newLogFile.Stat()
+		if statLatency := statStart.Elapsed(); m.o.PrimaryFileOpHistogram != nil {
+			m.o.PrimaryFileOpHistogram.Observe(float64(statLatency))
+		}
 		if err == nil {
 			newLogSize = uint64(finfo.Size())
 		}
@@ -201,8 +225,8 @@ func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
 		PreallocateSize: m.o.PreallocateSize(),
 	})
 	w := record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
-		WALFsyncLatency:     m.o.FsyncLatency,
 		WALMinSyncInterval:  m.o.MinSyncInterval,
+		WALFileOpHistogram:  m.o.PrimaryFileOpHistogram,
 		QueueSemChan:        m.o.QueueSemChan,
 		WriteWALSyncOffsets: m.o.WriteWALSyncOffsets,
 	})
diff --git a/wal/wal.go b/wal/wal.go
index acdc232fa5..2cf3047de0 100644
--- a/wal/wal.go
+++ b/wal/wal.go
@@ -123,11 +123,10 @@ type Options struct {
 	// MinSyncInterval is documented in Options.WALMinSyncInterval.
 	MinSyncInterval func() time.Duration
-	// FsyncLatency records fsync latency. This doesn't differentiate between
-	// fsyncs on the primary and secondary dir.
-	//
-	// TODO(sumeer): consider separating out into two histograms.
-	FsyncLatency prometheus.Histogram
+
+	// Per-directory WAL file operation latency histograms (create, write, fsync, close, stat, opendir).
+	PrimaryFileOpHistogram   record.WALFileOpHistogram
+	SecondaryFileOpHistogram record.WALFileOpHistogram
 	// QueueSemChan is the channel to pop from when popping from queued records
 	// that have requested a sync. It's original purpose was to function as a
 	// semaphore that prevents the record.LogWriter.flusher.syncQueue from
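Usage sketch (editor's note, not part of the patch): with this change, a caller that previously read Metrics.LogWriter.FsyncLatency would read the per-directory histograms from Metrics.WALMetrics instead. The snippet below assumes the field names introduced above and scrapes a histogram through the standard prometheus Metric.Write API; the database directory and error handling are illustrative only.

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Open an in-memory store; WAL failover is not required for the primary
	// histogram to be populated.
	db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	m := db.Metrics()
	// PrimaryFileOpLatency aggregates create/write/fsync/close/stat/opendir
	// latencies for the primary WAL directory. SecondaryFileOpLatency is nil
	// unless WALFailover is configured, so check it before scraping.
	var sample dto.Metric
	if err := m.WALMetrics.PrimaryFileOpLatency.Write(&sample); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("primary WAL file ops observed: %d\n", sample.GetHistogram().GetSampleCount())
}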