Skip to content

Commit 02d1f93

Browse files
committed
kvclient: enable savepoint rollbacks with buffered writes
This patch correctly handles savepoint rollbacks when buffered writes are enabled. Whenever a savepoint is rolled back, any writes that belong to the savepoint are removed from the buffer. Closes #139059 Release note: None
1 parent 9fef4b2 commit 02d1f93

File tree

3 files changed

+237
-7
lines changed

3 files changed

+237
-7
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"encoding/binary"
1111
"slices"
12+
"sort"
1213
"unsafe"
1314

1415
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -423,7 +424,36 @@ func (twb *txnWriteBuffer) epochBumpedLocked() {}
423424
func (twb *txnWriteBuffer) createSavepointLocked(context.Context, *savepoint) {}
424425

425426
// rollbackToSavepointLocked is part of the txnInterceptor interface.
426-
func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s savepoint) {}
427+
// It walks every key in the buffer and discards each buffered value whose
// sequence number is >= s.seqNum, subtracting the discarded values' sizes from
// twb.bufferSize. Keys that end up with no buffered values are removed from
// the buffer after the iteration completes.
func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
428+
// Keys whose values were all rolled back. They are collected here and
// removed only after the loop finishes (presumably so the buffer is not
// mutated while the iterator is live).
toDelete := make([]*bufferedWrite, 0)
429+
it := twb.buffer.MakeIter()
430+
for it.First(); it.Valid(); it.Next() {
431+
bufferedVals := it.Cur().vals
432+
// NB: the savepoint is being rolled back to s.seqNum (inclusive). So,
433+
// idx is the index of the first value that is considered rolled back.
434+
// NOTE(review): sort.Search is a binary search, so this assumes the values
// for a key are ordered by ascending seq -- confirm against addToBuffer.
idx := sort.Search(len(bufferedVals), func(i int) bool {
435+
return bufferedVals[i].seq >= s.seqNum
436+
})
437+
if idx == len(bufferedVals) {
438+
// No writes are being rolled back.
439+
continue
440+
}
441+
// Update size bookkeeping for the values we're rolling back.
442+
for i := idx; i < len(bufferedVals); i++ {
443+
twb.bufferSize -= bufferedVals[i].size()
444+
}
445+
// Rollback writes by truncating the buffered values.
446+
it.Cur().vals = bufferedVals[:idx]
447+
if len(it.Cur().vals) == 0 {
448+
// All writes have been rolled back; we should remove this key from
449+
// the buffer entirely.
450+
toDelete = append(toDelete, it.Cur())
451+
}
452+
}
453+
// NOTE(review): removeFromBuffer subtracts bw.size() from twb.bufferSize in
// addition to the per-value sizes already subtracted above. Confirm that
// bw.size() only accounts for the values still present on bw (none at this
// point), so the rolled-back values are not counted twice.
for _, bw := range toDelete {
454+
twb.removeFromBuffer(bw)
455+
}
456+
}
427457

428458
// closeLocked implements the txnInterceptor interface.
429459
func (twb *txnWriteBuffer) closeLocked() {}
@@ -1057,6 +1087,12 @@ func (twb *txnWriteBuffer) addToBuffer(key roachpb.Key, val roachpb.Value, seq e
10571087
}
10581088
}
10591089

1090+
// removeFromBuffer removes all buffered writes on a given key from the buffer
// and subtracts bw.size() from twb.bufferSize to keep the size bookkeeping in
// step with the buffer's contents.
1091+
func (twb *txnWriteBuffer) removeFromBuffer(bw *bufferedWrite) {
1092+
twb.buffer.Delete(bw)
1093+
twb.bufferSize -= bw.size()
1094+
}
1095+
10601096
// flushBufferAndSendBatch flushes all buffered writes when sending the supplied
10611097
// batch request to the KV layer. This is done by pre-pending the buffered
10621098
// writes to the requests in the batch.
@@ -1092,11 +1128,11 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
10921128

10931129
reqs := make([]kvpb.RequestUnion, 0, numBuffered+len(ba.Requests))
10941130

