Skip to content

Commit 7c4290d

Browse files
committed
kvcoord: correctly initialize txnWriteBuffer
Previously we were initializing the pipelineEnabler on a copy of the txnWriteBuffer that would later be overwritten. As a result, we were never re-enabling pipelining after a mid-transaction flush. This is OK from a correctness perspective since this was only intended as a potential performance improvement for some workloads under buffered writes. There is a potentially strong argument that we shouldn't do this at all and that if someone sees a regression of their workload under buffered writes they should turn it off rather than relying on this fallback. Epic: None Release note: None
1 parent 8b1e1a9 commit 7c4290d

File tree

3 files changed

+70
-3
lines changed

3 files changed

+70
-3
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,6 @@ func newRootTxnCoordSender(
281281
timeSource: timeutil.DefaultTimeSource{},
282282
txn: &tcs.mu.txn,
283283
}
284-
tcs.interceptorAlloc.txnWriteBuffer.init(
285-
&tcs.interceptorAlloc.txnPipeliner,
286-
)
287284

288285
tcs.initCommonInterceptors(tcf, txn, kv.RootTxn)
289286

@@ -363,6 +360,7 @@ func (tc *TxnCoordSender) initCommonInterceptors(
363360
allowConcurrentRequests: typ == kv.LeafTxn,
364361
}
365362
tc.interceptorAlloc.txnSeqNumAllocator.writeSeq = txn.Sequence
363+
tc.interceptorAlloc.txnWriteBuffer.init(&tc.interceptorAlloc.txnPipeliner)
366364
}
367365

368366
func (tc *TxnCoordSender) connectInterceptors() {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_client_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,71 @@ func TestTxnCoordSenderWriteBufferingDisablesPipelining(t *testing.T) {
9292
kvpb.Scan, kvpb.Put, kvpb.EndTxn,
9393
}, calls)
9494
}
95+
96+
// TestTxnCoordSenderWriteBufferingReEnablesPipelining verifies that pipelining
97+
// is re-enabled after a mid-transaction flush.
98+
func TestTxnCoordSenderWriteBufferingReEnablesPipelining(t *testing.T) {
99+
defer leaktest.AfterTest(t)()
100+
defer log.Scope(t).Close(t)
101+
102+
ctx := context.Background()
103+
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
104+
defer s.Stopper().Stop(ctx)
105+
106+
distSender := s.DistSenderI().(*DistSender)
107+
batchCount := 0
108+
var calls []kvpb.Method
109+
var senderFn kv.SenderFunc = func(
110+
ctx context.Context, ba *kvpb.BatchRequest,
111+
) (*kvpb.BatchResponse, *kvpb.Error) {
112+
batchCount++
113+
t.Logf("Batch: %#+v", ba.Methods())
114+
calls = append(calls, ba.Methods()...)
115+
if et, ok := ba.GetArg(kvpb.EndTxn); ok {
116+
// Ensure that no transactions enter a STAGING state.
117+
et.(*kvpb.EndTxnRequest).InFlightWrites = nil
118+
}
119+
return distSender.Send(ctx, ba)
120+
}
121+
122+
st := s.ClusterSettings()
123+
BufferedWritesMaxBufferSize.Override(ctx, &st.SV, defaultBufferSize)
124+
125+
tsf := NewTxnCoordSenderFactory(TxnCoordSenderFactoryConfig{
126+
AmbientCtx: s.AmbientCtx(),
127+
Settings: st,
128+
Clock: s.Clock(),
129+
Stopper: s.Stopper(),
130+
// Disable transaction heartbeats so that they don't disrupt our attempt to
131+
// track the requests issued by the transactions.
132+
HeartbeatInterval: -1,
133+
}, senderFn)
134+
db := kv.NewDB(s.AmbientCtx(), tsf, s.Clock(), s.Stopper())
135+
136+
require.NoError(t, db.Put(ctx, "test-key-a", "hello"))
137+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
138+
txn.SetBufferedWritesEnabled(true)
139+
if err := txn.Put(ctx, "test-key-c", "hello"); err != nil {
140+
return err
141+
}
142+
if _, err := txn.DelRange(ctx, "test-key", "test-key-d", true); err != nil {
143+
return err
144+
}
145+
if err := txn.Put(ctx, "test-key-a", "hello"); err != nil {
146+
return err
147+
}
148+
return nil
149+
}))
150+
151+
require.Equal(t, 4, batchCount)
152+
require.Equal(t, []kvpb.Method{
153+
// The initial setup
154+
kvpb.Put,
155+
// The first (buffered) Put and the DeleteRange that flushes the buffer.
156+
kvpb.Put, kvpb.DeleteRange,
157+
// The second (pipelined) Put
158+
kvpb.Put,
159+
// EndTxn with the QueryIntent because pipelining was turned back on.
160+
kvpb.QueryIntent, kvpb.EndTxn,
161+
}, calls)
162+
}

pkg/sql/opt/exec/execbuilder/testdata/show_trace

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
398398
WHERE message LIKE '%r$rangeid: sending batch%'
399399
AND message NOT LIKE '%PushTxn%'
400400
AND message NOT LIKE '%QueryTxn%'
401+
AND message NOT LIKE '%QueryIntent%'
401402
----
402403
dist sender send r77: sending batch 6 CPut to (n1,s1):1
403404
dist sender send r77: sending batch 6 CPut to (n1,s1):1

0 commit comments

Comments
 (0)