Skip to content

Commit 4ca3bfe

Browse files
craig[bot]asg0451kyle-a-wongrafissarulajmani
committed
144497: changefeedccl: fix mock kafka server shutdown r=andyyang890 a=asg0451 Fix a test timeout due to a batching sink worker being blocked on Flush after the context was cancelled. Fixes: #144213 Release note: None 144649: ui: refactor schedules page to use useSWR hook r=kyle-a-wong a=kyle-a-wong - Refactors the schedules page to use `useSwrWithClusterId` instead of redux sagas to fetch data. - Removed show and status props from being stored in redux and browser session. - Deleted schedules page story book Epic: None Release note: None --- Note: This is a stacked PR, only the last commit needs to be reviewed 144815: securitytest: regenerate certs r=rafiss a=rafiss The certs had expired, causing tests to fail. Fixes #144782 Release note: None 144848: kvclient: add some metrics for buffered writes r=stevendanna a=arulajmani Gets the ball rolling on some metrics. Epic: none Release note: None Co-authored-by: Miles Frankel <[email protected]> Co-authored-by: Kyle Wong <[email protected]> Co-authored-by: Rafi Shamim <[email protected]> Co-authored-by: Arul Ajmani <[email protected]>
5 parents 8cd9a6b + 87fb29a + 9966edc + 03b66bd + bdc58fb commit 4ca3bfe

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+912
-952
lines changed

docs/generated/metrics/metrics.html

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1949,8 +1949,12 @@
19491949
<tr><td>APPLICATION</td><td>txn.restarts.txnpush</td><td>Number of restarts due to a transaction push failure</td><td>Restarted Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
19501950
<tr><td>APPLICATION</td><td>txn.restarts.unknown</td><td>Number of restarts due to a unknown reasons</td><td>Restarted Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
19511951
<tr><td>APPLICATION</td><td>txn.restarts.writetooold</td><td>Number of restarts due to a concurrent writer committing first</td><td>Restarted Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1952-
<tr><td>APPLICATION</td><td>txn.rollbacks.async.failed</td><td>Number of KV transaction that failed to send abort asynchronously which is not always retried</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1953-
<tr><td>APPLICATION</td><td>txn.rollbacks.failed</td><td>Number of KV transaction that failed to send final abort</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1952+
<tr><td>APPLICATION</td><td>txn.rollbacks.async.failed</td><td>Number of KV transactions that failed to send abort asynchronously which is not always retried</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1953+
<tr><td>APPLICATION</td><td>txn.rollbacks.failed</td><td>Number of KV transactions that failed to send final abort</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1954+
<tr><td>APPLICATION</td><td>txn.write_buffering.batches.fully_handled</td><td>Number of KV batches that were fully handled by the write buffer (not sent to KV)</td><td>KV Batches</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1955+
<tr><td>APPLICATION</td><td>txn.write_buffering.disabled_after_buffering</td><td>Number of KV transactions that disabled write buffering after buffering some writes but before an EndTxn request</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1956+
<tr><td>APPLICATION</td><td>txn.write_buffering.memory_limit_exceeded</td><td>Number of KV transactions that exceeded the write buffering memory limit</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1957+
<tr><td>APPLICATION</td><td>txn.write_buffering.num_enabled</td><td>Number of KV transactions that enabled buffered writes</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
19541958
<tr><td>SERVER</td><td>build.timestamp</td><td>Build information</td><td>Build Time</td><td>GAUGE</td><td>TIMESTAMP_SEC</td><td>AVG</td><td>NONE</td></tr>
19551959
<tr><td>SERVER</td><td>go.scheduler_latency</td><td>Go scheduling latency</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
19561960
<tr><td>SERVER</td><td>log.buffered.messages.dropped</td><td>Count of log messages that are dropped by buffered log sinks. When CRDB attempts to buffer a log message in a buffered log sink whose buffer is already full, it drops the oldest buffered messages to make space for the new message</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>

