Skip to content

Commit e66f38b

Browse files
committed
kvcoord: avoid unnecessary flushing in txnWriteBuffer
Previously, if the buffer was being flushed midway through a transaction, we were flushing the entire buffer to ensure that future savepoint rollbacks would result in the correct final value being committed. This was pessimistic, since many transactions don't use explicit savepoints. Here, we track whether an explicit savepoint has been created and only flush those values required to rollback the earliest active savepoint. Epic: none Release note: None
1 parent 8a1562c commit e66f38b

11 files changed

+376
-10
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,10 @@ type txnInterceptor interface {
217217
// createSavepointLocked().
218218
rollbackToSavepointLocked(context.Context, savepoint)
219219

220+
// releaseSavepointLocked is called when a savepoint is being
221+
// released.
222+
releaseSavepointLocked(context.Context, *savepoint)
223+
220224
// closeLocked closes the interceptor. It is called when the TxnCoordSender
221225
// shuts down due to either a txn commit or a txn abort. The method will
222226
// be called exactly once from cleanupTxnLocked.

pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,15 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo
187187
}
188188

189189
sp := s.(*savepoint)
190-
return tc.checkSavepointLocked(sp, "release")
190+
if err := tc.checkSavepointLocked(sp, "release"); err != nil {
191+
return err
192+
}
193+
194+
for _, reqInt := range tc.interceptorStack {
195+
reqInt.releaseSavepointLocked(ctx, sp)
196+
}
197+
198+
return nil
191199
}
192200

193201
// CanUseSavepoint is part of the kv.TxnSender interface.

pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,9 @@ func (tc *txnCommitter) epochBumpedLocked() {}
626626
// createSavepointLocked is part of the txnInterceptor interface.
627627
func (*txnCommitter) createSavepointLocked(context.Context, *savepoint) {}
628628

629+
// releaseSavepointLocked is part of the txnInterceptor interface.
630+
func (*txnCommitter) releaseSavepointLocked(context.Context, *savepoint) {}
631+
629632
// rollbackToSavepointLocked is part of the txnInterceptor interface.
630633
func (*txnCommitter) rollbackToSavepointLocked(context.Context, savepoint) {}
631634

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,9 @@ func (h *txnHeartbeater) epochBumpedLocked() {}
302302
// createSavepointLocked is part of the txnInterceptor interface.
303303
func (*txnHeartbeater) createSavepointLocked(context.Context, *savepoint) {}
304304

305+
// releaseSavepointLocked is part of the txnInterceptor interface.
306+
func (*txnHeartbeater) releaseSavepointLocked(context.Context, *savepoint) {}
307+
305308
// rollbackToSavepointLocked is part of the txnInterceptor interface.
306309
func (*txnHeartbeater) rollbackToSavepointLocked(context.Context, savepoint) {}
307310

pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ func (*txnMetricRecorder) epochBumpedLocked() {}
7878
// createSavepointLocked is part of the txnInterceptor interface.
7979
func (*txnMetricRecorder) createSavepointLocked(context.Context, *savepoint) {}
8080

81+
// releaseSavepointLocked is part of the txnInterceptor interface.
82+
func (*txnMetricRecorder) releaseSavepointLocked(context.Context, *savepoint) {}
83+
8184
// rollbackToSavepointLocked is part of the txnInterceptor interface.
8285
func (*txnMetricRecorder) rollbackToSavepointLocked(context.Context, savepoint) {}
8386

pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,9 @@ func (tp *txnPipeliner) epochBumpedLocked() {
984984
// createSavepointLocked is part of the txnInterceptor interface.
985985
func (tp *txnPipeliner) createSavepointLocked(context.Context, *savepoint) {}
986986

987+
// releaseSavepointLocked is part of the txnInterceptor interface.
988+
func (tp *txnPipeliner) releaseSavepointLocked(context.Context, *savepoint) {}
989+
987990
// rollbackToSavepointLocked is part of the txnInterceptor interface.
988991
func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
989992
// Move all the writes in txnPipeliner that are not in the savepoint to the

pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ func (s *txnSeqNumAllocator) createSavepointLocked(ctx context.Context, sp *save
219219
sp.seqNum = s.writeSeq
220220
}
221221

222+
// releaseSavepointLocked is part of the txnInterceptor interface.
223+
func (*txnSeqNumAllocator) releaseSavepointLocked(context.Context, *savepoint) {}
224+
222225
// rollbackToSavepointLocked is part of the txnInterceptor interface.
223226
func (s *txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) {
224227
// Nothing to restore. The seq nums keep increasing. The TxnCoordSender has

pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,9 @@ func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepo
764764
s.refreshInvalid = sr.refreshInvalid
765765
}
766766

767+
// releaseSavepointLocked is part of the txnInterceptor interface.
768+
func (sr *txnSpanRefresher) releaseSavepointLocked(context.Context, *savepoint) {}
769+
767770
// rollbackToSavepointLocked is part of the txnInterceptor interface.
768771
func (sr *txnSpanRefresher) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
769772
if !KeepRefreshSpansOnSavepointRollback.Get(&sr.st.SV) {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ type txnWriteBuffer struct {
177177
// sees the next BatchRequest.
178178
flushOnNextBatch bool
179179

180+
// firstExplicitSavepointSeq tracks the lowest explicit savepoint that hasn't
181+
// been released or rolled back. If this savepoint is non-zero, then a
182+
// mid-transaction flush must flush all revisions required to roll back this
183+
// (or a later) savepoint.
184+
firstExplicitSavepointSeq enginepb.TxnSeq
185+
180186
buffer btree
181187
bufferIDAlloc uint64
182188
bufferSize int64
@@ -605,6 +611,9 @@ func (twb *txnWriteBuffer) importLeafFinalState(context.Context, *roachpb.LeafTx
605611

606612
// epochBumpedLocked implements the txnInterceptor interface.
607613
func (twb *txnWriteBuffer) epochBumpedLocked() {
614+
// Sequence numbers are reset on epoch bumps so any retained savepoint is
615+
// wrong.
616+
twb.firstExplicitSavepointSeq = 0
608617
twb.resetBuffer()
609618
}
610619

@@ -613,8 +622,26 @@ func (twb *txnWriteBuffer) resetBuffer() {
613622
twb.bufferSize = 0
614623
}
615624

625+
func (twb *txnWriteBuffer) hasActiveSavepoint() bool {
626+
return twb.firstExplicitSavepointSeq != enginepb.TxnSeq(0)
627+
}
628+
616629
// createSavepointLocked is part of the txnInterceptor interface.
617-
func (twb *txnWriteBuffer) createSavepointLocked(context.Context, *savepoint) {}
630+
func (twb *txnWriteBuffer) createSavepointLocked(ctx context.Context, sp *savepoint) {
631+
assertTrue(twb.firstExplicitSavepointSeq <= sp.seqNum,
632+
"sequence number in created savepoint lower than retained savepoint")
633+
634+
if twb.firstExplicitSavepointSeq == enginepb.TxnSeq(0) {
635+
twb.firstExplicitSavepointSeq = sp.seqNum
636+
}
637+
}
638+
639+
// rollbackToSavepointLocked is part of the txnInterceptor interface.
640+
func (twb *txnWriteBuffer) releaseSavepointLocked(ctx context.Context, sp *savepoint) {
641+
if twb.firstExplicitSavepointSeq == sp.seqNum {
642+
twb.firstExplicitSavepointSeq = 0
643+
}
644+
}
618645

619646
// rollbackToSavepointLocked is part of the txnInterceptor interface.
620647
func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
@@ -647,6 +674,9 @@ func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s save
647674
for _, bw := range toDelete {
648675
twb.removeFromBuffer(bw)
649676
}
677+
if twb.firstExplicitSavepointSeq == s.seqNum {
678+
twb.firstExplicitSavepointSeq = 0
679+
}
650680
}
651681

652682
// closeLocked implements the txnInterceptor interface.
@@ -1370,6 +1400,8 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
13701400
twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Inc(1)
13711401
}
13721402

1403+
midTxnFlushWithExplicitSavepoint := !hasEndTxn && twb.hasActiveSavepoint()
1404+
13731405
// Flush all buffered writes by pre-pending them to the requests being sent
13741406
// in the batch.
13751407
//
@@ -1379,8 +1411,8 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
13791411
it := twb.buffer.MakeIter()
13801412
numRevisionsBuffered := 0
13811413
for it.First(); it.Valid(); it.Next() {
1382-
if !hasEndTxn {
1383-
revs := it.Cur().toAllRevisionRequests()
1414+
if midTxnFlushWithExplicitSavepoint {
1415+
revs := it.Cur().toAllRevisionRequests(twb.firstExplicitSavepointSeq)
13841416
numRevisionsBuffered += len(revs)
13851417
reqs = append(reqs, revs...)
13861418
} else {
@@ -1589,13 +1621,26 @@ func (bw *bufferedWrite) toRequest() kvpb.RequestUnion {
15891621
}
15901622

15911623
// toAllRevisionRequests returns requests for all revisions of the buffered
1592-
// writes for the key. When the buffer is flushed before the end of a
1593-
// transaction, all revisions must be written to storage to ensure that a future
1594-
// savepoint rollback is properly handled.
1595-
func (bw *bufferedWrite) toAllRevisionRequests() []kvpb.RequestUnion {
1624+
// writes that need to be flushed given the minimum sequence number.
1625+
//
1626+
// When the buffer is flushed before the end of a transaction, previous
1627+
// revisions must be written to storage to ensure that a future savepoint
1628+
// rollback is properly handled.
1629+
//
1630+
// The given sequence number is the smallest sequence number associated with an
1631+
// active savepoint.
1632+
//
1633+
// A write below the the minimum sequence number can be elided if there is a
1634+
// subsequent write also below the minimum sequence number.
1635+
func (bw *bufferedWrite) toAllRevisionRequests(minSeq enginepb.TxnSeq) []kvpb.RequestUnion {
15961636
rus := make([]kvpb.RequestUnion, 0, len(bw.vals))
1597-
for _, val := range bw.vals {
1598-
rus = append(rus, val.toRequestUnion(bw.key, bw.exclusionExpectedSinceTimestamp()))
1637+
maxIdx := len(bw.vals) - 1
1638+
for i, val := range bw.vals {
1639+
nextWriteLessThanSeq := (i+1 < maxIdx) && (bw.vals[i+1].seq < minSeq)
1640+
canElideRevision := val.seq < minSeq && nextWriteLessThanSeq
1641+
if !canElideRevision {
1642+
rus = append(rus, val.toRequestUnion(bw.key, bw.exclusionExpectedSinceTimestamp()))
1643+
}
15991644
}
16001645
return rus
16011646
}

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,6 +1808,133 @@ func TestTxnWriteBufferRollbackToSavepoint(t *testing.T) {
18081808
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
18091809
}
18101810

1811+
// TestTxnWriteBufferRollbackToSavepointMidTxn tests the savepoint rollback
1812+
// logic in the presence of explicit savepoints.
1813+
func TestTxnWriteBufferRollbackToSavepointMidTxn(t *testing.T) {
1814+
defer leaktest.AfterTest(t)()
1815+
defer log.Scope(t).Close(t)
1816+
ctx := context.Background()
1817+
1818+
sendPut := func(t *testing.T, twb *txnWriteBuffer, mockSender *mockLockedSender, txn *roachpb.Transaction) {
1819+
txn.Sequence++
1820+
keyA := roachpb.Key("a")
1821+
valA := fmt.Sprintf("valA@%d", txn.Sequence)
1822+
putA := putArgs(keyA, valA, txn.Sequence)
1823+
1824+
ba := &kvpb.BatchRequest{}
1825+
ba.Header = kvpb.Header{Txn: txn}
1826+
ba.Add(putA)
1827+
1828+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1829+
br := ba.CreateReply()
1830+
br.Txn = ba.Txn
1831+
return br, nil
1832+
})
1833+
1834+
numCalled := mockSender.NumCalled()
1835+
br, pErr := twb.SendLocked(ctx, ba)
1836+
require.Nil(t, pErr)
1837+
require.NotNil(t, br)
1838+
require.Equal(t, numCalled, mockSender.NumCalled())
1839+
}
1840+
1841+
delRangeBatch := func(txn *roachpb.Transaction) *kvpb.BatchRequest {
1842+
txn.Sequence++
1843+
keyB := roachpb.Key("b")
1844+
keyC := roachpb.Key("c")
1845+
delRangeReq := delRangeArgs(keyB, keyC, txn.Sequence)
1846+
1847+
ba := &kvpb.BatchRequest{}
1848+
ba.Header = kvpb.Header{Txn: txn}
1849+
ba.Add(delRangeReq)
1850+
return ba
1851+
}
1852+
1853+
savepoint := func(twb *txnWriteBuffer, txn *roachpb.Transaction) *savepoint {
1854+
txn.Sequence++
1855+
savepoint := &savepoint{seqNum: txn.Sequence}
1856+
twb.createSavepointLocked(ctx, savepoint)
1857+
return savepoint
1858+
}
1859+
1860+
t.Run("flush with no savepoint sends latest", func(t *testing.T) {
1861+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1862+
txn := makeTxnProto()
1863+
// Send 4 requests to the buffer
1864+
sendPut(t, &twb, mockSender, &txn)
1865+
sendPut(t, &twb, mockSender, &txn)
1866+
sendPut(t, &twb, mockSender, &txn)
1867+
sendPut(t, &twb, mockSender, &txn)
1868+
ba := delRangeBatch(&txn)
1869+
1870+
// Expect 1 Put and 1 DelRange.
1871+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1872+
require.Len(t, ba.Requests, 2)
1873+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1874+
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[1].GetInner())
1875+
1876+
br := ba.CreateReply()
1877+
br.Txn = ba.Txn
1878+
return br, nil
1879+
})
1880+
br, pErr := twb.SendLocked(ctx, ba)
1881+
require.Nil(t, pErr)
1882+
require.NotNil(t, br)
1883+
})
1884+
1885+
t.Run("flush with savepoint still elides unnecessary writes under savepoint", func(t *testing.T) {
1886+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1887+
txn := makeTxnProto()
1888+
sendPut(t, &twb, mockSender, &txn) // should be elided
1889+
sendPut(t, &twb, mockSender, &txn)
1890+
_ = savepoint(&twb, &txn)
1891+
sendPut(t, &twb, mockSender, &txn)
1892+
sendPut(t, &twb, mockSender, &txn)
1893+
ba := delRangeBatch(&txn)
1894+
1895+
// Expect 3 Put and 1 DelRange.
1896+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1897+
require.Len(t, ba.Requests, 4)
1898+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1899+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner())
1900+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[2].GetInner())
1901+
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[3].GetInner())
1902+
1903+
br := ba.CreateReply()
1904+
br.Txn = ba.Txn
1905+
return br, nil
1906+
})
1907+
br, pErr := twb.SendLocked(ctx, ba)
1908+
require.Nil(t, pErr)
1909+
require.NotNil(t, br)
1910+
})
1911+
1912+
t.Run("flush after release of earliest savepoint only sends latest", func(t *testing.T) {
1913+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1914+
txn := makeTxnProto()
1915+
sendPut(t, &twb, mockSender, &txn)
1916+
sendPut(t, &twb, mockSender, &txn)
1917+
sendPut(t, &twb, mockSender, &txn)
1918+
sp := savepoint(&twb, &txn)
1919+
sendPut(t, &twb, mockSender, &txn)
1920+
twb.releaseSavepointLocked(ctx, sp)
1921+
1922+
ba := delRangeBatch(&txn)
1923+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1924+
require.Len(t, ba.Requests, 2)
1925+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1926+
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[1].GetInner())
1927+
1928+
br := ba.CreateReply()
1929+
br.Txn = ba.Txn
1930+
return br, nil
1931+
})
1932+
br, pErr := twb.SendLocked(ctx, ba)
1933+
require.Nil(t, pErr)
1934+
require.NotNil(t, br)
1935+
})
1936+
}
1937+
18111938
// TestTxnWriteBufferFlushesAfterDisabling verifies that the txnWriteBuffer
18121939
// flushes on the next batch after it is disabled if it buffered any writes.
18131940
func TestTxnWriteBufferFlushesAfterDisabling(t *testing.T) {

0 commit comments

Comments
 (0)