Skip to content

Commit 1832dbd

Browse files
committed
logstore: make the caller responsible for metrics
Epic: none Release note: none
1 parent 345c50c commit 1832dbd

File tree

5 files changed

+20
-51
lines changed

5 files changed

+20
-51
lines changed

pkg/kv/kvserver/client_manual_proposal_test.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"path/filepath"
1313
"sync"
1414
"testing"
15-
"time"
1615

1716
"github.com/cockroachdb/cockroach/pkg/base"
1817
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -35,7 +34,6 @@ import (
3534
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
3635
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3736
"github.com/cockroachdb/cockroach/pkg/util/log"
38-
"github.com/cockroachdb/cockroach/pkg/util/metric"
3937
"github.com/cockroachdb/cockroach/pkg/util/stop"
4038
"github.com/stretchr/testify/require"
4139
)
@@ -215,9 +213,6 @@ LIMIT
215213
Responses: []raftpb.Message{{}}, // need >0 responses so StoreEntries will sync
216214
}
217215

218-
fakeMeta := metric.Metadata{
219-
Name: "fake.meta",
220-
}
221216
swl := logstore.NewSyncWaiterLoop()
222217
stopper := stop.NewStopper()
223218
defer stopper.Stop(ctx)
@@ -230,14 +225,6 @@ LIMIT
230225
SyncWaiter: swl,
231226
EntryCache: raftentry.NewCache(1024),
232227
Settings: st,
233-
Metrics: logstore.Metrics{
234-
RaftLogCommitLatency: metric.NewHistogram(metric.HistogramOptions{
235-
Mode: metric.HistogramModePrometheus,
236-
Metadata: fakeMeta,
237-
Duration: time.Millisecond,
238-
BucketConfig: metric.IOLatencyBuckets,
239-
}),
240-
},
241228
}
242229

243230
wg := &sync.WaitGroup{}
@@ -257,8 +244,6 @@ LIMIT
257244

258245
type wgSyncCallback sync.WaitGroup
259246

