Skip to content

Commit 210b82c

Browse files
committed
kvcoord: transform replicated, locking [Reverse]Scan requests
We transform all [Reverse]Scan requests with replicated key locking durability to unreplicated locking requests. Any returned keys are then added to the buffer so that replicated locking Gets can be issued at flush time.

Scans can be unbounded in size, which risks substantially increasing the size of our buffer. To address this, we set MaxSpanRequestKeys on any scan requests based on the remaining size left in the buffer. If TargetBytes is zero, we assume the caller has verified that the scan response is likely to be small and we leave MaxSpanRequestKeys unmodified to allow parallel DistSQL requests (which aren't possible when MaxSpanRequestKeys is non-zero).

Fixes #142977

Release note: None
1 parent e9631b4 commit 210b82c

File tree

2 files changed

+452
-39
lines changed

2 files changed

+452
-39
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 198 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ var BufferedWritesEnabled = settings.RegisterBoolSetting(
4444
settings.WithPublic,
4545
)
4646

47+
var bufferedWritesScanTransformEnabled = settings.RegisterBoolSetting(
48+
settings.ApplicationLevel,
49+
"kv.transaction.write_buffering.transformations.scans.enabled",
50+
"if enabled, locking scans and reverse scans with replicated durability are transformed to unreplicated durability",
51+
true,
52+
)
53+
4754
var bufferedWritesMaxBufferSize = settings.RegisterByteSizeSetting(
4855
settings.ApplicationLevel,
4956
"kv.transaction.write_buffering.max_buffer_size",
@@ -251,7 +258,11 @@ func (twb *txnWriteBuffer) SendLocked(
251258
// budget. If it will, we shouldn't buffer writes from the current batch,
252259
// and flush the buffer.
253260
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
254-
bufSize := twb.estimateSize(ba) + twb.bufferSize
261+
// We check if scan transforms are enabled once and use that answer until the
262+
// end of SendLocked.
263+
transformScans := bufferedWritesScanTransformEnabled.Get(&twb.st.SV)
264+
bufSize := twb.estimateSize(ba, transformScans) + twb.bufferSize
265+
255266
// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
256267
// any buffer limiting.
257268
if maxSize != 0 && bufSize > maxSize {
@@ -268,7 +279,7 @@ func (twb *txnWriteBuffer) SendLocked(
268279
return nil, kvpb.NewError(err)
269280
}
270281

271-
transformedBa, rr, pErr := twb.applyTransformations(ctx, ba)
282+
transformedBa, rr, pErr := twb.applyTransformations(ctx, ba, transformScans)
272283
if pErr != nil {
273284
return nil, pErr
274285
}
@@ -429,7 +440,7 @@ func unsupportedOptionError(m kvpb.Method, option string) error {
429440

430441
// estimateSize returns a conservative estimate by which the buffer will grow in
431442
// size if the writes from the supplied batch request are buffered.
432-
func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
443+
func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bool) int64 {
433444
var scratch bufferedWrite
434445
estimate := int64(0)
435446
scratch.vals = make([]bufferedValue, 1)
@@ -478,7 +489,36 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
478489
if t.MustAcquireExclusiveLock {
479490
estimate += lockKeyInfoSize
480491
}
492+
case *kvpb.ScanRequest:
493+
// ScanRequest can potentially consume up to t.TargetBytes (or an
494+
// unbounded number of bytes if TargetBytes is 0). When set, TargetBytes
495+
// will typically be set much larger than the default buffer size, so if
496+
// we were to estimate the size based on TargetBytes we would always flush
497+
// the buffer. Here, we assume at least 1 key will be returned that is
498+
// about the size of the scan start boundary. We try to protect from large
499+
// buffer overflows by transforming the batch's MaxSpanRequestKeys later.
500+
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
501+
shouldTransform = shouldTransform && transformScans
502+
if shouldTransform {
503+
scratch.key = t.Key
504+
scratch.vals[0] = bufferedValue{
505+
seq: t.Sequence,
506+
}
507+
estimate += scratch.size() + lockKeyInfoSize
508+
}
509+
case *kvpb.ReverseScanRequest:
510+
// See the comment on the ScanRequest case for more details.
511+
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
512+
shouldTransform = shouldTransform && transformScans
513+
if shouldTransform {
514+
scratch.key = t.Key
515+
scratch.vals[0] = bufferedValue{
516+
seq: t.Sequence,
517+
}
518+
estimate += scratch.size() + lockKeyInfoSize
519+
}
481520
}
521+
482522
// No other request is buffered.
483523
}
484524
return estimate
@@ -751,22 +791,26 @@ func (twb *txnWriteBuffer) closeLocked() {}
751791
//
752792
// 4. Scans are always sent to the KV layer, but if the key span being scanned
753793
// overlaps with any buffered writes, then the response from the KV layer needs
754-
// to be merged with buffered writes. These are collected as requestRecords.
794+
// to be merged with buffered writes. These are collected as requestRecords. If
795+
// the Scan is a locking scan with a replicated durability, it is transformed to
796+
// an unreplicated durability and information about the lock is added to the
797+
// buffer.
755798
//
756799
// 5. ReverseScans, similar to scans, are also always sent to the KV layer and
757800
// their response needs to be merged with any buffered writes. The only
758801
// difference is the direction in which the buffer is iterated when doing the
759-
// merge. As a result, they're also collected as requestRecords.
802+
// merge. As a result, they're also collected as requestRecords. If the
803+
// ReverseScan is a locking scan with a replicated durability, it is transformed
804+
// to an unreplicated durability and information about the lock is added to the
805+
// buffer.
760806
//
761807
// 6. Conditional Puts are decomposed into a locking Get followed by a Put. The
762808
// Put is buffered locally if the condition evaluates successfully using the
763809
// Get's response. Otherwise, a ConditionFailedError is returned. We elide the
764810
// locking Get request if it can be served from the buffer (i.e. if a lock of
765811
// sufficient strength has been acquired and a value has been buffered).
766-
//
767-
// TODO(arul): Augment this comment as these expand.
768812
func (twb *txnWriteBuffer) applyTransformations(
769-
ctx context.Context, ba *kvpb.BatchRequest,
813+
ctx context.Context, ba *kvpb.BatchRequest, transformScans bool,
770814
) (*kvpb.BatchRequest, requestRecords, *kvpb.Error) {
771815
baRemote := ba.ShallowCopy()
772816
// TODO(arul): We could improve performance here by pre-allocating
@@ -776,6 +820,8 @@ func (twb *txnWriteBuffer) applyTransformations(
776820
baRemote.Requests = nil
777821

778822
rr := make(requestRecords, 0, len(ba.Requests))
823+
hasTransformedLockingScan := false
824+
transformedLockingScanKeySizeEstimate := 0
779825
for i, ru := range ba.Requests {
780826
req := ru.GetInner()
781827
// Track a requestRecord for the request regardless of the type, and
@@ -887,11 +933,45 @@ func (twb *txnWriteBuffer) applyTransformations(
887933
baRemote.Requests = append(baRemote.Requests, ru)
888934
}
889935

890-
case *kvpb.ScanRequest, *kvpb.ReverseScanRequest:
936+
case *kvpb.ScanRequest:
891937
// Regardless of whether the scan overlaps with any writes in the buffer
892938
// or not, we must send the request to the KV layer. We can't know for
893939
// sure that there's nothing else to read.
894-
baRemote.Requests = append(baRemote.Requests, ru)
940+
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
941+
shouldTransform = shouldTransform && transformScans
942+
if shouldTransform {
943+
var scanReqU kvpb.RequestUnion
944+
scanReq := t.ShallowCopy().(*kvpb.ScanRequest)
945+
scanReq.KeyLockingDurability = lock.Unreplicated
946+
scanReqU.MustSetInner(scanReq)
947+
948+
baRemote.Requests = append(baRemote.Requests, scanReqU)
949+
record.transformed = true
950+
hasTransformedLockingScan = true
951+
transformedLockingScanKeySizeEstimate = max(transformedLockingScanKeySizeEstimate, len(scanReq.Key))
952+
} else {
953+
baRemote.Requests = append(baRemote.Requests, ru)
954+
}
955+
956+
case *kvpb.ReverseScanRequest:
957+
// Regardless of whether the reverse scan overlaps with any writes in the
958+
// buffer or not, we must send the request to the KV layer. We can't know
959+
// for sure that there's nothing else to read.
960+
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
961+
shouldTransform = shouldTransform && transformScans
962+
if shouldTransform {
963+
var rScanReqU kvpb.RequestUnion
964+
rScanReq := t.ShallowCopy().(*kvpb.ReverseScanRequest)
965+
rScanReq.KeyLockingDurability = lock.Unreplicated
966+
rScanReqU.MustSetInner(rScanReq)
967+
968+
baRemote.Requests = append(baRemote.Requests, rScanReqU)
969+
hasTransformedLockingScan = true
970+
transformedLockingScanKeySizeEstimate = max(transformedLockingScanKeySizeEstimate, len(rScanReq.Key))
971+
record.transformed = true
972+
} else {
973+
baRemote.Requests = append(baRemote.Requests, ru)
974+
}
895975

896976
case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest:
897977
// These requests don't interact with buffered writes, so we simply
@@ -903,9 +983,84 @@ func (twb *txnWriteBuffer) applyTransformations(
903983
}
904984
rr = append(rr, record)
905985
}
986+
987+
if hasTransformedLockingScan {
988+
twb.maybeMutateBatchMaxSpanRequestKeys(ctx, baRemote, transformedLockingScanKeySizeEstimate)
989+
}
990+
906991
return baRemote, rr, nil
907992
}
908993

994+
// maybeMutateBatchMaxSpanRequestKeys limits MaxSpanRequestKeys to protect from the
995+
// need to buffer an unbounded number of keys.
996+
//
997+
// SQL sets TargetBytes to 0, allowing for parallel scans, when it believes that
998+
// the constraints on the scan limit the result set to fewer than 10,000 rows.
999+
// This is a large number of rows, but, in order to preserve the ability for
1000+
// parallel scans, we don't limit the request keys if TargetBytes is 0.
1001+
//
1002+
// NB: This does not currently take into account the fact that other requests in
1003+
// the batch may also consume some of the remaining buffer size. That is
1004+
// probably OK since SQL-generated transactions won't have reads and writes in
1005+
// the same batch.
1006+
func (twb *txnWriteBuffer) maybeMutateBatchMaxSpanRequestKeys(
1007+
ctx context.Context, ba *kvpb.BatchRequest, transformedLockingScanKeySizeEstimate int,
1008+
) {
1009+
if ba.TargetBytes == 0 && ba.MaxSpanRequestKeys == 0 {
1010+
log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because TargetBytes=0 and MaxSpanRequestKeys=0")
1011+
return
1012+
}
1013+
1014+
// If the user has disabled a maximum buffer size, respect that.
1015+
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
1016+
if maxSize == 0 {
1017+
log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because %s=0", bufferedWritesMaxBufferSize.Name())
1018+
return
1019+
}
1020+
1021+
// According to the documentation, MaxSpanRequestKeys is not supported for all
1022+
// requests. Here we check against all requests that are allowed through
1023+
// (*txnWriteBuffer).validateRequests, but we should not see most of these
1024+
// requests in a transformed batch. Further, SQL does not generate batches
1025+
// that include both reads and writes.
1026+
for _, ru := range ba.Requests {
1027+
req := ru.GetInner()
1028+
switch req.(type) {
1029+
case *kvpb.ConditionalPutRequest, *kvpb.DeleteRequest,
1030+
*kvpb.PutRequest, *kvpb.LeaseInfoRequest:
1031+
log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because transformed batch contains a %s request",
1032+
req.Method())
1033+
return
1034+
case *kvpb.GetRequest, *kvpb.ScanRequest, *kvpb.ReverseScanRequest, *kvpb.QueryLocksRequest:
1035+
continue
1036+
default:
1037+
log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because transformed batch contains an unexpected %s request",
1038+
req.Method())
1039+
return
1040+
}
1041+
}
1042+
1043+
var bufferRemaining int64
1044+
if twb.bufferSize > maxSize {
1045+
// Somehow the bufferSize has already grown beyond the max, perhaps because
1046+
// the setting has changed. We could bail out and flush our batch, but for
1047+
// simplicity we keep going.
1048+
const fallbackTarget = 1 << 20 // 1 MB
1049+
bufferRemaining = min(fallbackTarget, maxSize)
1050+
} else {
1051+
bufferRemaining = maxSize - twb.bufferSize
1052+
}
1053+
1054+
perKeyEstimate := lockKeyInfoSize + int64(transformedLockingScanKeySizeEstimate)
1055+
targetKeys := max(bufferRemaining/perKeyEstimate, 1)
1056+
if ba.MaxSpanRequestKeys == 0 || ba.MaxSpanRequestKeys > targetKeys {
1057+
log.VEventf(ctx, 2, "changing MaxSpanRequestKeys from %d to %d because of a transformed locking scan",
1058+
ba.MaxSpanRequestKeys,
1059+
targetKeys)
1060+
ba.MaxSpanRequestKeys = targetKeys
1061+
}
1062+
}
1063+
9091064
// seekItemForSpan returns a bufferedWrite appropriate for use with a
9101065
// write-buffer iterator. Point lookups should use a nil end key.
9111066
func (twb *txnWriteBuffer) seekItemForSpan(key, endKey roachpb.Key) *bufferedWrite {
@@ -1047,6 +1202,7 @@ func (twb *txnWriteBuffer) mergeBufferAndResp(
10471202
// larger of the two keys.
10481203
cmp = cmp * -1
10491204
}
1205+
10501206
switch cmp {
10511207
case -1:
10521208
// The key in the buffer is less than the next key in the server's
@@ -1313,18 +1469,42 @@ func (rr requestRecord) toResp(
13131469
}
13141470

13151471
case *kvpb.ScanRequest:
1316-
scanResp, err := twb.mergeWithScanResp(
1317-
rr.origRequest.(*kvpb.ScanRequest), br.GetInner().(*kvpb.ScanResponse),
1318-
)
1472+
origReq := rr.origRequest.(*kvpb.ScanRequest)
1473+
resp := br.GetInner().(*kvpb.ScanResponse)
1474+
if rr.transformed {
1475+
// We iterate over the ScanResponse here since we cannot mutate the write
1476+
// buffer while iterating over it.
1477+
respIter := newScanRespIter(origReq, resp)
1478+
for ; respIter.valid(); respIter.next() {
1479+
twb.addDurableLockedReadToBuffer(respIter.peekKey(), &bufferedDurableLockAcquisition{
1480+
str: req.KeyLockingStrength,
1481+
seq: req.Sequence,
1482+
ts: txn.ReadTimestamp,
1483+
})
1484+
}
1485+
}
1486+
scanResp, err := twb.mergeWithScanResp(origReq, resp)
13191487
if err != nil {
13201488
return kvpb.ResponseUnion{}, kvpb.NewError(err)
13211489
}
13221490
ru.MustSetInner(scanResp)
13231491

13241492
case *kvpb.ReverseScanRequest:
1325-
reverseScanResp, err := twb.mergeWithReverseScanResp(
1326-
rr.origRequest.(*kvpb.ReverseScanRequest), br.GetInner().(*kvpb.ReverseScanResponse),
1327-
)
1493+
origReq := rr.origRequest.(*kvpb.ReverseScanRequest)
1494+
resp := br.GetInner().(*kvpb.ReverseScanResponse)
1495+
if rr.transformed {
1496+
// We iterate over the ReverseScanResponse here since we cannot mutate the
1497+
// write buffer while iterating over it.
1498+
respIter := newReverseScanRespIter(origReq, resp)
1499+
for ; respIter.valid(); respIter.next() {
1500+
twb.addDurableLockedReadToBuffer(respIter.peekKey(), &bufferedDurableLockAcquisition{
1501+
str: req.KeyLockingStrength,
1502+
seq: req.Sequence,
1503+
ts: txn.ReadTimestamp,
1504+
})
1505+
}
1506+
}
1507+
reverseScanResp, err := twb.mergeWithReverseScanResp(origReq, resp)
13281508
if err != nil {
13291509
return kvpb.ResponseUnion{}, kvpb.NewError(err)
13301510
}
@@ -2174,6 +2354,8 @@ func newReverseScanRespIter(
21742354
//
21752355
// peekKey should only be called if the iterator is in valid state (i.e.
21762356
// valid() returned true).
2357+
//
2358+
// NB: Callers assume that the returned key can be retained.
21772359
func (s *respIter) peekKey() roachpb.Key {
21782360
if s.scanFormat == kvpb.KEY_VALUES {
21792361
return s.rows[s.rowsIndex].Key

0 commit comments

Comments
 (0)