Skip to content

Commit 874ad8f

Browse files
craig[bot]stevendanna
andcommitted
Merge #144221
144221: kvcoord: support stateful txn retries in txnWriteBuffer r=yuzefovich a=stevendanna Previously, a stateful transaction retry would result in: 1) errors when calling SetBufferedWritesEnabled, and 2) erroneous writes from the previous epoch being flushed Here, (1) is fixed by separately tracking the flushed state from the enabled state, allowing SetBufferedWritesEnabled to be called even after we've disabled future flushing. (2) is solved by clearing the buffer when the epoch is bumped. We've decided to keep write buffering disabled after an epoch bump to maintain the current invariant that once we've stopped buffering writes, we won't resume buffering. Fixes #139057 Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents 74f9ec4 + 0172edf commit 874ad8f

File tree

3 files changed

+148
-25
lines changed

3 files changed

+148
-25
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
2121
"github.com/cockroachdb/cockroach/pkg/storage/mvccencoding"
2222
"github.com/cockroachdb/cockroach/pkg/storage/mvcceval"
23+
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
2324
"github.com/cockroachdb/cockroach/pkg/util/log"
2425
"github.com/cockroachdb/errors"
2526
)
@@ -151,7 +152,17 @@ type txnWriteBuffer struct {
151152
// that have been aborted by a conflicting transaction. As read-your-own-write
152153
// semantics are upheld by the client, not the server, for transactions that
153154
// use buffered writes, we can skip the AbortSpan check on the server.
155+
//
156+
// We currently track this via two state variables: `enabled` and `flushed`.
157+
// Writes are only buffered if enabled && !flushed.
158+
//
159+
// `enabled` tracks whether buffering has been enabled/disabled externally via
160+
// txn.SetBufferedWritesEnabled or because we are operating on a leaf
161+
// transaction.
154162
enabled bool
163+
//
164+
// `flushed` tracks whether the buffer has been previously flushed.
165+
flushed bool
155166

156167
// flushOnNextBatch, if set, indicates that write buffering has just been
157168
// disabled, and the interceptor should flush any buffered writes when it
@@ -188,7 +199,7 @@ func (twb *txnWriteBuffer) SendLocked(
188199
return twb.flushBufferAndSendBatch(ctx, ba)
189200
}
190201

191-
if !twb.enabled {
202+
if !twb.shouldBuffer() {
192203
return twb.wrapped.SendLocked(ctx, ba)
193204
}
194205

@@ -202,6 +213,7 @@ func (twb *txnWriteBuffer) SendLocked(
202213
}
203214

204215
if _, ok := ba.GetArg(kvpb.DeleteRange); ok {
216+
log.VEventf(ctx, 2, "DeleteRangeRequest forcing flush of write buffer")
205217
// DeleteRange requests can delete an arbitrary number of keys over a
206218
// given keyspan. We won't know the exact scope of the delete until
207219
// we've scanned the keyspan, which must happen on the server. We've got
@@ -231,8 +243,13 @@ func (twb *txnWriteBuffer) SendLocked(
231243
// Check if buffering writes from the supplied batch will run us over
232244
// budget. If it will, we shouldn't buffer writes from the current batch,
233245
// and flush the buffer.
234-
if twb.estimateSize(ba)+twb.bufferSize > bufferedWritesMaxBufferSize.Get(&twb.st.SV) {
246+
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
247+
bufSize := twb.estimateSize(ba) + twb.bufferSize
248+
if bufSize > maxSize {
235249
// TODO(arul): add some metrics for this case.
250+
log.VEventf(ctx, 2, "flushing buffer because buffer size (%s) exceeds max size (%s)",
251+
humanizeutil.IBytes(bufSize),
252+
humanizeutil.IBytes(maxSize))
236253
return twb.flushBufferAndSendBatch(ctx, ba)
237254
}
238255

@@ -325,12 +342,17 @@ func (twb *txnWriteBuffer) adjustError(
325342
if ts[0].stripped {
326343
numStripped++
327344
} else {
328-
// TODO(arul): If the error index points to a request that we've
329-
// transformed, returning this back to the client is weird -- the
330-
// client doesn't know we're making transformations. We should
331-
// probably just log a warning and clear out the error index for such
332-
// cases.
333-
log.Fatal(ctx, "unhandled")
345+
// This is a transformed request (for example a LockingGet that was
346+
// sent instead of a Del). In this case, the error might be a bit
347+
// confusing to the client since the request that had an error isn't
348+
// exactly the request the user sent.
349+
//
350+
// For now, we handle this by logging and removing the error index.
351+
if baIdx == pErr.Index.Index {
352+
log.Warningf(ctx, "error index %d is part of a transformed request", pErr.Index.Index)
353+
pErr.Index = nil
354+
return pErr
355+
}
334356
}
335357
ts = ts[1:]
336358
continue
@@ -444,7 +466,14 @@ func (twb *txnWriteBuffer) importLeafFinalState(context.Context, *roachpb.LeafTx
444466
}
445467

446468
// epochBumpedLocked implements the txnInterceptor interface.
447-
func (twb *txnWriteBuffer) epochBumpedLocked() {}
469+
func (twb *txnWriteBuffer) epochBumpedLocked() {
470+
twb.resetBuffer()
471+
}
472+
473+
func (twb *txnWriteBuffer) resetBuffer() {
474+
twb.buffer.Reset()
475+
twb.bufferSize = 0
476+
}
448477

449478
// createSavepointLocked is part of the txnInterceptor interface.
450479
func (twb *txnWriteBuffer) createSavepointLocked(context.Context, *savepoint) {}
@@ -1132,34 +1161,28 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
11321161
defer func() {
11331162
assertTrue(twb.buffer.Len() == 0, "buffer should be empty after flush")
11341163
assertTrue(twb.bufferSize == 0, "buffer size should be 0 after flush")
1164+
assertTrue(twb.flushed, "flushed should be true after flush")
11351165
}()
11361166

1167+
// Once we've flushed the buffer, we disable write buffering going forward. We
1168+
// do this even if the buffer is empty since once we've called this function,
1169+
// our buffer no longer represents all of the writes in the transaction.
1170+
log.VEventf(ctx, 2, "disabling write buffering for this epoch")
1171+
twb.flushed = true
1172+
11371173
numBuffered := twb.buffer.Len()
11381174
if numBuffered == 0 {
11391175
return twb.wrapped.SendLocked(ctx, ba) // nothing to flush
11401176
}
11411177

1142-
// Once we've flushed the buffer, we disable write buffering going forward.
1143-
twb.enabled = false
1144-
11451178
// Flush all buffered writes by pre-pending them to the requests being sent
11461179
// in the batch.
1147-
// First, collect the requests we'll need to flush.
1148-
toFlushBufferedWrites := make([]bufferedWrite, 0, twb.buffer.Len())
1149-
1180+
reqs := make([]kvpb.RequestUnion, 0, numBuffered+len(ba.Requests))
11501181
it := twb.buffer.MakeIter()
11511182
for it.First(); it.Valid(); it.Next() {
1152-
toFlushBufferedWrites = append(toFlushBufferedWrites, *it.Cur())
1153-
}
1154-
1155-
reqs := make([]kvpb.RequestUnion, 0, numBuffered+len(ba.Requests))
1156-
1157-
// Next, remove the buffered writes from the buffer and collect them into
1158-
// requests.
1159-
for _, bw := range toFlushBufferedWrites {
1160-
reqs = append(reqs, bw.toRequest())
1161-
twb.removeFromBuffer(&bw)
1183+
reqs = append(reqs, it.Cur().toRequest())
11621184
}
1185+
twb.resetBuffer()
11631186

11641187
// Layers below us expect that writes inside a batch are in sequence number
11651188
// order but the iterator above returns data in key order. Here we re-sort it
@@ -1194,6 +1217,12 @@ func (twb *txnWriteBuffer) hasBufferedWrites() bool {
11941217
return twb.buffer.Len() > 0
11951218
}
11961219

1220+
// shouldBuffer returns true if SendLocked() should attempt to buffer parts of
1221+
// the batch.
1222+
func (twb *txnWriteBuffer) shouldBuffer() bool {
1223+
return twb.enabled && !twb.flushed
1224+
}
1225+
11971226
// testingBufferedWritesAsSlice returns all buffered writes, in key order, as a
11981227
// slice.
11991228
func (twb *txnWriteBuffer) testingBufferedWritesAsSlice() []bufferedWrite {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,3 +1817,37 @@ func TestTxnWriteBufferFlushesAfterDisabling(t *testing.T) {
18171817
require.Len(t, br.Responses, 1)
18181818
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
18191819
}
1820+
1821+
// TestTxnWriteBufferClearsBufferOnEpochBump tests that the txnWriteBuffer
1822+
// clears its buffer whenever the epoch is bumped.
1823+
func TestTxnWriteBufferClearsBufferOnEpochBump(t *testing.T) {
1824+
defer leaktest.AfterTest(t)()
1825+
defer log.Scope(t).Close(t)
1826+
ctx := context.Background()
1827+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1828+
1829+
txn := makeTxnProto()
1830+
txn.Sequence = 1
1831+
1832+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
1833+
1834+
// Blindly write to some keys that should all be buffered.
1835+
ba := &kvpb.BatchRequest{}
1836+
ba.Header = kvpb.Header{Txn: &txn}
1837+
putA := putArgs(keyA, "valA", txn.Sequence)
1838+
putB := putArgs(keyB, "valB", txn.Sequence)
1839+
delC := delArgs(keyC, txn.Sequence)
1840+
ba.Add(putA, putB, delC)
1841+
1842+
numCalled := mockSender.NumCalled()
1843+
br, pErr := twb.SendLocked(ctx, ba)
1844+
require.Nil(t, pErr)
1845+
require.NotNil(t, br)
1846+
require.Equal(t, numCalled, mockSender.NumCalled())
1847+
1848+
// The buffer should be cleared after epoch bump.
1849+
twb.epochBumpedLocked()
1850+
require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
1851+
require.Equal(t, 0, int(twb.bufferSize))
1852+
require.Equal(t, numCalled, mockSender.NumCalled())
1853+
}

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"fmt"
1212
"sort"
13+
"sync/atomic"
1314
"testing"
1415
"time"
1516

@@ -36,6 +37,7 @@ import (
3637
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3738
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3839
"github.com/cockroachdb/cockroach/pkg/util/log"
40+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3941
"github.com/cockroachdb/errors"
4042
"github.com/stretchr/testify/require"
4143
)
@@ -2080,5 +2082,63 @@ func TestTxnBufferedWritesRollbackToSavepointAllBuffered(t *testing.T) {
20802082
require.NoError(t, err)
20812083
require.Equal(t, actualA.ValueBytes(), value1)
20822084
})
2085+
}
2086+
2087+
// TestTxnBufferedWriteRetriesCorrectly tests that a stateful retry of a
2088+
// transaction with buffered writes enabled does not encounter errors because of
2089+
// state in the txnWriteBuffer.
2090+
func TestTxnBufferedWriteRetriesCorrectly(t *testing.T) {
2091+
defer leaktest.AfterTest(t)()
2092+
defer log.Scope(t).Close(t)
2093+
ctx := context.Background()
20832094

2095+
// This filter will force a transaction retry.
2096+
errKey := roachpb.Key("inject_err")
2097+
var shouldInject atomic.Bool
2098+
reqFilter := func(_ context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
2099+
if !shouldInject.Load() {
2100+
return nil
2101+
}
2102+
2103+
if g, ok := ba.GetArg(kvpb.Get); ok && g.(*kvpb.GetRequest).Key.Equal(errKey) {
2104+
txn := ba.Txn.Clone()
2105+
pErr := kvpb.NewReadWithinUncertaintyIntervalError(
2106+
txn.ReadTimestamp,
2107+
hlc.ClockTimestamp{},
2108+
txn,
2109+
txn.WriteTimestamp.Add(0, 1),
2110+
hlc.ClockTimestamp{})
2111+
return kvpb.NewErrorWithTxn(pErr, txn)
2112+
}
2113+
return nil
2114+
}
2115+
2116+
s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
2117+
TestingRequestFilter: reqFilter,
2118+
})
2119+
defer s.Stop()
2120+
2121+
rng, _ := randutil.NewTestRand()
2122+
2123+
testutils.RunTrueAndFalse(t, "buffered_writes", func(t *testing.T, bwEnabled bool) {
2124+
testPrefix := fmt.Sprintf("test-bw-%v-", bwEnabled)
2125+
shouldInject.Swap(true)
2126+
2127+
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
2128+
defer func() { _ = shouldInject.Swap(false) }()
2129+
2130+
txn.SetBufferedWritesEnabled(bwEnabled)
2131+
key := fmt.Sprintf("%s-%s", testPrefix, randutil.RandString(rng, 10, randutil.PrintableKeyAlphabet))
2132+
if err := txn.Put(ctx, roachpb.Key(key), []byte("values")); err != nil {
2133+
return err
2134+
}
2135+
_, err := txn.Get(ctx, errKey)
2136+
return err
2137+
})
2138+
require.NoError(t, err)
2139+
startKey := roachpb.Key(testPrefix)
2140+
keys, err := s.DB.Scan(ctx, startKey, startKey.PrefixEnd(), 0)
2141+
require.NoError(t, err)
2142+
require.Equal(t, 1, len(keys))
2143+
})
20842144
}

0 commit comments

Comments
 (0)