Skip to content

Commit 988f4fa

Browse files
committed
kvcoord: only add to the buffer on the response path
This patch moves all additions to the write buffer on the response path of a request. This cleans things up for CPuts, where in the old structure, we were forced to optimistically add writes to the buffer so that subsequent requests in the batch saw them. This posed an issue whenver there was a ConditionFailedError, as a transaction is allowed to commit despite this. Closes #139055 Release note: None
1 parent 0b5639e commit 988f4fa

File tree

2 files changed

+70
-94
lines changed

2 files changed

+70
-94
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 61 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -638,22 +638,16 @@ func (twb *txnWriteBuffer) applyTransformations(
638638
req := ru.GetInner()
639639
switch t := req.(type) {
640640
case *kvpb.ConditionalPutRequest:
641-
var resp kvpb.ResponseUnion
642-
val, served := twb.maybeServeRead(t.Key, t.Sequence)
643-
if served {
644-
// TODO(ssd): If we tracked locked information here, we could avoid the
645-
// locking Get below.
646-
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", t.Method(), t.Key)
647-
resp.MustSetInner(&kvpb.GetResponse{
648-
Value: val,
649-
})
650-
}
651641
ts = append(ts, transformation{
652642
stripped: false,
653643
index: i,
654644
origRequest: req,
655-
resp: resp,
656645
})
646+
// NB: Regardless of whether there is already a buffered write on
647+
// this key or not, we need to send a locking Get to the KV layer to
648+
// acquire a lock. However, if we had knowledge of what locks the
649+
// transaction already holds, we could avoid the locking Get in some
650+
// cases.
657651
getReq := &kvpb.GetRequest{
658652
RequestHeader: kvpb.RequestHeader{
659653
Key: t.Key,
@@ -667,17 +661,15 @@ func (twb *txnWriteBuffer) applyTransformations(
667661
// Send a locking Get request to the KV layer; we'll evaluate the
668662
// condition locally based on the response.
669663
baRemote.Requests = append(baRemote.Requests, getReqU)
670-
// Buffer a Put under the optimistic assumption that the condition
671-
// will be satisfied.
672-
twb.addToBuffer(t.Key, t.Value, t.Sequence)
673664

674665
case *kvpb.PutRequest:
675-
// If the MustAcquireExclusiveLock flag is set on the Put, then we need to
676-
// add a locking Get to the BatchRequest, including if the key doesn't
677-
// exist.
666+
// If the MustAcquireExclusiveLock flag is set on the Put, then we
667+
// need to add a locking Get to the BatchRequest, including if the
668+
// key doesn't exist.
678669
if t.MustAcquireExclusiveLock {
679-
// TODO(yuzefovich,ssd): ensure that we elide the lock acquisition
680-
// whenever possible (e.g. blind UPSERT in an implicit txn).
670+
// TODO(yuzefovich,ssd): ensure that we elide the lock
671+
// acquisition whenever possible (e.g. blind UPSERT in an
672+
// implicit txn).
681673
var getReqU kvpb.RequestUnion
682674
getReqU.MustSetInner(&kvpb.GetRequest{
683675
RequestHeader: kvpb.RequestHeader{
@@ -689,30 +681,16 @@ func (twb *txnWriteBuffer) applyTransformations(
689681
})
690682
baRemote.Requests = append(baRemote.Requests, getReqU)
691683
}
692-
693-
var ru kvpb.ResponseUnion
694-
ru.MustSetInner(&kvpb.PutResponse{})
695684
ts = append(ts, transformation{
696685
stripped: !t.MustAcquireExclusiveLock,
697686
index: i,
698687
origRequest: req,
699-
resp: ru,
700688
})
701-
twb.addToBuffer(t.Key, t.Value, t.Sequence)
702689

703690
case *kvpb.DeleteRequest:
704-
// To correctly populate FoundKey in the response, we need to look in our
705-
// write buffer to see if there is a tombstone.
706-
var foundKey bool
707-
val, served := twb.maybeServeRead(t.Key, t.Sequence)
708-
if served {
709-
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", t.Method(), t.Key)
710-
foundKey = val.IsPresent()
711-
}
712-
713-
// If MustAcquireExclusiveLock flag is set on the DeleteRequest, then we
714-
// need to add a locking Get to the BatchRequest, including if the key
715-
// doesn't exist.
691+
// If MustAcquireExclusiveLock flag is set on the DeleteRequest,
692+
// then we need to add a locking Get to the BatchRequest, including
693+
// if the key doesn't exist.
716694
if t.MustAcquireExclusiveLock {
717695
// TODO(ssd): ensure that we elide the lock acquisition
718696
// whenever possible.
@@ -727,28 +705,11 @@ func (twb *txnWriteBuffer) applyTransformations(
727705
})
728706
baRemote.Requests = append(baRemote.Requests, getReqU)
729707
}
730-
731-
// If we found a key in our write buffer we use that
732-
// result regardless of what the GetResponse that we
733-
// might have sent says.
734-
//
735-
// NOTE(ssd): We are assuming that callers who care
736-
// about an accurate value of FoundKey also set
737-
// MustAcquireExclusiveLock.
738-
var ru kvpb.ResponseUnion
739-
if served || !t.MustAcquireExclusiveLock {
740-
ru.MustSetInner(&kvpb.DeleteResponse{
741-
FoundKey: foundKey,
742-
})
743-
}
744-
745708
ts = append(ts, transformation{
746709
stripped: !t.MustAcquireExclusiveLock,
747710
index: i,
748711
origRequest: req,
749-
resp: ru,
750712
})
751-
twb.addToBuffer(t.Key, roachpb.Value{}, t.Sequence)
752713

753714
case *kvpb.GetRequest:
754715
// If the key is in the buffer, we must serve the read from the buffer.
@@ -1106,10 +1067,6 @@ type transformation struct {
11061067
func (t transformation) toResp(
11071068
ctx context.Context, twb *txnWriteBuffer, br kvpb.ResponseUnion, txn *roachpb.Transaction,
11081069
) (kvpb.ResponseUnion, *kvpb.Error) {
1109-
if t.stripped {
1110-
return t.resp, nil
1111-
}
1112-
11131070
var ru kvpb.ResponseUnion
11141071
switch req := t.origRequest.(type) {
11151072
case *kvpb.ConditionalPutRequest:
@@ -1119,54 +1076,76 @@ func (t transformation) toResp(
11191076
evalFn = twb.testingOverrideCPutEvalFn
11201077
}
11211078

1122-
var getResp *kvpb.GetResponse
1123-
if bufResp := t.resp.GetGet(); bufResp != nil {
1124-
// If we served the response out of the buffer, we don't care what came
1125-
// back from KV.
1126-
getResp = bufResp
1127-
} else {
1128-
getResp = br.GetInner().(*kvpb.GetResponse)
1079+
var val *roachpb.Value
1080+
var served bool
1081+
val, served = twb.maybeServeRead(req.Key, req.Sequence)
1082+
if !served {
1083+
// We only use the response from KV if there wasn't already a
1084+
// buffered value for this key that our transaction wrote
1085+
// previously.
1086+
val = br.GetInner().(*kvpb.GetResponse).Value
11291087
}
11301088

11311089
condFailedErr := evalFn(
11321090
req.ExpBytes,
1133-
getResp.Value,
1134-
getResp.Value.IsPresent(),
1091+
val,
1092+
val.IsPresent(),
11351093
req.AllowIfDoesNotExist,
11361094
)
11371095
if condFailedErr != nil {
1138-
// TODO(yuzefovich): consider "poisoning" the txnWriteBuffer when we
1139-
// hit a condition failed error to avoid mistaken usages (e.g. an
1140-
// attempt to flush with the EndTxn request with Commit=true).
11411096
pErr := kvpb.NewErrorWithTxn(condFailedErr, txn)
11421097
pErr.SetErrorIndex(int32(t.index))
11431098
return kvpb.ResponseUnion{}, pErr
11441099
}
1145-
// The condition was satisfied - return a synthesized response.
1100+
// The condition was satisfied; buffer the write and return a
1101+
// synthesized response.
11461102
ru.MustSetInner(&kvpb.ConditionalPutResponse{})
1103+
twb.addToBuffer(req.Key, req.Value, req.Sequence)
11471104

11481105
case *kvpb.PutRequest:
1149-
ru = t.resp
1106+
ru.MustSetInner(&kvpb.PutResponse{})
1107+
twb.addToBuffer(req.Key, req.Value, req.Sequence)
11501108

11511109
case *kvpb.DeleteRequest:
1152-
ru = t.resp
1153-
// If the deletion response is already set, it means we served response from
1154-
// the write buffer. We can still be here because we happened to need to
1155-
// send a GetRequest solely for the locking behaviour.
1156-
if ru.GetDelete() == nil {
1110+
// To correctly populate FoundKey in the response, we must prefer any
1111+
// buffered values (if they exist).
1112+
var foundKey bool
1113+
val, served := twb.maybeServeRead(req.Key, req.Sequence)
1114+
if served {
1115+
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", req.Method(), req.Key)
1116+
foundKey = val.IsPresent()
1117+
} else if req.MustAcquireExclusiveLock {
1118+
// We sent a GetRequest to the KV layer to acquire an exclusive lock
1119+
// on the key, regardless of whether the key already exists or not.
1120+
// Populate FoundKey using the response.
11571121
getResp := br.GetInner().(*kvpb.GetResponse)
11581122
if log.ExpensiveLogEnabled(ctx, 2) {
11591123
log.Eventf(ctx, "synthesizing DeleteResponse from GetResponse: %#v", getResp)
11601124
}
1161-
ru.MustSetInner(&kvpb.DeleteResponse{
1162-
FoundKey: getResp.Value.IsPresent(),
1163-
})
1125+
foundKey = getResp.Value.IsPresent()
1126+
} else {
1127+
// NB: If MustAcquireExclusiveLock wasn't set by the client then we
1128+
// eschew sending a Get request to the KV layer just to populate
1129+
// FoundKey correctly. So we're assuming that callers who care
1130+
// whether a key is found or not also want to acquire an exclusive
1131+
// lock on it. While this is true as of the time of writing, the
1132+
// behaviour here is less than ideal.
1133+
//
1134+
// TODO(arul): improve the FoundKey semantics to have callers opt
1135+
// into whether the care about the key being found. Alternatively,
1136+
// clarify the behaviour on DeleteRequest.
1137+
foundKey = false
11641138
}
1139+
ru.MustSetInner(&kvpb.DeleteResponse{
1140+
FoundKey: foundKey,
1141+
})
1142+
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence)
1143+
11651144
case *kvpb.GetRequest:
11661145
// Get requests must be served from the local buffer if a transaction
1167-
// performed a previous write to the key being read. However, Get requests
1168-
// must be sent to the KV layer (i.e. not be stripped) iff they are locking
1169-
// in nature.
1146+
// performed a previous write to the key being read. However, Get
1147+
// requests must be sent to the KV layer (i.e. not be stripped) iff they
1148+
// are locking in nature.
11701149
assertTrue(t.stripped == (req.KeyLockingStrength == lock.None),
11711150
"Get requests should either be stripped or be locking")
11721151
ru = t.resp

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -461,20 +461,17 @@ func TestTxnWriteBufferCorrectlyAdjustsErrorsAfterBuffering(t *testing.T) {
461461
require.NotNil(t, pErr.Index)
462462
require.Equal(t, resErrIdx, pErr.Index.Index)
463463

464-
// Finish off the test by commiting the transaction and sanity checking the
465-
// buffer is flushed as expected.
464+
// The batch we sent encountered an error; nothing should have been
465+
// buffered.
466+
require.Empty(t, twb.testingBufferedWritesAsSlice())
467+
468+
// Don't commit transactions that have encountered an error.
466469
ba = &kvpb.BatchRequest{}
467470
ba.Header = kvpb.Header{Txn: &txn}
468-
ba.Add(&kvpb.EndTxnRequest{Commit: true})
471+
ba.Add(&kvpb.EndTxnRequest{Commit: false})
469472

470473
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
471-
require.Len(t, ba.Requests, 4)
472-
473-
// We now expect the buffer to be flushed along with the commit.
474-
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
475-
require.IsType(t, &kvpb.DeleteRequest{}, ba.Requests[1].GetInner())
476-
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[2].GetInner())
477-
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[3].GetInner())
474+
require.Len(t, ba.Requests, 1)
478475

479476
br = ba.CreateReply()
480477
br.Txn = ba.Txn
@@ -1076,10 +1073,10 @@ func TestTxnWriteBufferLockingGetRequests(t *testing.T) {
10761073
func TestTxnWriteBufferDecomposesConditionalPuts(t *testing.T) {
10771074
defer leaktest.AfterTest(t)()
10781075
defer log.Scope(t).Close(t)
1079-
ctx := context.Background()
1080-
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
10811076

10821077
testutils.RunTrueAndFalse(t, "condEvalSuccessful", func(t *testing.T, condEvalSuccessful bool) {
1078+
ctx := context.Background()
1079+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
10831080
twb.testingOverrideCPutEvalFn = func(expBytes []byte, actVal *roachpb.Value, actValPresent bool, allowNoExisting bool) *kvpb.ConditionFailedError {
10841081
if condEvalSuccessful {
10851082
return nil

0 commit comments

Comments
 (0)