docs/generated/settings/settings-for-tenants.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes u
9999
kv.transaction.randomized_anchor_key.enabled boolean false dictates whether a transactions anchor key is randomized or not application
100100
kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed application
101101
kv.transaction.write_buffering.enabled boolean false if enabled, transactional writes are buffered on the client application
102-
kv.transaction.write_buffering.max_buffer_size integer 4194304 if non-zero, defines that maximum size of the buffer that will be used to buffer transactional writes per-transaction application
102+
kv.transaction.write_buffering.max_buffer_size byte size 4.0 MiB if non-zero, defines that maximum size of the buffer that will be used to buffer transactional writes per-transaction application
103103
kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, transactional locking reads are pipelined through Raft consensus application
104104
kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
105105
kv.transaction.write_pipelining.enabled (alias: kv.transaction.write_pipelining_enabled) boolean true if enabled, transactional writes are pipelined through Raft consensus application

docs/generated/settings/settings.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@
128128
<tr><td><div id="setting-kv-transaction-randomized-anchor-key-enabled" class="anchored"><code>kv.transaction.randomized_anchor_key.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>dictates whether a transactions anchor key is randomized or not</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
129129
<tr><td><div id="setting-kv-transaction-reject-over-max-intents-budget-enabled" class="anchored"><code>kv.transaction.reject_over_max_intents_budget.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
130130
<tr><td><div id="setting-kv-transaction-write-buffering-enabled" class="anchored"><code>kv.transaction.write_buffering.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, transactional writes are buffered on the client</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
131-
<tr><td><div id="setting-kv-transaction-write-buffering-max-buffer-size" class="anchored"><code>kv.transaction.write_buffering.max_buffer_size</code></div></td><td>integer</td><td><code>4194304</code></td><td>if non-zero, defines that maximum size of the buffer that will be used to buffer transactional writes per-transaction</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
131+
<tr><td><div id="setting-kv-transaction-write-buffering-max-buffer-size" class="anchored"><code>kv.transaction.write_buffering.max_buffer_size</code></div></td><td>byte size</td><td><code>4.0 MiB</code></td><td>if non-zero, defines that maximum size of the buffer that will be used to buffer transactional writes per-transaction</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
132132
<tr><td><div id="setting-kv-transaction-write-pipelining-locking-reads-enabled" class="anchored"><code>kv.transaction.write_pipelining.locking_reads.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional locking reads are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
133133
<tr><td><div id="setting-kv-transaction-write-pipelining-ranged-writes-enabled" class="anchored"><code>kv.transaction.write_pipelining.ranged_writes.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional ranged writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
134134
<tr><td><div id="setting-kv-transaction-write-pipelining-enabled" class="anchored"><code>kv.transaction.write_pipelining.enabled<br />(alias: kv.transaction.write_pipelining_enabled)</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>

pkg/ccl/changefeedccl/testfeed_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1876,12 +1876,16 @@ func (s *fakeKafkaSinkV2) Dial() error {
18761876
})
18771877
}
18781878

