Skip to content

Commit 6bb7514

Browse files
craig[bot]arulajmani
andcommitted
Merge #144799
144799: kvserver: omit AbortSpan checks for buffered writes transactions r=stevendanna,tbg a=arulajmani Transactions that use buffered writes do not rely on the AbortSpan to correctly uphold read-your-own-writes semantics. As such, we can omit AbortSpan checks for transactions that have buffered all writes, from preceding batches, on the client. Fixes #140593 Release note: None Co-authored-by: Arul Ajmani <[email protected]>
2 parents 6b9c3ec + b2b1034 commit 6bb7514

File tree

7 files changed

+339
-18
lines changed

7 files changed

+339
-18
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -146,26 +146,26 @@ type txnWriteBuffer struct {
146146
// and disable write buffering going forward out of an abundance of caution.
147147
// This is opted into by SQL.
148148
//
149-
// As a result, we have a nice invariant: if write buffering is enabled, then
150-
// all writes performed by the transaction are buffered in memory. We can
151-
// never have the case where a part of the write set is buffered, and the
152-
// other part is replicated.
149+
// As a result, we have a nice invariant: if write buffering is enabled,
150+
// then all writes performed by the transaction are buffered in memory. We
151+
// can never have the case where a part of the write set is buffered, and
152+
// the other part is replicated.
153153
//
154-
// In the future, the invariant above allows us to omit checking the AbortSpan
155-
// for transactions that have buffered writes enabled. The AbortSpan is used
156-
// to ensure we don't violate read-your-own-write semantics for transactions
157-
// that have been aborted by a conflicting transaction. As read-your-own-write
158-
// semantics are upheld by the client, not the server, for transactions that
159-
// use buffered writes, we can skip the AbortSpan check on the server.
154+
// The invariant above allows us to omit checking the AbortSpan for
155+
// transactions that have buffered writes enabled. The AbortSpan is used to
156+
// ensure we don't violate read-your-own-write semantics for transactions
157+
// that have been aborted by a conflicting transaction. As
158+
// read-your-own-write semantics are upheld by the client, not the server,
159+
// for transactions that use buffered writes, we can skip the AbortSpan
160+
// check on the server.
160161
//
161162
// We currently track this via two state variables: `enabled` and `flushed`.
162163
// Writes are only buffered if enabled && !flushed.
163164
//
164-
// `enabled` tracks whether buffering has been enabled/disabled externally via
165-
// txn.SetBufferedWritesEnabled or because we are operating on a leaf
165+
// `enabled` tracks whether buffering has been enabled/disabled externally
166+
// via txn.SetBufferedWritesEnabled or because we are operating on a leaf
166167
// transaction.
167168
enabled bool
168-
//
169169
// `flushed` tracks whether the buffer has been previously flushed.
170170
flushed bool
171171

@@ -210,6 +210,15 @@ func (twb *txnWriteBuffer) SendLocked(
210210

211211
if !twb.shouldBuffer() {
212212
return twb.wrapped.SendLocked(ctx, ba)
213+
} else {
214+
// If we're here, write buffering is enabled, and all writes until now
215+
// have been buffered. Set the flag to indicate this.
216+
//
217+
// NB: We don't need a version check here (for v25.3) because this is only
218+
// used by the server to optimize away the AbortSpan check. Even if we set
219+
// this field, and the server is on a previous version, the worst that can
220+
// happen is we'll perform this check, which is harmless.
221+
ba.HasBufferedAllPrecedingWrites = true
213222
}
214223

215224
if etArg, ok := ba.GetArg(kvpb.EndTxn); ok {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,6 +2056,190 @@ func TestTxnWriteBufferBatchRequestValidation(t *testing.T) {
20562056
}
20572057
}
20582058

2059+
// TestTxnWriteBufferHasBufferedAllPrecedingWrites verifies that the
2060+
// txnWriteBuffer correctly sets the HasBufferedAllPrecedingWrites flag.
2061+
func TestTxnWriteBufferHasBufferedAllPrecedingWrites(t *testing.T) {
2062+
defer leaktest.AfterTest(t)()
2063+
defer log.Scope(t).Close(t)
2064+
2065+
txn := makeTxnProto()
2066+
txn.Sequence = 1
2067+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
2068+
2069+
for _, tc := range []struct {
2070+
name string
2071+
setup func(*txnWriteBuffer)
2072+
ba func(ba *kvpb.BatchRequest)
2073+
mockSend func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error)
2074+
expHasBufferedAllPrecedingWrites bool
2075+
}{
2076+
{
2077+
name: "batch with two Get requests",
2078+
ba: func(ba *kvpb.BatchRequest) {
2079+
getA := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence}}
2080+
getB := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, Sequence: txn.Sequence}}
2081+
ba.Add(getA, getB)
2082+
},
2083+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2084+
require.Len(t, ba.Requests, 2)
2085+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
2086+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
2087+
2088+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2089+
2090+
br := ba.CreateReply()
2091+
br.Txn = ba.Txn
2092+
return br, nil
2093+
},
2094+
expHasBufferedAllPrecedingWrites: true,
2095+
},
2096+
{
2097+
name: "batch with one Put and one Get request",
2098+
ba: func(ba *kvpb.BatchRequest) {
2099+
putA := putArgs(keyA, "valA", txn.Sequence)
2100+
getB := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, Sequence: txn.Sequence}}
2101+
ba.Add(putA, getB)
2102+
},
2103+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2104+
require.Len(t, ba.Requests, 1)
2105+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
2106+
2107+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2108+
2109+
br := ba.CreateReply()
2110+
br.Txn = ba.Txn
2111+
return br, nil
2112+
},
2113+
expHasBufferedAllPrecedingWrites: true,
2114+
},
2115+
{
2116+
name: "batch with one Put, one Get, and one Delete request",
2117+
ba: func(ba *kvpb.BatchRequest) {
2118+
putA := putArgs(keyA, "valA", txn.Sequence)
2119+
getB := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, Sequence: txn.Sequence}}
2120+
delC := delArgs(keyC, txn.Sequence)
2121+
2122+
ba.Add(putA, getB, delC)
2123+
},
2124+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2125+
require.Len(t, ba.Requests, 1)
2126+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
2127+
2128+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2129+
2130+
br := ba.CreateReply()
2131+
br.Txn = ba.Txn
2132+
return br, nil
2133+
},
2134+
expHasBufferedAllPrecedingWrites: true,
2135+
},
2136+
{
2137+
name: "batch with one DeleteRange and one Get request",
2138+
ba: func(ba *kvpb.BatchRequest) {
2139+
delRange := delRangeArgs(keyA, keyB, txn.Sequence)
2140+
getB := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyB, Sequence: txn.Sequence}}
2141+
2142+
ba.Add(delRange, getB)
2143+
},
2144+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2145+
require.Len(t, ba.Requests, 2)
2146+
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner())
2147+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
2148+
2149+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2150+
2151+
br := ba.CreateReply()
2152+
br.Txn = ba.Txn
2153+
return br, nil
2154+
},
2155+
expHasBufferedAllPrecedingWrites: false,
2156+
},
2157+
{
2158+
name: "flushed due to size limit",
2159+
setup: func(twb *txnWriteBuffer) {
2160+
bufferedWritesMaxBufferSize.Override(context.Background(), &twb.st.SV, 1)
2161+
},
2162+
ba: func(ba *kvpb.BatchRequest) {
2163+
putA := putArgs(keyA, "valA", txn.Sequence)
2164+
2165+
ba.Add(putA)
2166+
},
2167+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2168+
require.Len(t, ba.Requests, 1)
2169+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
2170+
2171+
require.True(t, ba.HasBufferedAllPrecedingWrites)
2172+
2173+
br := ba.CreateReply()
2174+
br.Txn = ba.Txn
2175+
return br, nil
2176+
},
2177+
expHasBufferedAllPrecedingWrites: false,
2178+
},
2179+
{
2180+
name: "write buffering disabled",
2181+
setup: func(twb *txnWriteBuffer) {
2182+
twb.setEnabled(false)
2183+
},
2184+
ba: func(ba *kvpb.BatchRequest) {
2185+
putA := putArgs(keyA, "valA", txn.Sequence)
2186+
2187+
ba.Add(putA)
2188+
},
2189+
mockSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2190+
require.Len(t, ba.Requests, 1)
2191+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
2192+
2193+
// NB: Should never be set if write buffering is disabled
2194+
require.False(t, ba.HasBufferedAllPrecedingWrites)
2195+
2196+
br := ba.CreateReply()
2197+
br.Txn = ba.Txn
2198+
return br, nil
2199+
},
2200+
expHasBufferedAllPrecedingWrites: false,
2201+
},
2202+
} {
2203+
t.Run(tc.name, func(t *testing.T) {
2204+
ctx := context.Background()
2205+
st := cluster.MakeTestingClusterSettings()
2206+
twb, mockSender := makeMockTxnWriteBuffer(st)
2207+
2208+
if tc.setup != nil {
2209+
tc.setup(&twb)
2210+
}
2211+
2212+
ba := &kvpb.BatchRequest{}
2213+
tc.ba(ba)
2214+
mockSender.MockSend(tc.mockSend)
2215+
2216+
br, pErr := twb.SendLocked(ctx, ba)
2217+
require.Nil(t, pErr)
2218+
require.NotNil(t, br)
2219+
2220+
// Go to commit the transaction and ensure HasBufferedAllPrecedingWrites
2221+
// is set correctly.
2222+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2223+
require.Equal(t, tc.expHasBufferedAllPrecedingWrites, ba.HasBufferedAllPrecedingWrites)
2224+
2225+
br = ba.CreateReply()
2226+
br.Txn = ba.Txn
2227+
return br, nil
2228+
})
2229+
2230+
ba = &kvpb.BatchRequest{}
2231+
ba.Header = kvpb.Header{Txn: &txn}
2232+
ba.Add(&kvpb.EndTxnRequest{Commit: true})
2233+
2234+
br, pErr = twb.SendLocked(ctx, ba)
2235+
require.Nil(t, pErr)
2236+
require.NotNil(t, br)
2237+
require.Len(t, br.Responses, 1)
2238+
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
2239+
})
2240+
}
2241+
}
2242+
20592243
// BenchmarkTxnWriteBuffer benchmarks the txnWriteBuffer. The test sets up a
20602244
// transaction with an existing buffer and runs a single batch through
20612245
// SendLocked and flushBufferAndSendBatch. The test varies the state of the

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,20 @@ import (
3838
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3939
"github.com/cockroachdb/cockroach/pkg/util/log"
4040
"github.com/cockroachdb/cockroach/pkg/util/randutil"
41+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
42+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4143
"github.com/cockroachdb/errors"
4244
"github.com/stretchr/testify/require"
4345
)
4446

