Skip to content

Commit 704e340

Browse files
committed
kvcoord: disallow pipelining while write buffering is enabled
The interaction between pipelining and write buffering has revealed a couple of subtle bugs. Further, these two optimisations are largely overlapping. In the happy case, when write buffering is enabled, we expect almost not write pipelining. Here, we disable pipelining when write buffering is enabled. The upside of this is: 1. It avoids other possible bugs in the interaction between these features. 2. It makes it a bit easier to reason about the behaviour of write-buffered transactions. The downsides include: 1. This results in a negative performance impact for users who turn on write buffering but whose transactions often result in buffering later being disabled. 2. Write pipelining has been the default for many releases and it is not clear that running with write pipelining disabled is as well tested as the enabled code path. Fixes #149911 Epic: none Release note: None
1 parent 9272e28 commit 704e340

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

pkg/kv/kvclient/kvcoord/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ go_test(
159159
"txn_interceptor_pipeliner_test.go",
160160
"txn_interceptor_seq_num_allocator_test.go",
161161
"txn_interceptor_span_refresher_test.go",
162+
"txn_interceptor_write_buffer_client_test.go",
162163
"txn_interceptor_write_buffer_test.go",
163164
"txn_test.go",
164165
":bufferedwrite_interval_btree_test.go", # keep

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,6 +1192,9 @@ func (tc *TxnCoordSender) SetBufferedWritesEnabled(enabled bool) {
11921192
panic("cannot enable buffered writes on a running transaction")
11931193
}
11941194
tc.interceptorAlloc.txnWriteBuffer.setEnabled(enabled)
1195+
if enabled {
1196+
tc.interceptorAlloc.txnPipeliner.disabled = true
1197+
}
11951198
}
11961199

11971200
// BufferedWritesEnabled is part of the kv.TxnSender interface.
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package kvcoord
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/base"
13+
"github.com/cockroachdb/cockroach/pkg/kv"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
15+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
16+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
17+
"github.com/cockroachdb/cockroach/pkg/util/log"
18+
"github.com/stretchr/testify/require"
19+
)
20+
21+
// TestTxnCoordSenderWriteBufferingDisablesPipelining verifies that enabling
22+
// write buffering disables pipelining.
23+
func TestTxnCoordSenderWriteBufferingDisablesPipelining(t *testing.T) {
24+
defer leaktest.AfterTest(t)()
25+
defer log.Scope(t).Close(t)
26+
27+
ctx := context.Background()
28+
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
29+
defer s.Stopper().Stop(ctx)
30+
31+
distSender := s.DistSenderI().(*DistSender)
32+
batchCount := 0
33+
var calls []kvpb.Method
34+
var senderFn kv.SenderFunc = func(
35+
ctx context.Context, ba *kvpb.BatchRequest,
36+
) (*kvpb.BatchResponse, *kvpb.Error) {
37+
batchCount++
38+
calls = append(calls, ba.Methods()...)
39+
if et, ok := ba.GetArg(kvpb.EndTxn); ok {
40+
// Ensure that no transactions enter a STAGING state.
41+
et.(*kvpb.EndTxnRequest).InFlightWrites = nil
42+
}
43+
return distSender.Send(ctx, ba)
44+
}
45+
46+
st := s.ClusterSettings()
47+
tsf := NewTxnCoordSenderFactory(TxnCoordSenderFactoryConfig{
48+
AmbientCtx: s.AmbientCtx(),
49+
Settings: st,
50+
Clock: s.Clock(),
51+
Stopper: s.Stopper(),
52+
// Disable transaction heartbeats so that they don't disrupt our attempt to
53+
// track the requests issued by the transactions.
54+
HeartbeatInterval: -1,
55+
}, senderFn)
56+
db := kv.NewDB(s.AmbientCtx(), tsf, s.Clock(), s.Stopper())
57+
58+
// Disable scan transforms so that we can force a write that _would have_ been
59+
// buffered.
60+
require.NoError(t, db.Put(ctx, "test-key-a", "hello"))
61+
62+
bufferedWritesScanTransformEnabled.Override(ctx, &st.SV, false)
63+
64+
// Without write buffering
65+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
66+
txn.SetBufferedWritesEnabled(false)
67+
if err := txn.Put(ctx, "test-key-c", "hello"); err != nil {
68+
return err
69+
}
70+
_, err := txn.ScanForUpdate(ctx, "test-key", "test-key-b", 10, kvpb.GuaranteedDurability)
71+
return err
72+
}))
73+
74+
// With write buffering.
75+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
76+
txn.SetBufferedWritesEnabled(true)
77+
if err := txn.Put(ctx, "test-key-c", "hello"); err != nil {
78+
return err
79+
}
80+
_, err := txn.ScanForUpdate(ctx, "test-key", "test-key-b", 10, kvpb.GuaranteedDurability)
81+
return err
82+
}))
83+
84+
require.Equal(t, 1+3+2, batchCount)
85+
require.Equal(t, []kvpb.Method{
86+
// The initial setup
87+
kvpb.Put,
88+
// The first transaction without write buffering
89+
kvpb.Put, kvpb.Scan, kvpb.QueryIntent, kvpb.QueryIntent, kvpb.EndTxn,
90+
// The second transaction with write buffering
91+
kvpb.Scan, kvpb.Put, kvpb.EndTxn,
92+
}, calls)
93+
}

0 commit comments

Comments
 (0)