1095-
// Next, remove the buffered writes from the buffer and collect them into requests.
1131+
// Next, remove the buffered writes from the buffer and collect them into
1132+
// requests.
10961133
for _, bw := range toFlushBufferedWrites {
10971134
reqs = append(reqs, bw.toRequest())
1098-
twb.buffer.Delete(&bw)
1099-
twb.bufferSize -= bw.size()
1135+
twb.removeFromBuffer(&bw)
11001136
}
11011137

11021138
// Layers below us expect that writes inside a batch are in sequence number

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1614,3 +1614,101 @@ func TestTxnWriteBufferDeleteRange(t *testing.T) {
16141614
require.Len(t, br.Responses, 1)
16151615
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
16161616
}
1617+
1618+
// TestTxnWriteBufferRollbackToSavepoint tests the savepoint rollback logic.
1619+
func TestTxnWriteBufferRollbackToSavepoint(t *testing.T) {
1620+
defer leaktest.AfterTest(t)()
1621+
defer log.Scope(t).Close(t)
1622+
ctx := context.Background()
1623+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1624+
1625+
txn := makeTxnProto()
1626+
txn.Sequence = 10
1627+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
1628+
valA, valA2, valB := "valA", "valA2", "valB"
1629+
1630+
ba := &kvpb.BatchRequest{}
1631+
ba.Header = kvpb.Header{Txn: &txn}
1632+
putA := putArgs(keyA, valA, txn.Sequence)
1633+
txn.Sequence++
1634+
delC := delArgs(keyC, txn.Sequence)
1635+
ba.Add(putA)
1636+
ba.Add(delC)
1637+
1638+
numCalled := mockSender.NumCalled()
1639+
br, pErr := twb.SendLocked(ctx, ba)
1640+
require.Nil(t, pErr)
1641+
require.NotNil(t, br)
1642+
// All the requests should be buffered and not make it past the
1643+
// txnWriteBuffer. The response returned should be indistinguishable.
1644+
require.Equal(t, numCalled, mockSender.NumCalled())
1645+
require.Len(t, br.Responses, 2)
1646+
require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
1647+
require.IsType(t, &kvpb.DeleteResponse{}, br.Responses[1].GetInner())
1648+
// Verify the state of the write buffer.
1649+
expBufferedWrites := []bufferedWrite{
1650+
makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
1651+
makeBufferedWrite(keyC, makeBufferedValue("", 11)),
1652+
}
1653+
require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
1654+
1655+
// Create a savepoint. This is inclusive of the buffered Delete on keyC.
1656+
savepoint := &savepoint{seqNum: txn.Sequence}
1657+
twb.createSavepointLocked(ctx, savepoint)
1658+
1659+
// Add some new writes. A second write to keyA and a new one to keyB.
1660+
ba = &kvpb.BatchRequest{}
1661+
txn.Sequence++
1662+
putA2 := putArgs(keyA, valA2, txn.Sequence)
1663+
ba.Add(putA2)
1664+
txn.Sequence++
1665+
putB := putArgs(keyB, valB, txn.Sequence)
1666+
ba.Add(putB)
1667+
1668+
// All these writes should be buffered as well, with no KV requests sent yet.
1669+
numCalled = mockSender.NumCalled()
1670+
br, pErr = twb.SendLocked(ctx, ba)
1671+
require.Nil(t, pErr)
1672+
require.NotNil(t, br)
1673+
1674+
require.Equal(t, numCalled, mockSender.NumCalled())
1675+
require.Len(t, br.Responses, 2)
1676+
require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
1677+
require.IsType(t, &kvpb.PutResponse{}, br.Responses[1].GetInner())
1678+
// Verify the state of the write buffer.
1679+
expBufferedWrites = []bufferedWrite{
1680+
makeBufferedWrite(keyA, makeBufferedValue("valA", 10), makeBufferedValue("valA2", 12)),
1681+
makeBufferedWrite(keyB, makeBufferedValue("valB", 13)),
1682+
makeBufferedWrite(keyC, makeBufferedValue("", 11)),
1683+
}
1684+
require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
1685+
1686+
// Now, Rollback to the savepoint. This should leave just one write in the
1687+
// buffer, that on keyA at seqnum 10.
1688+
twb.rollbackToSavepointLocked(ctx, *savepoint)
1689+
expBufferedWrites = []bufferedWrite{
1690+
makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
1691+
}
1692+
require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
1693+
1694+
// Commit the transaction.
1695+
ba = &kvpb.BatchRequest{}
1696+
ba.Header = kvpb.Header{Txn: &txn}
1697+
ba.Add(&kvpb.EndTxnRequest{Commit: true})
1698+
1699+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1700+
require.Len(t, ba.Requests, 2)
1701+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1702+
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner())
1703+
1704+
br = ba.CreateReply()
1705+
br.Txn = ba.Txn
1706+
return br, nil
1707+
})
1708+
1709+
br, pErr = twb.SendLocked(ctx, ba)
1710+
require.Nil(t, pErr)
1711+
require.NotNil(t, br)
1712+
require.Len(t, br.Responses, 1)
1713+
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
1714+
}