47+
func checkGetResults(t *testing.T, expected map[string][]byte, results ...kv.Result) {
48+
for _, result := range results {
49+
require.Equal(t, 1, len(result.Rows))
50+
require.Equal(t, expected[string(result.Rows[0].Key)], result.Rows[0].ValueBytes())
51+
}
52+
require.Len(t, expected, len(results))
53+
}
54+
4555
// TestTxnDBBasics verifies that a simple transaction can be run and
4656
// either committed or aborted. On commit, mutations are visible; on
4757
// abort, mutations are never visible. During the txn, verify that
@@ -2371,3 +2381,82 @@ func TestLeafTransactionAdmissionHeader(t *testing.T) {
23712381
}
23722382
require.Equal(t, expectedLeafHeader, leafHeader)
23732383
}
2384+
2385+
// TestTxnBufferedWritesOmitAbortSpanChecks verifies that transactions that use
2386+
// buffered writes do not check the AbortSpan, while still upholding
2387+
// read-your-own-writes semantics.
2388+
func TestTxnBufferedWritesOmitAbortSpanChecks(t *testing.T) {
2389+
defer leaktest.AfterTest(t)()
2390+
defer log.Scope(t).Close(t)
2391+
ctx := context.Background()
2392+
2393+
var mu struct {
2394+
syncutil.Mutex
2395+
txnID uuid.UUID
2396+
}
2397+
s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
2398+
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
2399+
BeforeAbortSpanCheck: func(id uuid.UUID) {
2400+
mu.Lock()
2401+
defer mu.Unlock()
2402+
2403+
if mu.txnID == id {
2404+
t.Fatal("transactions using buffered writes should not check the AbortSpan")
2405+
}
2406+
},
2407+
},
2408+
})
2409+
defer s.Stop()
2410+
2411+
value1 := []byte("value1")
2412+
valueConflict := []byte("conflict")
2413+
2414+
keyA := []byte("keyA")
2415+
2416+
txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
2417+
txn.SetBufferedWritesEnabled(true)
2418+
mu.Lock()
2419+
mu.txnID = txn.ID()
2420+
mu.Unlock()
2421+
2422+
// Fix the transaction's commit timestamp.
2423+
_, err := txn.CommitTimestamp()
2424+
require.NoError(t, err)
2425+
2426+
// Put transactional value at keyA.
2427+
require.NoError(t, txn.Put(ctx, keyA, value1))
2428+
2429+
// Read what we just wrote.
2430+
b := txn.NewBatch()
2431+
b.Get(keyA)
2432+
require.NoError(t, txn.Run(ctx, b))
2433+
expected := map[string][]byte{
2434+
"keyA": value1,
2435+
}
2436+
checkGetResults(t, expected, b.Results...)
2437+
2438+
// Start another transaction that writes to keyA. This prevents us from
2439+
// committing at our original timestamp. Moreover, had we not been buffering
2440+
// our writes, this transaction would have resulted in aborting us and
2441+
// removing our intent.
2442+
err = s.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
2443+
require.NoError(t, txn.SetUserPriority(roachpb.MaxUserPriority))
2444+
return txn.Put(ctx, keyA, valueConflict)
2445+
})
2446+
require.NoError(t, err)
2447+
2448+
// Perform another read again. We should still see our previous write, not what
2449+
// the conflicting transaction wrote.
2450+
b = txn.NewBatch()
2451+
b.Get(keyA)
2452+
require.NoError(t, txn.Run(ctx, b))
2453+
expected = map[string][]byte{
2454+
"keyA": value1,
2455+
}
2456+
checkGetResults(t, expected, b.Results...)
2457+
2458+
// Try to commit the transaction. We should encounter a WriteTooOldError.
2459+
err = txn.Commit(ctx)
2460+
require.Error(t, err)
2461+
require.Regexp(t, "TransactionRetryWithProtoRefreshError: .*WriteTooOldError", err)
2462+
}

