3 changes: 0 additions & 3 deletions commit_test.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/vfs"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

@@ -253,7 +252,6 @@ func TestCommitPipelineWALClose(t *testing.T) {
}
p := newCommitPipeline(testEnv)
wal = record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
QueueSemChan: p.logSyncQSem,
WriteWALSyncOffsets: func() bool { return false },
})
@@ -395,7 +393,6 @@ func BenchmarkCommitPipeline(b *testing.B) {
p := newCommitPipeline(nullCommitEnv)
wal = record.NewLogWriter(io.Discard, 0, /* logNum */
record.LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
QueueSemChan: p.logSyncQSem,
WriteWALSyncOffsets: func() bool { return false },
})
15 changes: 4 additions & 11 deletions db.go
@@ -33,14 +33,12 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/blob"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/vfs/atomicfs"
"github.com/cockroachdb/pebble/wal"
"github.com/cockroachdb/tokenbucket"
"github.com/prometheus/client_golang/prometheus"
)

const (
@@ -376,13 +374,7 @@ type DB struct {
// commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
// (i.e. makeRoomForWrite). Can be nil.
writer wal.Writer
metrics struct {
// fsyncLatency has its own internal synchronization, and is not
// protected by mu.
fsyncLatency prometheus.Histogram
// Updated whenever a wal.Writer is closed.
record.LogWriterMetrics
}
metrics WALMetrics
}

mem struct {
@@ -1972,8 +1964,9 @@ func (d *DB) Metrics() *Metrics {
metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
metrics.BlobFiles.ReferencedBackingValueSize = blobStats.ReferencedBackingValueSize

metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
metrics.WALMetrics.PrimaryFileOpLatency = d.mu.log.metrics.PrimaryFileOpLatency
metrics.WALMetrics.SecondaryFileOpLatency = d.mu.log.metrics.SecondaryFileOpLatency
if err := metrics.WALMetrics.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
d.opts.Logger.Errorf("metrics error: %s", err)
}
metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
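The Metrics() change above copies the per-directory histograms into the exported WALMetrics field. Below is a minimal caller-side sketch of reading them back out, assuming the field names introduced by this PR and a placeholder database path; the dto snapshot is the standard prometheus client_model way to inspect a histogram.

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Placeholder path; any writable directory works for this sketch.
	db, err := pebble.Open("demo-db", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	m := db.Metrics()
	// PrimaryFileOpLatency is a prometheus.Histogram; Write extracts a
	// protobuf snapshot with the observed sample count and sum (in ns).
	if h := m.WALMetrics.PrimaryFileOpLatency; h != nil {
		var pb dto.Metric
		if err := h.Write(&pb); err == nil {
			fmt.Printf("primary WAL file ops: %d samples, %.0f ns total\n",
				pb.GetHistogram().GetSampleCount(), pb.GetHistogram().GetSampleSum())
		}
	}
}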
27 changes: 23 additions & 4 deletions metrics.go
@@ -486,10 +486,7 @@ type Metrics struct {
// deletion. These can be relevant if free disk space is unexplainably low.
DeletePacer deletepacer.Metrics

LogWriter struct {
FsyncLatency prometheus.Histogram
record.LogWriterMetrics
}
WALMetrics WALMetrics

CategoryStats []block.CategoryStatsAggregate

@@ -544,6 +541,28 @@ func (cm *CompressionMetrics) MergeWith(o *CompressionMetrics) {
cm.Zstd.Add(o.Zstd)
}

// DirectoryContext identifies which WAL directory an operation targets.
type DirectoryContext int

const (
// DirectoryPrimary indicates the operation targets the primary WAL directory.
DirectoryPrimary DirectoryContext = iota
// DirectorySecondary indicates the operation targets the secondary WAL directory.
DirectorySecondary
// DirectoryUnknown indicates the directory context is unknown or not applicable.
DirectoryUnknown
)

// WALMetrics contains directory-specific latency histograms for all WAL operations.
type WALMetrics struct {
// PrimaryFileOpLatency tracks all file operations for the primary directory.
PrimaryFileOpLatency prometheus.Histogram
// SecondaryFileOpLatency tracks all file operations for the secondary directory.
SecondaryFileOpLatency prometheus.Histogram
// Updated whenever a wal.Writer is closed.
record.LogWriterMetrics
}

var (
// FsyncLatencyBuckets are prometheus histogram buckets suitable for a histogram
// that records latencies for fsyncs.
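The new DirectoryContext and WALMetrics types do not come with a selector in this diff; the sketch below is a hypothetical helper (the histogramFor name and the local mirror types are assumptions, declared so the snippet compiles on its own) showing how a call site could pick the histogram matching a directory context, returning nil for DirectoryUnknown in line with the nil checks the LogWriter performs before Observe.

package walmetricssketch

import "github.com/prometheus/client_golang/prometheus"

// Local mirrors of the types added in metrics.go by this PR.
type DirectoryContext int

const (
	DirectoryPrimary DirectoryContext = iota
	DirectorySecondary
	DirectoryUnknown
)

type WALMetrics struct {
	PrimaryFileOpLatency   prometheus.Histogram
	SecondaryFileOpLatency prometheus.Histogram
}

// histogramFor picks the per-directory histogram for dir; nil means "do not record".
func (m *WALMetrics) histogramFor(dir DirectoryContext) prometheus.Histogram {
	switch dir {
	case DirectoryPrimary:
		return m.PrimaryFileOpLatency
	case DirectorySecondary:
		return m.SecondaryFileOpLatency
	default:
		return nil
	}
}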
27 changes: 21 additions & 6 deletions open.go
@@ -289,9 +289,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
d.mu.mem.queue = append(d.mu.mem.queue, entry)
}

d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
Buckets: FsyncLatencyBuckets,
})
// Initialize WAL metrics structure (histograms populated below)
d.mu.log.metrics = WALMetrics{}

walOpts := wal.Options{
Primary: dirs.WALPrimary,
@@ -302,17 +301,33 @@
BytesPerSync: opts.WALBytesPerSync,
PreallocateSize: d.walPreallocateSize,
MinSyncInterval: opts.WALMinSyncInterval,
FsyncLatency: d.mu.log.metrics.fsyncLatency,
QueueSemChan: d.commit.logSyncQSem,
Logger: opts.Logger,
EventListener: walEventListenerAdaptor{l: opts.EventListener},
WriteWALSyncOffsets: func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
}

// Create and assign WAL file operation histograms
walPrimaryFileOpHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Buckets: FsyncLatencyBuckets,
})
d.mu.log.metrics.PrimaryFileOpLatency = walPrimaryFileOpHistogram
walOpts.PrimaryFileOpHistogram = walPrimaryFileOpHistogram

// Configure failover-specific histograms and options
if !opts.ReadOnly && opts.WALFailover != nil {
walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
walSecondaryFileOpHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Buckets: FsyncLatencyBuckets,
})
d.mu.log.metrics.SecondaryFileOpLatency = walSecondaryFileOpHistogram
walOpts.SecondaryFileOpHistogram = walSecondaryFileOpHistogram

walFailoverWriteAndSyncHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Buckets: FsyncLatencyBuckets,
})
walOpts.FailoverWriteAndSyncLatency = walFailoverWriteAndSyncHistogram

walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
}
walDirs := walOpts.Dirs()
var recoveryDirLocks base.DirLockSet
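In Open above, the secondary histogram is only created when WAL failover is configured. The following usage sketch assumes the WALFailoverOptions and wal.Dir fields shown here and uses placeholder directory names: with WALFailover set, both per-directory histograms get wired up; without it, only the primary one is populated.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/wal"
)

func main() {
	opts := &pebble.Options{
		FS: vfs.Default,
		// Configuring failover causes Open to also create the secondary
		// WAL directory histogram.
		WALFailover: &pebble.WALFailoverOptions{
			Secondary: wal.Dir{FS: vfs.Default, Dirname: "/mnt/wal-failover"},
		},
	}
	db, err := pebble.Open("/mnt/data/pebble", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}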
43 changes: 34 additions & 9 deletions record/log_writer.go
@@ -455,8 +455,10 @@ type LogWriter struct {
err error
// minSyncInterval is the minimum duration between syncs.
minSyncInterval durationFunc
fsyncLatency prometheus.Histogram
pending []*block

// WAL file operation latency histogram
walFileOpHistogram WALFileOpHistogram
pending []*block
// Pushing and popping from pendingSyncs does not require flusher mutex to
// be held.
pendingSyncs pendingSyncs
@@ -486,10 +488,17 @@ type LogWriter struct {
emitFragment func(n int, p []byte) (remainingP []byte)
}

// WALFileOpHistogram is a prometheus histogram for tracking WAL file operation latencies
// (create, write, fsync, close, stat, opendir).
type WALFileOpHistogram = prometheus.Histogram

// LogWriterConfig is a struct used for configuring new LogWriters
type LogWriterConfig struct {
WALMinSyncInterval durationFunc
WALFsyncLatency prometheus.Histogram

// WALFileOpHistogram is the latency histogram for WAL file operations in this directory.
WALFileOpHistogram WALFileOpHistogram

// QueueSemChan is an optional channel to pop from when popping from
// LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
// the syncQueue from overflowing (which will cause a panic). All production
@@ -581,7 +590,7 @@ func NewLogWriter(

f := &r.flusher
f.minSyncInterval = logWriterConfig.WALMinSyncInterval
f.fsyncLatency = logWriterConfig.WALFsyncLatency
f.walFileOpHistogram = logWriterConfig.WALFileOpHistogram

go func() {
pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
@@ -725,9 +734,9 @@ func (w *LogWriter) flushLoop(context.Context) {
writtenOffset += uint64(len(data))
synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
f.Lock()
if synced && f.fsyncLatency != nil {
if synced && f.walFileOpHistogram != nil {
w.syncedOffset.Store(writtenOffset)
f.fsyncLatency.Observe(float64(syncLatency))
f.walFileOpHistogram.Observe(float64(syncLatency))
}
f.err = err
if f.err != nil {
@@ -784,7 +793,12 @@ func (w *LogWriter) flushPending(
}
if n := len(data); err == nil && n > 0 {
bytesWritten += int64(n)
// Measure write latency
writeStart := crtime.NowMono()
_, err = w.w.Write(data)
if writeLatency := writeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
w.flusher.walFileOpHistogram.Observe(float64(writeLatency))
}
}

synced = !snap.empty()
Expand All @@ -811,7 +825,13 @@ func (w *LogWriter) syncWithLatency() (time.Duration, error) {
}

func (w *LogWriter) flushBlock(b *block) error {
if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
// Measure write latency for block flush
writeStart := crtime.NowMono()
_, err := w.w.Write(b.buf[b.flushed:])
if writeLatency := writeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
w.flusher.walFileOpHistogram.Observe(float64(writeLatency))
}
if err != nil {
return err
}
b.written.Store(0)
@@ -885,8 +905,8 @@ func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
syncLatency, err = w.syncWithLatency()
}
f.Lock()
if err == nil && f.fsyncLatency != nil {
f.fsyncLatency.Observe(float64(syncLatency))
if err == nil && f.walFileOpHistogram != nil {
f.walFileOpHistogram.Observe(float64(syncLatency))
}
free := w.free.blocks
f.Unlock()
Expand All @@ -897,7 +917,12 @@ func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
}
if w.c != nil {
// Measure close latency
closeStart := crtime.NowMono()
cerr := w.c.Close()
if closeLatency := closeStart.Elapsed(); w.flusher.walFileOpHistogram != nil {
w.flusher.walFileOpHistogram.Observe(float64(closeLatency))
}
w.c = nil
err = firstError(err, cerr)
}
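The write and close paths above all follow one pattern: take a crtime.NowMono() timestamp, run the file operation, and observe the elapsed duration into the optional histogram. A minimal standalone sketch of that pattern follows; the observeOp helper is an assumption for illustration, and durations are recorded in nanoseconds, matching FsyncLatencyBuckets.

package walsketch

import (
	"github.com/cockroachdb/crlib/crtime"
	"github.com/prometheus/client_golang/prometheus"
)

// observeOp times op and, if a histogram is configured, records the latency
// in nanoseconds. A nil histogram simply skips recording, mirroring the nil
// checks in LogWriter.
func observeOp(h prometheus.Histogram, op func() error) error {
	start := crtime.NowMono()
	err := op()
	if latency := start.Elapsed(); h != nil {
		h.Observe(float64(latency))
	}
	return err
}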
17 changes: 2 additions & 15 deletions record/log_writer_test.go
@@ -150,7 +150,6 @@ func TestSyncError(t *testing.T) {

injectedErr := errors.New("injected error")
w := NewLogWriter(syncErrorFile{f, injectedErr}, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
WriteWALSyncOffsets: func() bool { return false },
})

@@ -193,7 +192,6 @@ func (f *syncFile) Sync() error {
func TestSyncRecord(t *testing.T) {
f := &syncFile{}
w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
WriteWALSyncOffsets: func() bool { return false },
})

@@ -221,7 +219,6 @@ func TestSyncRecordWithSignalChan(t *testing.T) {
semChan <- struct{}{}
}
w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
QueueSemChan: semChan,
WriteWALSyncOffsets: func() bool { return false },
})
@@ -273,7 +270,6 @@ func TestMinSyncInterval(t *testing.T) {
WALMinSyncInterval: func() time.Duration {
return minSyncInterval
},
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
WriteWALSyncOffsets: func() bool { return false },
})

@@ -346,7 +342,6 @@ func TestMinSyncIntervalClose(t *testing.T) {
WALMinSyncInterval: func() time.Duration {
return minSyncInterval
},
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
WriteWALSyncOffsets: func() bool { return false },
})

@@ -398,7 +393,6 @@ func TestMetricsWithoutSync(t *testing.T) {
f := &syncFileWithWait{}
f.writeWG.Add(1)
w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
WriteWALSyncOffsets: func() bool { return false },
})
offset, err := w.SyncRecord([]byte("hello"), nil, nil)
@@ -444,10 +438,9 @@ func TestMetricsWithSync(t *testing.T) {
})

w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: syncLatencyMicros,
WriteWALSyncOffsets: func() bool { return false },
},
)
WALFileOpHistogram: syncLatencyMicros,
})
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
@@ -551,7 +544,6 @@ func TestQueueWALBlocks(t *testing.T) {
return nil
}))
w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
WriteWALSyncOffsets: func() bool { return false },
})
const numBlocks = 1024
@@ -662,7 +654,6 @@ func TestSyncRecordGeneralized(t *testing.T) {
lastSync := int64(-1)
cbChan := make(chan struct{}, 2)
w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
require.NoError(t, err)
require.Equal(t, lastSync+1, doneSync.Index)
@@ -714,7 +705,6 @@ func TestSyncRecordGeneralizedWithCloseError(t *testing.T) {
lastSync := int64(-1)
cbChan := make(chan struct{}, 2)
w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
if doneSync.Index == 1 {
require.Error(t, err)
@@ -755,7 +745,6 @@ func TestSyncRecordGeneralizedWithCloseError(t *testing.T) {
func writeWALSyncRecords(t *testing.T, numRecords int, recordSizes []int) *syncFile {
f := &syncFile{}
w := NewLogWriter(f, 1, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
WriteWALSyncOffsets: func() bool { return true },
})
var syncErr error
@@ -864,7 +853,6 @@ func BenchmarkQueueWALBlocks(b *testing.B) {
return nil
}))
w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
WriteWALSyncOffsets: func() bool { return false },
})

@@ -897,7 +885,6 @@ func BenchmarkWriteWALBlocksAllocs(b *testing.B) {
b.StopTimer()
f := vfstest.DiscardFile
w := NewLogWriter(f, 0, LogWriterConfig{
WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {},
WriteWALSyncOffsets: func() bool { return false },
})
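With WALFsyncLatency gone from LogWriterConfig, tests either omit the histogram entirely (it is optional and nil-checked) or pass it through the new WALFileOpHistogram field, as TestMetricsWithSync now does. A construction sketch against the config surface introduced by this PR, with io.Discard standing in for a real WAL file:

package recordsketch

import (
	"io"

	"github.com/cockroachdb/pebble/record"
	"github.com/prometheus/client_golang/prometheus"
)

func newDiscardLogWriter() *record.LogWriter {
	// One histogram now covers write, fsync, and close latencies for this directory.
	hist := prometheus.NewHistogram(prometheus.HistogramOpts{})
	return record.NewLogWriter(io.Discard, 0 /* logNum */, record.LogWriterConfig{
		WALFileOpHistogram:  hist,
		WriteWALSyncOffsets: func() bool { return false },
	})
}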