Skip to content

Commit 3abfebd

Browse files
committed
kvcoord: avoid erroneous rollback of buffered write
Previously, if a transaction had only done non-locking reads and buffered writes, a rollback would erroneously rollback the last issued buffered write because savepoint creation only incremented the sequence number if a lock had been acquired. This case is never encountered in SQL-generated transactions because we never completely buffer a Put or Delete without having previously issued at least 1 locking read that we expect to return a lock. But, we do hit these kind of transactions in KVNemesis. Epic: none Release note: None
1 parent 664648a commit 3abfebd

File tree

3 files changed

+47
-2
lines changed

3 files changed

+47
-2
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,16 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (kv.SavepointToke
8585
// happened before the savepoint and those that happened after.
8686
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
8787
// unconditional and push it into txnSeqNumAllocator.createSavepointLocked.
88-
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
88+
//
89+
// We also increment the write sequence if any writes are buffered since
90+
// a buffered write will not appears as a lock acquisition in the
91+
// pipeliner. For SQL-generated transactions, this additional check is
92+
// unnecessary since we won't ever have buffered writes wihout also at
93+
// least 1 locked row. However, for other types of KV transactions such
94+
// as those generated by KVNemesis we may have blind Put or Delete
95+
// requests that would be erroneously rolled back if we didn't increment
96+
// the sequence here.
97+
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() || tc.interceptorAlloc.txnWriteBuffer.hasBufferedWrites() {
8998
if err := tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked(ctx); err != nil {
9099
return nil, err
91100
}

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,7 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
11371137

11381138
// Layers below us expect that writes inside a batch are in sequence number
11391139
// order but the iterator above returns data in key order. Here we re-sort it
1140-
// which is unfortunate but required we make a change to the pipeliner.
1140+
// which is unfortunate but required unless we make a change to the pipeliner.
11411141
slices.SortFunc(reqs, func(a kvpb.RequestUnion, b kvpb.RequestUnion) int {
11421142
aHeader := a.GetInner().Header()
11431143
bHeader := b.GetInner().Header()

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2046,3 +2046,39 @@ func TestTxnBufferedWritesConditionalPuts(t *testing.T) {
20462046
}
20472047
})
20482048
}
2049+
2050+
// TestTxnBufferedWritesRollbackToSavepointAllBuffered is a regression
2051+
// test for a bug encountered during development where the sequence
2052+
// number of a savepoint was not correctly advanced when all writes in
2053+
// a transaction had been buffered.
2054+
func TestTxnBufferedWritesRollbackToSavepointAllBuffered(t *testing.T) {
2055+
defer leaktest.AfterTest(t)()
2056+
defer log.Scope(t).Close(t)
2057+
s := createTestDB(t)
2058+
defer s.Stop()
2059+
2060+
ctx := context.Background()
2061+
value1 := []byte("value1")
2062+
testutils.RunTrueAndFalse(t, "buffered_write", func(t *testing.T, bufferedWritesEnabled bool) {
2063+
keyA := []byte(fmt.Sprintf("keyA-%v", bufferedWritesEnabled))
2064+
2065+
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
2066+
txn.SetBufferedWritesEnabled(bufferedWritesEnabled)
2067+
// Should not be rolled back because the savepoint is
2068+
// created after it.
2069+
if err := txn.Put(ctx, keyA, value1); err != nil {
2070+
return err
2071+
}
2072+
sp, err := txn.CreateSavepoint(ctx)
2073+
if err != nil {
2074+
return err
2075+
}
2076+
return txn.RollbackToSavepoint(ctx, sp)
2077+
})
2078+
require.NoError(t, err)
2079+
actualA, err := s.DB.Get(ctx, keyA)
2080+
require.NoError(t, err)
2081+
require.Equal(t, actualA.ValueBytes(), value1)
2082+
})
2083+
2084+
}

0 commit comments

Comments
 (0)