pkg/kv/kvpb/api.proto

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2928,8 +2928,6 @@ message Header {
29282928
// * A destination node older than 24.1 will not see this field.
29292929
RangeInfo proxy_range_info = 34;
29302930

2931-
reserved 7, 10, 12, 14, 20;
2932-
29332931
WriteOptions write_options = 35;
29342932

29352933
// DeadlockTimeout specifies the amount of time that a request will wait on a
@@ -2945,7 +2943,27 @@ message Header {
29452943
google.protobuf.Duration deadlock_timeout = 36 [(gogoproto.nullable) = false,
29462944
(gogoproto.stdduration) = true];
29472945

2948-
// Next ID: 37
2946+
// HasBufferedAllPrecedingWrites, if set, indicates that the batch belongs to
2947+
// a transaction that has buffered all of its writes (from preceding batches)
2948+
// on the client.
2949+
//
2950+
// The server may use this field to omit checking the AbortSpan. Transactions
2951+
// use the AbortSpan to check whether they've been aborted or not. If they
2952+
// have, any intents they may have previously written could be removed by
2953+
// concurrent transactions, which means a transaction may not have a guarantee
2954+
// to read its own writes. So, transactions eagerly check the AbortSpan to
2955+
// identify this case and eagerly return a TransactionAbortedError to the
2956+
// client, instead of breaking read-your-own-writes. However, transactions
2957+
// that have buffered all writes on the client uphold read-your-own-writes
2958+
// semantics by joining results from the KVServer against the write buffer;
2959+
// simply put, they do not rely on the AbortSpan to uphold
2960+
// read-your-own-writes. As such, they can eschew checking the AbortSpan on
2961+
// the server.
2962+
bool has_buffered_all_preceding_writes = 37;
2963+
2964+
reserved 7, 10, 12, 14, 20;
2965+
2966+
// Next ID: 38
29492967
}
29502968

29512969
message WriteOptions {

pkg/kv/kvserver/kvserverbase/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ go_library(
2828
"//pkg/util/quotapool",
2929
"//pkg/util/syncutil",
3030
"//pkg/util/timeutil",
31+
"//pkg/util/uuid",
3132
"@com_github_cockroachdb_errors//:errors",
3233
"@com_github_cockroachdb_pebble//vfs",
3334
"@com_github_cockroachdb_redact//:redact",

0 commit comments

Comments
 (0)