pkg/sql/logictest/testdata/logic_test/buffered_writes

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ INSERT INTO t1 VALUES (1,1)
7878
statement ok
7979
COMMIT
8080

81-
query II
81+
query II rowsort
8282
SELECT * FROM t1
8383
----
8484
1 1
@@ -122,7 +122,7 @@ INSERT INTO t2 VALUES (1);
122122
statement ok
123123
COMMIT;
124124

125-
query I
125+
query I rowsort
126126
SELECT * FROM t2
127127
----
128128
1
@@ -151,7 +151,103 @@ DELETE FROM t3 WHERE k < 10 AND k > 0
151151
statement ok
152152
COMMIT
153153

154-
query I
154+
query I rowsort
155155
SELECT count(*) from t3
156156
----
157157
0
158+
159+
# Test savepoints, and in particular savepoint rollbacks, with buffered writes.
160+
# We test both intermediate selects after rollbacks and the final state
161+
# after the transaction has been committed.
162+
subtest savepoint_rollbacks
163+
164+
# First, create a new table with a secondary index on it. That way, the DELETE
165+
# statements below will not use DeleteRange requests which cause the buffer to
166+
# be flushed.
167+
statement ok
168+
CREATE TABLE t4 (k INT PRIMARY KEY, v INT)
169+
170+
statement ok
171+
CREATE INDEX idx ON t4 (v)
172+
173+
statement ok
174+
BEGIN;
175+
INSERT INTO t4 VALUES(1, 100), (2, 200), (3, 300);
176+
SAVEPOINT s1;
177+
INSERT INTO t4 VALUES(4, 400), (5, 500), (6, 600)
178+
179+
query II rowsort
180+
SELECT * FROM t4
181+
----
182+
1 100
183+
2 200
184+
3 300
185+
4 400
186+
5 500
187+
6 600
188+
189+
statement ok
190+
SAVEPOINT s2;
191+
INSERT INTO t4 VALUES(7, 700), (8, 800), (9, 900)
192+
193+
query II rowsort
194+
SELECT * FROM t4
195+
----
196+
1 100
197+
2 200
198+
3 300
199+
4 400
200+
5 500
201+
6 600
202+
7 700
203+
8 800
204+
9 900
205+
206+
# Throw in some Deletes.
207+
statement ok
208+
DELETE FROM t4 WHERE k = 1;
209+
DELETE FROM t4 WHERE k = 2;
210+
DELETE FROM t4 WHERE k = 3;
211+
212+
query II rowsort
213+
SELECT * FROM t4
214+
----
215+
4 400
216+
5 500
217+
6 600
218+
7 700
219+
8 800
220+
9 900
221+
222+
statement ok
223+
ROLLBACK TO SAVEPOINT s2
224+
225+
query II rowsort
226+
SELECT * FROM t4
227+
----
228+
1 100
229+
2 200
230+
3 300
231+
4 400
232+
5 500
233+
6 600
234+
235+
statement ok
236+
ROLLBACK TO SAVEPOINT s1;
237+
238+
query II rowsort
239+
SELECT * FROM t4
240+
----
241+
1 100
242+
2 200
243+
3 300
244+
245+
statement ok
246+
COMMIT
247+
248+
query II rowsort
249+
SELECT * FROM t4
250+
----
251+
1 100
252+
2 200
253+
3 300

0 commit comments

Comments
 (0)