Skip to content

Commit c12ca7e

Browse files
arulajmani authored and yuzefovich committed
kv: add HasBufferedAllPrecedingWrites flag for BatchRequests
This patch adds a new flag to indicate that a transaction has buffered all preceding writes on the client. It also sets it in the txnWriteBuffer. In the future, we'll use this information to omit AbortSpan checks on the server.

Epic: none
Release note: None
1 parent e90488e commit c12ca7e

File tree

3 files changed

+227
-16
lines changed

3 files changed

+227
-16
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -146,26 +146,26 @@ type txnWriteBuffer struct {
146146
// and disable write buffering going forward out of an abundance of caution.
147147
// This is opted into by SQL.
148148
//
149-
// As a result, we have a nice invariant: if write buffering is enabled, then
150-
// all writes performed by the transaction are buffered in memory. We can
151-
// never have the case where a part of the write set is buffered, and the
152-
// other part is replicated.
149+
// As a result, we have a nice invariant: if write buffering is enabled,
150+
// then all writes performed by the transaction are buffered in memory. We
151+
// can never have the case where a part of the write set is buffered, and
152+
// the other part is replicated.
153153
//
154-
// In the future, the invariant above allows us to omit checking the AbortSpan
155-
// for transactions that have buffered writes enabled. The AbortSpan is used
156-
// to ensure we don't violate read-your-own-write semantics for transactions
157-
// that have been aborted by a conflicting transaction. As read-your-own-write
158-
// semantics are upheld by the client, not the server, for transactions that
159-
// use buffered writes, we can skip the AbortSpan check on the server.
154+
// The invariant above allows us to omit checking the AbortSpan for
155+
// transactions that have buffered writes enabled. The AbortSpan is used to
156+
// ensure we don't violate read-your-own-write semantics for transactions
157+
// that have been aborted by a conflicting transaction. As
158+
// read-your-own-write semantics are upheld by the client, not the server,
159+
// for transactions that use buffered writes, we can skip the AbortSpan
160+
// check on the server.
160161
//
161162
// We currently track this via two state variables: `enabled` and `flushed`.
162163
// Writes are only buffered if enabled && !flushed.
163164
//
164-
// `enabled` tracks whether buffering has been enabled/disabled externally via
165-
// txn.SetBufferedWritesEnabled or because we are operating on a leaf
165+
// `enabled` tracks whether buffering has been enabled/disabled externally
166+
// via txn.SetBufferedWritesEnabled or because we are operating on a leaf
166167
// transaction.
167168
enabled bool
168-
//
169169
// `flushed` tracks whether the buffer has been previously flushed.
170170
flushed bool
171171

@@ -210,6 +210,15 @@ func (twb *txnWriteBuffer) SendLocked(
210210

211211
if !twb.shouldBuffer() {
212212
return twb.wrapped.SendLocked(ctx, ba)
213+
} else {
214+
// If we're here, write buffering is enabled, and all writes until now
215+
// have been buffered. Set the flag to indicate this.
216+
//
217+
// NB: We don't need a version check here (for v25.3) because this is only
218+
// used by the server to optimize away the AbortSpan check. Even if we set
219+
// this field, and the server is on a previous version, the worst that can
220+
// happen is that the server still performs the AbortSpan check, which is harmless.
221+
ba.HasBufferedAllPrecedingWrites = true
213222
}
214223

215224
if etArg, ok := ba.GetArg(kvpb.EndTxn); ok {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,6 +2056,190 @@ func TestTxnWriteBufferBatchRequestValidation(t *testing.T) {
20562056
}
20572057
}
20582058

2059+
// TestTxnWriteBufferHasBufferedAllPrecedingWrites verifies that the
2060+
// txnWriteBuffer correctly sets the HasBufferedAllPrecedingWrites flag.
2061+
func TestTxnWriteBufferHasBufferedAllPrecedingWrites(t *testing.T) {
2062+
defer leaktest.AfterTest(t)()
2063+
defer log.Scope(t).Close(t)
2064+
2065+
txn := makeTxnProto()
2066+
txn.Sequence = 1
2067+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
2068+
2069+
for _, tc := range []struct {
2070+
name string
2071+
setup func(*txnWriteBuffer)
2072+
ba func(ba *kvpb.BatchRequest)
2073+
mockSend func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error)
2074+
expHasBufferedAllPrecedingWrites bool
2075+
}{
2076+
{
2077+
name: "batch with two Get requests",
2078+
ba: func(ba *kvpb.BatchRequest) {
2079+
getA := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence}}
2080+
getB := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, Sequence: txn.Sequence}}
2081+
ba.Add(getA, getB)
2082+
},
2083+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2084+
require.Len(t, ba.Requests, 2)
2085+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
2086+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
2087+
2088+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2089+
2090+
br := ba.CreateReply()
2091+
br.Txn = ba.Txn
2092+
return br, nil
2093+
},
2094+
expHasBufferedAllPrecedingWrites: true,
2095+
},
2096+
{
2097+
name: "batch with one Put and one Get request",
2098+
ba: func(ba *kvpb.BatchRequest) {
2099+
putA := putArgs(keyA, "valA", txn.Sequence)
2100+
getB := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, Sequence: txn.Sequence}}
2101+
ba.Add(putA, getB)
2102+
},
2103+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2104+
require.Len(t, ba.Requests, 1)
2105+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
2106+
2107+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2108+
2109+
br := ba.CreateReply()
2110+
br.Txn = ba.Txn
2111+
return br, nil
2112+
},
2113+
expHasBufferedAllPrecedingWrites: true,
2114+
},
2115+
{
2116+
name: "batch with one Put, one Get, and one Delete request",
2117+
ba: func(ba *kvpb.BatchRequest) {
2118+
putA := putArgs(keyA, "valA", txn.Sequence)
2119+
getB := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, Sequence: txn.Sequence}}
2120+
delC := delArgs(keyC, txn.Sequence)
2121+
2122+
ba.Add(putA, getB, delC)
2123+
},
2124+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2125+
require.Len(t, ba.Requests, 1)
2126+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
2127+
2128+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2129+
2130+
br := ba.CreateReply()
2131+
br.Txn = ba.Txn
2132+
return br, nil
2133+
},
2134+
expHasBufferedAllPrecedingWrites: true,
2135+
},
2136+
{
2137+
name: "batch with one DeleteRange and one Get request",
2138+
ba: func(ba *kvpb.BatchRequest) {
2139+
delRange := delRangeArgs(keyA, keyB, txn.Sequence)
2140+
getB := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, Sequence: txn.Sequence}}
2141+
2142+
ba.Add(delRange, getB)
2143+
},
2144+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2145+
require.Len(t, ba.Requests, 2)
2146+
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner())
2147+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
2148+
2149+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2150+
2151+
br := ba.CreateReply()
2152+
br.Txn = ba.Txn
2153+
return br, nil
2154+
},
2155+
expHasBufferedAllPrecedingWrites: false,
2156+
},
2157+
{
2158+
name: "flushed due to size limit",
2159+
setup: func(twb *txnWriteBuffer) {
2160+
bufferedWritesMaxBufferSize.Override(context.Background(), &twb.st.SV, 1)
2161+
},
2162+
ba: func(ba *kvpb.BatchRequest) {
2163+
putA := putArgs(keyA, "valA", txn.Sequence)
2164+
2165+
ba.Add(putA)
2166+
},
2167+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2168+
require.Len(t, ba.Requests, 1)
2169+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
2170+
2171+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2172+
2173+
br := ba.CreateReply()
2174+
br.Txn = ba.Txn
2175+
return br, nil
2176+
},
2177+
expHasBufferedAllPrecedingWrites: false,
2178+
},
2179+
{
2180+
name: "write buffering disabled",
2181+
setup: func(twb *txnWriteBuffer) {
2182+
twb.setEnabled(false)
2183+
},
2184+
ba: func(ba *kvpb.BatchRequest) {
2185+
putA := putArgs(keyA, "valA", txn.Sequence)
2186+
2187+
ba.Add(putA)
2188+
},
2189+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2190+
require.Len(t, ba.Requests, 1)
2191+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
2192+
2193+
// NB: Should never be set if write buffering is disabled
2194+
require.False(t, ba.HasBufferedAllPrecedingWrites)
2195+
2196+
br := ba.CreateReply()
2197+
br.Txn = ba.Txn
2198+
return br, nil
2199+
},
2200+
expHasBufferedAllPrecedingWrites: false,
2201+
},
2202+
} {
2203+
t.Run(tc.name, func(t *testing.T) {
2204+
ctx := context.Background()
2205+
st := cluster.MakeTestingClusterSettings()
2206+
twb, mockSender := makeMockTxnWriteBuffer(st)
2207+
2208+
if tc.setup != nil {
2209+
tc.setup(&twb)
2210+
}
2211+
2212+
ba := &kvpb.BatchRequest{}
2213+
tc.ba(ba)
2214+
mockSender.MockSend(tc.mockSend)
2215+
2216+
br, pErr := twb.SendLocked(ctx, ba)
2217+
require.Nil(t, pErr)
2218+
require.NotNil(t, br)
2219+
2220+
// Go to commit the transaction and ensure HasBufferedAllPrecedingWrites
2221+
// is set correctly.
2222+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2223+
require.Equal(t, tc.expHasBufferedAllPrecedingWrites, ba.HasBufferedAllPrecedingWrites)
2224+
2225+
br = ba.CreateReply()
2226+
br.Txn = ba.Txn
2227+
return br, nil
2228+
})
2229+
2230+
ba = &kvpb.BatchRequest{}
2231+
ba.Header = kvpb.Header{Txn: &txn}
2232+
ba.Add(&kvpb.EndTxnRequest{Commit: true})
2233+
2234+
br, pErr = twb.SendLocked(ctx, ba)
2235+
require.Nil(t, pErr)
2236+
require.NotNil(t, br)
2237+
require.Len(t, br.Responses, 1)
2238+
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
2239+
})
2240+
}
2241+
}
2242+
20592243
// BenchmarkTxnWriteBuffer benchmarks the txnWriteBuffer. The test sets up a
20602244
// transaction with an existing buffer and runs a single batch through
20612245
// SendLocked and flushBufferAndSendBatch. The test varies the state of the