260-
func (w *wgSyncCallback) OnLogSync(
261-
context.Context, raft.StorageAppendAck, storage.BatchCommitStats,
262-
) {
247+
func (w *wgSyncCallback) OnLogSync(context.Context, raft.StorageAppendAck, logstore.WriteStats) {
263248
(*sync.WaitGroup)(w).Done()
264249
}

pkg/kv/kvserver/logstore/logstore.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"math/rand"
1313
"slices"
1414
"sync"
15+
"time"
1516

1617
"github.com/cockroachdb/cockroach/pkg/keys"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -31,7 +32,6 @@ import (
3132
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3233
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
3334
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
34-
"github.com/cockroachdb/cockroach/pkg/util/metric"
3535
"github.com/cockroachdb/crlib/crtime"
3636
"github.com/cockroachdb/errors"
3737
)
@@ -119,9 +119,10 @@ type AppendStats struct {
119119
NonBlocking bool
120120
}
121121

122-
// Metrics contains metrics specific to the log storage.
123-
type Metrics struct {
124-
RaftLogCommitLatency metric.IHistogram
122+
// WriteStats contains stats about a write to raft storage.
123+
type WriteStats struct {
124+
CommitDur time.Duration
125+
storage.BatchCommitStats
125126
}
126127

127128
// LogStore is a stub of a separated Raft log storage.
@@ -133,7 +134,6 @@ type LogStore struct {
133134
SyncWaiter *SyncWaiterLoop
134135
EntryCache *raftentry.Cache
135136
Settings *cluster.Settings
136-
Metrics Metrics
137137

138138
DisableSyncLogWriteToss bool // for testing only
139139
}
@@ -146,7 +146,7 @@ type LogStore struct {
146146
//
147147
// commitStats is populated iff this was a non-blocking sync.
148148
type SyncCallback interface {
149-
OnLogSync(context.Context, raft.StorageAppendAck, storage.BatchCommitStats)
149+
OnLogSync(context.Context, raft.StorageAppendAck, WriteStats)
150150
}
151151

152152
func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
@@ -273,7 +273,6 @@ func (s *LogStore) storeEntriesAndCommitBatch(
273273
cb: cb,
274274
onDone: m.Ack(),
275275
batch: batch,
276-
metrics: s.Metrics,
277276
logCommitBegin: stats.PebbleBegin,
278277
}
279278
s.SyncWaiter.enqueue(ctx, batch, waiterCallback)
@@ -287,9 +286,8 @@ func (s *LogStore) storeEntriesAndCommitBatch(
287286
stats.PebbleEnd = crtime.NowMono()
288287
stats.PebbleCommitStats = batch.CommitStats()
289288
if wantsSync {
290-
logCommitEnd := stats.PebbleEnd
291-
s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
292-
cb.OnLogSync(ctx, m.Ack(), storage.BatchCommitStats{})
289+
commitDur := stats.PebbleEnd.Sub(stats.PebbleBegin)
290+
cb.OnLogSync(ctx, m.Ack(), WriteStats{CommitDur: commitDur})
293291
}
294292
}
295293
stats.Sync = wantsSync
@@ -344,17 +342,16 @@ type nonBlockingSyncWaiterCallback struct {
344342
onDone raft.StorageAppendAck
345343
// Used to extract stats. This is the batch that has been synced.
346344
batch storage.WriteBatch
347-
// Used to record Metrics.
348-
metrics Metrics
345+
// Used to measure raft storage write/sync latency.
349346
logCommitBegin crtime.Mono
350347
}
351348

352349
// run is the callback's logic. It is executed on the SyncWaiterLoop goroutine.
353350
func (cb *nonBlockingSyncWaiterCallback) run() {
354-
dur := cb.logCommitBegin.Elapsed().Nanoseconds()
355-
cb.metrics.RaftLogCommitLatency.RecordValue(dur)
356-
commitStats := cb.batch.CommitStats()
357-
cb.cb.OnLogSync(cb.ctx, cb.onDone, commitStats)
351+
cb.cb.OnLogSync(cb.ctx, cb.onDone, WriteStats{
352+
CommitDur: cb.logCommitBegin.Elapsed(),
353+
BatchCommitStats: cb.batch.CommitStats(),
354+
})
358355
cb.release()
359356
}
360357

pkg/kv/kvserver/logstore/logstore_bench_test.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"fmt"
1111
"math/rand"
1212
"testing"
13-
"time"
1413

1514
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
1615
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
@@ -20,7 +19,6 @@ import (
2019
"github.com/cockroachdb/cockroach/pkg/storage"
2120
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
2221
"github.com/cockroachdb/cockroach/pkg/util/log"
23-
"github.com/cockroachdb/cockroach/pkg/util/metric"
2422
"github.com/stretchr/testify/require"
2523
)
2624

@@ -34,9 +32,7 @@ func (b *discardBatch) Commit(bool) error {
3432

3533
type noopSyncCallback struct{}
3634

37-
func (noopSyncCallback) OnLogSync(
38-
context.Context, raft.StorageAppendAck, storage.BatchCommitStats,
39-
) {
35+
func (noopSyncCallback) OnLogSync(context.Context, raft.StorageAppendAck, WriteStats) {
4036
}
4137

4238
func BenchmarkLogStore_StoreEntries(b *testing.B) {
@@ -65,14 +61,6 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
6561
StateLoader: NewStateLoader(rangeID),
6662
EntryCache: ec,
6763
Settings: st,
68-
Metrics: Metrics{
69-
RaftLogCommitLatency: metric.NewHistogram(metric.HistogramOptions{
70-
Mode: metric.HistogramModePrometheus,
71-
Metadata: metric.Metadata{},
72-
Duration: 10 * time.Second,
73-
BucketConfig: metric.IOLatencyBuckets,
74-
}),
75-
},
7664
}
7765

7866
rs := RaftState{

pkg/kv/kvserver/replica_init.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,6 @@ func newUninitializedReplicaWithoutRaftGroup(
224224
SyncWaiter: store.syncWaiters[int(rangeID)%len(store.syncWaiters)],
225225
EntryCache: store.raftEntryCache,
226226
Settings: store.cfg.Settings,
227-
Metrics: logstore.Metrics{
228-
RaftLogCommitLatency: store.metrics.RaftLogCommitLatency,
229-
},
230227
DisableSyncLogWriteToss: buildutil.CrdbTestBuild &&
231228
store.TestingKnobs().DisableSyncLogWriteToss,
232229
}

pkg/kv/kvserver/replica_raft.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1800,7 +1800,7 @@ func (r *Replica) maybeCoalesceHeartbeat(
18001800
type replicaSyncCallback Replica
18011801

18021802
func (r *replicaSyncCallback) OnLogSync(
1803-
ctx context.Context, ack raft.StorageAppendAck, commitStats storage.BatchCommitStats,
1803+
ctx context.Context, ack raft.StorageAppendAck, stats logstore.WriteStats,
18041804
) {
18051805
repl := (*Replica)(r)
18061806
// The log mark is non-empty only if this was a non-empty log append that
@@ -1814,8 +1814,10 @@ func (r *replicaSyncCallback) OnLogSync(
18141814
}
18151815
// Send MsgStorageAppend's responses.
18161816
repl.sendStorageAck(ctx, ack, false /* willDeliver */)
1817-
if commitStats.TotalDuration > defaultReplicaRaftMuWarnThreshold {
1818-
log.Infof(repl.raftCtx, "slow non-blocking raft commit: %s", commitStats)
1817+
1818+
r.store.metrics.RaftLogCommitLatency.RecordValue(stats.CommitDur.Nanoseconds())
1819+
if stats.TotalDuration > defaultReplicaRaftMuWarnThreshold {
1820+
log.Infof(repl.raftCtx, "slow non-blocking raft commit: %s", stats.BatchCommitStats)
18191821
}
18201822
}
18211823

0 commit comments

Comments
 (0)