1879-
s.feedCh <- &sarama.ProducerMessage{
1879+
select {
1880+
case <-ctx.Done():
1881+
return kgo.ProduceResults{kgo.ProduceResult{Err: ctx.Err()}}
1882+
case s.feedCh <- &sarama.ProducerMessage{
18801883
Topic: m.Topic,
18811884
Key: key,
18821885
Value: sarama.ByteEncoder(m.Value),
18831886
Partition: m.Partition,
18841887
Headers: headers,
1888+
}:
18851889
}
18861890
}
18871891
return nil

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,10 @@ func (tc *TxnCoordSender) initCommonInterceptors(
321321
if ds, ok := tcf.wrapped.(*DistSender); ok {
322322
riGen.ds = ds
323323
}
324-
tc.interceptorAlloc.txnWriteBuffer.st = tcf.st
324+
tc.interceptorAlloc.txnWriteBuffer = txnWriteBuffer{
325+
st: tcf.st,
326+
txnMetrics: &tc.metrics,
327+
}
325328
tc.interceptorAlloc.txnPipeliner = txnPipeliner{
326329
st: tcf.st,
327330
riGen: riGen,

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ var BufferedWritesEnabled = settings.RegisterBoolSetting(
3535
settings.WithPublic,
3636
)
3737

38-
var bufferedWritesMaxBufferSize = settings.RegisterIntSetting(
38+
var bufferedWritesMaxBufferSize = settings.RegisterByteSizeSetting(
3939
settings.ApplicationLevel,
4040
"kv.transaction.write_buffering.max_buffer_size",
4141
"if non-zero, defines that maximum size of the "+
@@ -176,7 +176,8 @@ type txnWriteBuffer struct {
176176

177177
bufferSeek bufferedWrite // re-use while seeking
178178

179-
wrapped lockedSender
179+
wrapped lockedSender
180+
txnMetrics *TxnMetrics
180181

181182
// testingOverrideCPutEvalFn is used to mock the evaluation function for
182183
// conditional puts. Intended only for tests.
@@ -189,6 +190,9 @@ func (twb *txnWriteBuffer) setEnabled(enabled bool) {
189190
// to ensure to flush the buffer.
190191
twb.flushOnNextBatch = true
191192
}
193+
if enabled {
194+
twb.txnMetrics.TxnWriteBufferEnabled.Inc(1)
195+
}
192196
twb.enabled = enabled
193197
}
194198

@@ -246,8 +250,10 @@ func (twb *txnWriteBuffer) SendLocked(
246250
// and flush the buffer.
247251
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
248252
bufSize := twb.estimateSize(ba) + twb.bufferSize
249-
if bufSize > maxSize {
250-
// TODO(arul): add some metrics for this case.
253+
// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
254+
// any buffer limiting.
255+
if maxSize != 0 && bufSize > maxSize {
256+
twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Inc(1)
251257
log.VEventf(ctx, 2, "flushing buffer because buffer size (%s) exceeds max size (%s)",
252258
humanizeutil.IBytes(bufSize),
253259
humanizeutil.IBytes(maxSize))
@@ -277,6 +283,7 @@ func (twb *txnWriteBuffer) SendLocked(
277283
return nil, pErr
278284
}
279285
}
286+
twb.txnMetrics.TxnWriteBufferFullyHandledBatches.Inc(1)
280287
return br, nil
281288
}
282289

@@ -1256,6 +1263,13 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
12561263
return twb.wrapped.SendLocked(ctx, ba) // nothing to flush
12571264
}
12581265

1266+
if _, ok := ba.GetArg(kvpb.EndTxn); !ok {
1267+
// We're flushing the buffer even though the batch doesn't contain an EndTxn
1268+
// request. That means we buffered some writes and decided to disable write
1269+
// buffering mid-way through the transaction, thus necessitating this flush.
1270+
twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Inc(1)
1271+
}
1272+
12591273
// Flush all buffered writes by pre-pending them to the requests being sent
12601274
// in the batch.
12611275
reqs := make([]kvpb.RequestUnion, 0, numBuffered+len(ba.Requests))

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"strings"
1212
"testing"
13+
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1516
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
@@ -25,11 +26,13 @@ import (
2526
)
2627

2728
func makeMockTxnWriteBuffer(st *cluster.Settings) (txnWriteBuffer, *mockLockedSender) {
29+
metrics := MakeTxnMetrics(time.Hour)
2830
mockSender := &mockLockedSender{}
2931
return txnWriteBuffer{
30-
enabled: true,
31-
wrapped: mockSender,
32-
st: st,
32+
enabled: true,
33+
wrapped: mockSender,
34+
txnMetrics: &metrics,
35+
st: st,
3336
}, mockSender
3437
}
3538

@@ -109,6 +112,7 @@ func TestTxnWriteBufferBuffersBlindWrites(t *testing.T) {
109112
require.Equal(t, br.Responses[0].GetInner(), &kvpb.PutResponse{})
110113
require.Equal(t, br.Responses[1].GetInner(), &kvpb.PutResponse{})
111114
require.Equal(t, br.Responses[2].GetInner(), &kvpb.DeleteResponse{})
115+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferFullyHandledBatches.Count())
112116

113117
// Verify the writes were buffered correctly.
114118
expBufferedWrites := []bufferedWrite{
@@ -1450,6 +1454,8 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
14501454
// Even though we flushed the Put, it shouldn't make it back to the response.
14511455
require.Len(t, br.Responses, 1)
14521456
require.IsType(t, &kvpb.DeleteResponse{}, br.Responses[0].GetInner())
1457+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
1458+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Count())
14531459

14541460
// Ensure the buffer is empty at this point.
14551461
require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
@@ -1497,6 +1503,9 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
14971503
require.NotNil(t, br)
14981504
require.Len(t, br.Responses, 1)
14991505
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
1506+
// Sanity check the metrics remain the same (and don't increase).
1507+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
1508+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Count())
15001509
}
15011510

15021511
// TestTxnWriteBufferDeleteRange ensures that the txnWriteBuffer correctly
@@ -1563,6 +1572,7 @@ func TestTxnWriteBufferDeleteRange(t *testing.T) {
15631572
// Even though we flushed some writes, it shouldn't make it back to the response.
15641573
require.Len(t, br.Responses, 1)
15651574
require.IsType(t, &kvpb.DeleteRangeResponse{}, br.Responses[0].GetInner())
1575+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
15661576

15671577
// Ensure the buffer is empty at this point.
15681578
require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
@@ -1765,6 +1775,7 @@ func TestTxnWriteBufferFlushesAfterDisabling(t *testing.T) {
17651775
// The buffer should be flushed and disabled.
17661776
require.False(t, twb.enabled)
17671777
require.False(t, twb.flushOnNextBatch)
1778+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
17681779

17691780
// Both Put and Del should make it to the server in a single bath.
17701781
require.Equal(t, numCalledBefore+1, mockSender.NumCalled())
@@ -1813,6 +1824,8 @@ func TestTxnWriteBufferFlushesAfterDisabling(t *testing.T) {
18131824
require.NotNil(t, br)
18141825
require.Len(t, br.Responses, 1)
18151826
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
1827+
// Sanity check the metric remains the same (and doesn't increase).
1828+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
18161829
}
18171830

18181831
// TestTxnWriteBufferClearsBufferOnEpochBump tests that the txnWriteBuffer

pkg/kv/kvclient/kvcoord/txn_metrics.go

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ type TxnMetrics struct {
5555
// End transaction failure counters.
5656
RollbacksFailed *metric.Counter
5757
AsyncRollbacksFailed *metric.Counter
58+
59+
// Write buffering metrics.
60+
TxnWriteBufferEnabled *metric.Counter
61+
TxnWriteBufferDisabledAfterBuffering *metric.Counter
62+
TxnWriteBufferMemoryLimitExceeded *metric.Counter
63+
TxnWriteBufferFullyHandledBatches *metric.Counter
5864
}
5965

6066
var (
@@ -272,16 +278,40 @@ var (
272278
}
273279
metaRollbacksFailed = metric.Metadata{
274280
Name: "txn.rollbacks.failed",
275-
Help: "Number of KV transaction that failed to send final abort",
281+
Help: "Number of KV transactions that failed to send final abort",
276282
Measurement: "KV Transactions",
277283
Unit: metric.Unit_COUNT,
278284
}
279285
metaAsyncRollbacksFailed = metric.Metadata{
280286
Name: "txn.rollbacks.async.failed",
281-
Help: "Number of KV transaction that failed to send abort asynchronously which is not always retried",
287+
Help: "Number of KV transactions that failed to send abort asynchronously which is not always retried",
288+
Measurement: "KV Transactions",
289+
Unit: metric.Unit_COUNT,
290+
}
291+
metaTxnWriteBufferEnabled = metric.Metadata{
292+
Name: "txn.write_buffering.num_enabled",
293+
Help: "Number of KV transactions that enabled buffered writes",
294+
Measurement: "KV Transactions",
295+
Unit: metric.Unit_COUNT,
296+
}
297+
metaTxnWriteBufferDisabledAfterBuffering = metric.Metadata{
298+
Name: "txn.write_buffering.disabled_after_buffering",
299+
Help: "Number of KV transactions that disabled write buffering after buffering some writes but before an EndTxn request",
300+
Measurement: "KV Transactions",
301+
Unit: metric.Unit_COUNT,
302+
}
303+
metaTxnWriteBufferLimitExceeded = metric.Metadata{
304+
Name: "txn.write_buffering.memory_limit_exceeded",
305+
Help: "Number of KV transactions that exceeded the write buffering memory limit",
282306
Measurement: "KV Transactions",
283307
Unit: metric.Unit_COUNT,
284308
}
309+
metaTxnWriteBufferFullyHandledBatches = metric.Metadata{
310+
Name: "txn.write_buffering.batches.fully_handled",
311+
Help: "Number of KV batches that were fully handled by the write buffer (not sent to KV)",
312+
Measurement: "KV Batches",
313+
Unit: metric.Unit_COUNT,
314+
}
285315
)
286316

287317
// MakeTxnMetrics returns a TxnMetrics struct that contains metrics whose
@@ -321,15 +351,19 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
321351
SigFigs: 3,
322352
BucketConfig: metric.Count1KBuckets,
323353
}),
324-
RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld),
325-
RestartsSerializable: telemetry.NewCounterWithMetric(metaRestartsSerializable),
326-
RestartsAsyncWriteFailure: telemetry.NewCounterWithMetric(metaRestartsAsyncWriteFailure),
327-
RestartsCommitDeadlineExceeded: telemetry.NewCounterWithMetric(metaRestartsCommitDeadlineExceeded),
328-
RestartsReadWithinUncertainty: telemetry.NewCounterWithMetric(metaRestartsReadWithinUncertainty),
329-
RestartsTxnAborted: telemetry.NewCounterWithMetric(metaRestartsTxnAborted),
330-
RestartsTxnPush: telemetry.NewCounterWithMetric(metaRestartsTxnPush),
331-
RestartsUnknown: telemetry.NewCounterWithMetric(metaRestartsUnknown),
332-
RollbacksFailed: metric.NewCounter(metaRollbacksFailed),
333-
AsyncRollbacksFailed: metric.NewCounter(metaAsyncRollbacksFailed),
354+
RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld),
355+
RestartsSerializable: telemetry.NewCounterWithMetric(metaRestartsSerializable),
356+
RestartsAsyncWriteFailure: telemetry.NewCounterWithMetric(metaRestartsAsyncWriteFailure),
357+
RestartsCommitDeadlineExceeded: telemetry.NewCounterWithMetric(metaRestartsCommitDeadlineExceeded),
358+
RestartsReadWithinUncertainty: telemetry.NewCounterWithMetric(metaRestartsReadWithinUncertainty),
359+
RestartsTxnAborted: telemetry.NewCounterWithMetric(metaRestartsTxnAborted),
360+
RestartsTxnPush: telemetry.NewCounterWithMetric(metaRestartsTxnPush),
361+
RestartsUnknown: telemetry.NewCounterWithMetric(metaRestartsUnknown),
362+
RollbacksFailed: metric.NewCounter(metaRollbacksFailed),
363+
AsyncRollbacksFailed: metric.NewCounter(metaAsyncRollbacksFailed),
364+
TxnWriteBufferEnabled: metric.NewCounter(metaTxnWriteBufferEnabled),
365+
TxnWriteBufferDisabledAfterBuffering: metric.NewCounter(metaTxnWriteBufferDisabledAfterBuffering),
366+
TxnWriteBufferMemoryLimitExceeded: metric.NewCounter(metaTxnWriteBufferLimitExceeded),
367+
TxnWriteBufferFullyHandledBatches: metric.NewCounter(metaTxnWriteBufferFullyHandledBatches),
334368
}
335369
}

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,7 +1659,16 @@ func TestTxnBasicBufferedWrites(t *testing.T) {
16591659
require.NoError(t, txn.Commit(ctx))
16601660

16611661
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
1662+
metrics := txn.Sender().(*kvcoord.TxnCoordSender).Metrics()
1663+
before := metrics.TxnWriteBufferEnabled.Count()
16621664
txn.SetBufferedWritesEnabled(true)
1665+
after := metrics.TxnWriteBufferEnabled.Count()
1666+
require.Equal(t, after-before, int64(1))
1667+
1668+
// Ensure that enabling buffered writes after we've already done so
1669+
// doesn't increment the metric.
1670+
txn.SetBufferedWritesEnabled(true)
1671+
require.Equal(t, after, metrics.TxnWriteBufferEnabled.Count())
16631672

16641673
// Put transactional value.
16651674
if err := txn.Put(ctx, keyA, value1); err != nil {

0 commit comments

Comments
 (0)