Skip to content

Commit bdc58fb

Browse files
committed
kvclient: add some metrics for buffered writes
Gets the ball rolling on some metrics. Epic: none Release note: None
1 parent 4a5a266 commit bdc58fb

File tree

6 files changed

+95
-20
lines changed

6 files changed

+95
-20
lines changed

docs/generated/metrics/metrics.html

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1949,8 +1949,12 @@
19491949
<tr><td>APPLICATION</td><td>txn.restarts.txnpush</td><td>Number of restarts due to a transaction push failure</td><td>Restarted Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
19501950
<tr><td>APPLICATION</td><td>txn.restarts.unknown</td><td>Number of restarts due to a unknown reasons</td><td>Restarted Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
19511951
<tr><td>APPLICATION</td><td>txn.restarts.writetooold</td><td>Number of restarts due to a concurrent writer committing first</td><td>Restarted Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1952-
<tr><td>APPLICATION</td><td>txn.rollbacks.async.failed</td><td>Number of KV transaction that failed to send abort asynchronously which is not always retried</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1953-
<tr><td>APPLICATION</td><td>txn.rollbacks.failed</td><td>Number of KV transaction that failed to send final abort</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1952+
<tr><td>APPLICATION</td><td>txn.rollbacks.async.failed</td><td>Number of KV transactions that failed to send abort asynchronously which is not always retried</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1953+
<tr><td>APPLICATION</td><td>txn.rollbacks.failed</td><td>Number of KV transactions that failed to send final abort</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1954+
<tr><td>APPLICATION</td><td>txn.write_buffering.batches.fully_handled</td><td>Number of KV batches that were fully handled by the write buffer (not sent to KV)</td><td>KV Batches</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1955+
<tr><td>APPLICATION</td><td>txn.write_buffering.disabled_after_buffering</td><td>Number of KV transactions that disabled write buffering after buffering some writes but before an EndTxn request</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1956+
<tr><td>APPLICATION</td><td>txn.write_buffering.memory_limit_exceeded</td><td>Number of KV transactions that exceeded the write buffering memory limit</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1957+
<tr><td>APPLICATION</td><td>txn.write_buffering.num_enabled</td><td>Number of KV transactions that enabled buffered writes</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
19541958
<tr><td>SERVER</td><td>build.timestamp</td><td>Build information</td><td>Build Time</td><td>GAUGE</td><td>TIMESTAMP_SEC</td><td>AVG</td><td>NONE</td></tr>
19551959
<tr><td>SERVER</td><td>go.scheduler_latency</td><td>Go scheduling latency</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
19561960
<tr><td>SERVER</td><td>log.buffered.messages.dropped</td><td>Count of log messages that are dropped by buffered log sinks. When CRDB attempts to buffer a log message in a buffered log sink whose buffer is already full, it drops the oldest buffered messages to make space for the new message</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,10 @@ func (tc *TxnCoordSender) initCommonInterceptors(
321321
if ds, ok := tcf.wrapped.(*DistSender); ok {
322322
riGen.ds = ds
323323
}
324-
tc.interceptorAlloc.txnWriteBuffer.st = tcf.st
324+
tc.interceptorAlloc.txnWriteBuffer = txnWriteBuffer{
325+
st: tcf.st,
326+
txnMetrics: &tc.metrics,
327+
}
325328
tc.interceptorAlloc.txnPipeliner = txnPipeliner{
326329
st: tcf.st,
327330
riGen: riGen,

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ type txnWriteBuffer struct {
176176

177177
bufferSeek bufferedWrite // re-use while seeking
178178

179-
wrapped lockedSender
179+
wrapped lockedSender
180+
txnMetrics *TxnMetrics
180181

181182
// testingOverrideCPutEvalFn is used to mock the evaluation function for
182183
// conditional puts. Intended only for tests.
@@ -189,6 +190,9 @@ func (twb *txnWriteBuffer) setEnabled(enabled bool) {
189190
// to ensure to flush the buffer.
190191
twb.flushOnNextBatch = true
191192
}
193+
if enabled {
194+
twb.txnMetrics.TxnWriteBufferEnabled.Inc(1)
195+
}
192196
twb.enabled = enabled
193197
}
194198

@@ -249,7 +253,7 @@ func (twb *txnWriteBuffer) SendLocked(
249253
// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
250254
// any buffer limiting.
251255
if maxSize != 0 && bufSize > maxSize {
252-
// TODO(arul): add some metrics for this case.
256+
twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Inc(1)
253257
log.VEventf(ctx, 2, "flushing buffer because buffer size (%s) exceeds max size (%s)",
254258
humanizeutil.IBytes(bufSize),
255259
humanizeutil.IBytes(maxSize))
@@ -279,6 +283,7 @@ func (twb *txnWriteBuffer) SendLocked(
279283
return nil, pErr
280284
}
281285
}
286+
twb.txnMetrics.TxnWriteBufferFullyHandledBatches.Inc(1)
282287
return br, nil
283288
}
284289

@@ -1258,6 +1263,13 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
12581263
return twb.wrapped.SendLocked(ctx, ba) // nothing to flush
12591264
}
12601265

1266+
if _, ok := ba.GetArg(kvpb.EndTxn); !ok {
1267+
// We're flushing the buffer even though the batch doesn't contain an EndTxn
1268+
// request. That means we buffered some writes and decided to disable write
1269+
// buffering mid-way through the transaction, thus necessitating this flush.
1270+
twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Inc(1)
1271+
}
1272+
12611273
// Flush all buffered writes by pre-pending them to the requests being sent
12621274
// in the batch.
12631275
reqs := make([]kvpb.RequestUnion, 0, numBuffered+len(ba.Requests))

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"strings"
1212
"testing"
13+
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1516
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
@@ -25,11 +26,13 @@ import (
2526
)
2627

2728
func makeMockTxnWriteBuffer(st *cluster.Settings) (txnWriteBuffer, *mockLockedSender) {
29+
metrics := MakeTxnMetrics(time.Hour)
2830
mockSender := &mockLockedSender{}
2931
return txnWriteBuffer{
30-
enabled: true,
31-
wrapped: mockSender,
32-
st: st,
32+
enabled: true,
33+
wrapped: mockSender,
34+
txnMetrics: &metrics,
35+
st: st,
3336
}, mockSender
3437
}
3538

@@ -109,6 +112,7 @@ func TestTxnWriteBufferBuffersBlindWrites(t *testing.T) {
109112
require.Equal(t, br.Responses[0].GetInner(), &kvpb.PutResponse{})
110113
require.Equal(t, br.Responses[1].GetInner(), &kvpb.PutResponse{})
111114
require.Equal(t, br.Responses[2].GetInner(), &kvpb.DeleteResponse{})
115+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferFullyHandledBatches.Count())
112116

113117
// Verify the writes were buffered correctly.
114118
expBufferedWrites := []bufferedWrite{
@@ -1450,6 +1454,8 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
14501454
// Even though we flushed the Put, it shouldn't make it back to the response.
14511455
require.Len(t, br.Responses, 1)
14521456
require.IsType(t, &kvpb.DeleteResponse{}, br.Responses[0].GetInner())
1457+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
1458+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Count())
14531459

14541460
// Ensure the buffer is empty at this point.
14551461
require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
@@ -1497,6 +1503,9 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
14971503
require.NotNil(t, br)
14981504
require.Len(t, br.Responses, 1)
14991505
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
1506+
// Sanity check the metrics remain the same (and don't increase).
1507+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
1508+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Count())
15001509
}
15011510