pkg/kv/kvpb/api.proto

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2928,8 +2928,6 @@ message Header {
29282928
// * A destination node older than 24.1 will not see this field.
29292929
RangeInfo proxy_range_info = 34;
29302930

2931-
reserved 7, 10, 12, 14, 20;
2932-
29332931
WriteOptions write_options = 35;
29342932

29352933
// DeadlockTimeout specifies the amount of time that a request will wait on a
@@ -2945,7 +2943,27 @@ message Header {
29452943
google.protobuf.Duration deadlock_timeout = 36 [(gogoproto.nullable) = false,
29462944
(gogoproto.stdduration) = true];
29472945

2948-
// Next ID: 37
2946+
// HasBufferedAllPrecedingWrites, if set, indicates that the batch belongs to
2947+
// a transaction that has buffered all of its writes (from preceding batches)
2948+
// on the client.
2949+
//
2950+
// The server may use this field to omit checking the AbortSpan. Transactions
2951+
// use the AbortSpan to check whether they've been aborted or not. If they
2952+
// have, any intents they may have previously written could be removed by
2953+
// concurrent transactions, which means a transaction may not have a guarantee
2954+
// to read its own writes. So, transactions eagerly check the AbortSpan to
2955+
// identify this case and eagerly return a TransactionAbortedError to the
2956+
// client, instead of breaking read-your-own-writes. However, transactions
2957+
// that have buffered all writes on the client uphold read-your-own-writes
2958+
// semantics by joining results from the KVServer against the write buffer;
2959+
// simply put, they do not rely on the AbortSpan to uphold
2960+
// read-your-own-writes. As such, they can eschew checking the AbortSpan on
2961+
// the server.
2962+
bool has_buffered_all_preceding_writes = 37;
2963+
2964+
reserved 7, 10, 12, 14, 20;
2965+
2966+
// Next ID: 38
29492967
}
29502968

29512969
message WriteOptions {

0 commit comments

Comments
 (0)