15021511
// TestTxnWriteBufferDeleteRange ensures that the txnWriteBuffer correctly
@@ -1563,6 +1572,7 @@ func TestTxnWriteBufferDeleteRange(t *testing.T) {
15631572
// Even though we flushed some writes, it shouldn't make it back to the response.
15641573
require.Len(t, br.Responses, 1)
15651574
require.IsType(t, &kvpb.DeleteRangeResponse{}, br.Responses[0].GetInner())
1575+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
15661576

15671577
// Ensure the buffer is empty at this point.
15681578
require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
@@ -1765,6 +1775,7 @@ func TestTxnWriteBufferFlushesAfterDisabling(t *testing.T) {
17651775
// The buffer should be flushed and disabled.
17661776
require.False(t, twb.enabled)
17671777
require.False(t, twb.flushOnNextBatch)
1778+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
17681779

17691780
// Both Put and Del should make it to the server in a single bath.
17701781
require.Equal(t, numCalledBefore+1, mockSender.NumCalled())
@@ -1813,6 +1824,8 @@ func TestTxnWriteBufferFlushesAfterDisabling(t *testing.T) {
18131824
require.NotNil(t, br)
18141825
require.Len(t, br.Responses, 1)
18151826
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
1827+
// Sanity check the metric remains the same (and doesn't increase).
1828+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
18161829
}
18171830

18181831
// TestTxnWriteBufferClearsBufferOnEpochBump tests that the txnWriteBuffer

pkg/kv/kvclient/kvcoord/txn_metrics.go

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ type TxnMetrics struct {
5555
// End transaction failure counters.
5656
RollbacksFailed *metric.Counter
5757
AsyncRollbacksFailed *metric.Counter
58+
59+
// Write buffering metrics.
60+
TxnWriteBufferEnabled *metric.Counter
61+
TxnWriteBufferDisabledAfterBuffering *metric.Counter
62+
TxnWriteBufferMemoryLimitExceeded *metric.Counter
63+
TxnWriteBufferFullyHandledBatches *metric.Counter
5864
}
5965

6066
var (
@@ -272,16 +278,40 @@ var (
272278
}
273279
metaRollbacksFailed = metric.Metadata{
274280
Name: "txn.rollbacks.failed",
275-
Help: "Number of KV transaction that failed to send final abort",
281+
Help: "Number of KV transactions that failed to send final abort",
276282
Measurement: "KV Transactions",
277283
Unit: metric.Unit_COUNT,
278284
}
279285
metaAsyncRollbacksFailed = metric.Metadata{
280286
Name: "txn.rollbacks.async.failed",
281-
Help: "Number of KV transaction that failed to send abort asynchronously which is not always retried",
287+
Help: "Number of KV transactions that failed to send abort asynchronously which is not always retried",
288+
Measurement: "KV Transactions",
289+
Unit: metric.Unit_COUNT,
290+
}
291+
metaTxnWriteBufferEnabled = metric.Metadata{
292+
Name: "txn.write_buffering.num_enabled",
293+
Help: "Number of KV transactions that enabled buffered writes",
294+
Measurement: "KV Transactions",
295+
Unit: metric.Unit_COUNT,
296+
}
297+
metaTxnWriteBufferDisabledAfterBuffering = metric.Metadata{
298+
Name: "txn.write_buffering.disabled_after_buffering",
299+
Help: "Number of KV transactions that disabled write buffering after buffering some writes but before an EndTxn request",
300+
Measurement: "KV Transactions",
301+
Unit: metric.Unit_COUNT,
302+
}
303+
metaTxnWriteBufferLimitExceeded = metric.Metadata{
304+
Name: "txn.write_buffering.memory_limit_exceeded",
305+
Help: "Number of KV transactions that exceeded the write buffering memory limit",
282306
Measurement: "KV Transactions",
283307
Unit: metric.Unit_COUNT,
284308
}
309+
metaTxnWriteBufferFullyHandledBatches = metric.Metadata{
310+
Name: "txn.write_buffering.batches.fully_handled",
311+
Help: "Number of KV batches that were fully handled by the write buffer (not sent to KV)",
312+
Measurement: "KV Batches",
313+
Unit: metric.Unit_COUNT,
314+
}
285315
)
286316

287317
// MakeTxnMetrics returns a TxnMetrics struct that contains metrics whose
@@ -321,15 +351,19 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
321351
SigFigs: 3,
322352
BucketConfig: metric.Count1KBuckets,
323353
}),
324-
RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld),
325-
RestartsSerializable: telemetry.NewCounterWithMetric(metaRestartsSerializable),
326-
RestartsAsyncWriteFailure: telemetry.NewCounterWithMetric(metaRestartsAsyncWriteFailure),
327-
RestartsCommitDeadlineExceeded: telemetry.NewCounterWithMetric(metaRestartsCommitDeadlineExceeded),
328-
RestartsReadWithinUncertainty: telemetry.NewCounterWithMetric(metaRestartsReadWithinUncertainty),
329-
RestartsTxnAborted: telemetry.NewCounterWithMetric(metaRestartsTxnAborted),
330-
RestartsTxnPush: telemetry.NewCounterWithMetric(metaRestartsTxnPush),
331-
RestartsUnknown: telemetry.NewCounterWithMetric(metaRestartsUnknown),
332-
RollbacksFailed: metric.NewCounter(metaRollbacksFailed),
333-
AsyncRollbacksFailed: metric.NewCounter(metaAsyncRollbacksFailed),
354+
RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld),
355+
RestartsSerializable: telemetry.NewCounterWithMetric(metaRestartsSerializable),
356+
RestartsAsyncWriteFailure: telemetry.NewCounterWithMetric(metaRestartsAsyncWriteFailure),
357+
RestartsCommitDeadlineExceeded: telemetry.NewCounterWithMetric(metaRestartsCommitDeadlineExceeded),
358+
RestartsReadWithinUncertainty: telemetry.NewCounterWithMetric(metaRestartsReadWithinUncertainty),
359+
RestartsTxnAborted: telemetry.NewCounterWithMetric(metaRestartsTxnAborted),
360+
RestartsTxnPush: telemetry.NewCounterWithMetric(metaRestartsTxnPush),
361+
RestartsUnknown: telemetry.NewCounterWithMetric(metaRestartsUnknown),
362+
RollbacksFailed: metric.NewCounter(metaRollbacksFailed),
363+
AsyncRollbacksFailed: metric.NewCounter(metaAsyncRollbacksFailed),
364+
TxnWriteBufferEnabled: metric.NewCounter(metaTxnWriteBufferEnabled),
365+
TxnWriteBufferDisabledAfterBuffering: metric.NewCounter(metaTxnWriteBufferDisabledAfterBuffering),
366+
TxnWriteBufferMemoryLimitExceeded: metric.NewCounter(metaTxnWriteBufferLimitExceeded),
367+
TxnWriteBufferFullyHandledBatches: metric.NewCounter(metaTxnWriteBufferFullyHandledBatches),
334368
}
335369
}

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,7 +1659,16 @@ func TestTxnBasicBufferedWrites(t *testing.T) {
16591659
require.NoError(t, txn.Commit(ctx))
16601660

16611661
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
1662+
metrics := txn.Sender().(*kvcoord.TxnCoordSender).Metrics()
1663+
before := metrics.TxnWriteBufferEnabled.Count()
16621664
txn.SetBufferedWritesEnabled(true)
1665+
after := metrics.TxnWriteBufferEnabled.Count()
1666+
require.Equal(t, after-before, int64(1))
1667+
1668+
// Ensure that enabling buffered writes after we've already done so
1669+
// doesn't increment the metric.
1670+
txn.SetBufferedWritesEnabled(true)
1671+
require.Equal(t, after, metrics.TxnWriteBufferEnabled.Count())
16631672

16641673
// Put transactional value.
16651674
if err := txn.Put(ctx, keyA, value1); err != nil {

0 commit comments

Comments
 (0)