@@ -740,34 +740,35 @@ func (twb *txnWriteBuffer) applyTransformations(
740
740
}
741
741
switch t := req .(type ) {
742
742
case * kvpb.ConditionalPutRequest :
743
- record .transformed = true
744
- // NB: Regardless of whether there is already a buffered write on
745
- // this key or not, we need to send a locking Get to the KV layer to
746
- // acquire a lock. However, if we had knowledge of what locks the
747
- // transaction already holds, we could avoid the locking Get in some
748
- // cases.
749
- getReq := & kvpb.GetRequest {
750
- RequestHeader : kvpb.RequestHeader {
751
- Key : t .Key ,
752
- Sequence : t .Sequence ,
753
- },
754
- LockNonExisting : len (t .ExpBytes ) == 0 || t .AllowIfDoesNotExist ,
755
- KeyLockingStrength : lock .Exclusive ,
743
+ _ , lockStr , isServed := twb .maybeServeRead (t .Key , t .Sequence )
744
+ // To elide the locking request, we must have both a value (to evaluate
745
+ // the condition) and a lock.
746
+ if isServed && lockExcludesWrites (lockStr ) {
747
+ record .stripped = true
748
+ } else {
749
+ record .transformed = true
750
+ getReq := & kvpb.GetRequest {
751
+ RequestHeader : kvpb.RequestHeader {
752
+ Key : t .Key ,
753
+ Sequence : t .Sequence ,
754
+ },
755
+ LockNonExisting : len (t .ExpBytes ) == 0 || t .AllowIfDoesNotExist ,
756
+ KeyLockingStrength : lock .Exclusive ,
757
+ }
758
+ var getReqU kvpb.RequestUnion
759
+ getReqU .MustSetInner (getReq )
760
+ // Send a locking Get request to the KV layer; we'll evaluate the
761
+ // condition locally based on the response.
762
+ baRemote .Requests = append (baRemote .Requests , getReqU )
756
763
}
757
- var getReqU kvpb.RequestUnion
758
- getReqU .MustSetInner (getReq )
759
- // Send a locking Get request to the KV layer; we'll evaluate the
760
- // condition locally based on the response.
761
- baRemote .Requests = append (baRemote .Requests , getReqU )
762
764
763
765
case * kvpb.PutRequest :
764
- // If the MustAcquireExclusiveLock flag is set on the Put, then we
765
- // need to add a locking Get to the BatchRequest, including if the
766
- // key doesn't exist.
767
- if t .MustAcquireExclusiveLock {
768
- // TODO(yuzefovich,ssd): ensure that we elide the lock
769
- // acquisition whenever possible (e.g. blind UPSERT in an
770
- // implicit txn).
766
+ _ , lockStr , _ := twb .maybeServeRead (t .Key , t .Sequence )
767
+ // If the MustAcquireExclusiveLock flag is set then we need to add a
768
+ // locking Get to the BatchRequest, including if the key doesn't exist. We
769
+ // can elide this locking request when we already have an existing lock.
770
+ lockRequired := t .MustAcquireExclusiveLock && ! lockExcludesWrites (lockStr )
771
+ if lockRequired {
771
772
var getReqU kvpb.RequestUnion
772
773
getReqU .MustSetInner (& kvpb.GetRequest {
773
774
RequestHeader : kvpb.RequestHeader {
@@ -779,16 +780,17 @@ func (twb *txnWriteBuffer) applyTransformations(
779
780
})
780
781
baRemote .Requests = append (baRemote .Requests , getReqU )
781
782
}
782
- record .stripped = ! t . MustAcquireExclusiveLock
783
- record .transformed = t . MustAcquireExclusiveLock
783
+ record .stripped = ! lockRequired
784
+ record .transformed = lockRequired
784
785
785
786
case * kvpb.DeleteRequest :
786
- // If MustAcquireExclusiveLock flag is set on the DeleteRequest,
787
- // then we need to add a locking Get to the BatchRequest, including
788
- // if the key doesn't exist.
789
- if t .MustAcquireExclusiveLock {
790
- // TODO(ssd): ensure that we elide the lock acquisition
791
- // whenever possible.
787
+ _ , lockStr , served := twb .maybeServeRead (t .Key , t .Sequence )
788
+ // If MustAcquireExclusiveLock flag is set on the DeleteRequest, then we
789
+ // need to add a locking Get to the BatchRequest, including if the key
790
+ // doesn't exist. We can elide this locking request when we have both a
791
+ // value (to populate the FoundKey field in the response) and a lock.
792
+ lockRequired := t .MustAcquireExclusiveLock && ! (served && lockExcludesWrites (lockStr ))
793
+ if lockRequired {
792
794
var getReqU kvpb.RequestUnion
793
795
getReqU .MustSetInner (& kvpb.GetRequest {
794
796
RequestHeader : kvpb.RequestHeader {
@@ -800,32 +802,32 @@ func (twb *txnWriteBuffer) applyTransformations(
800
802
})
801
803
baRemote .Requests = append (baRemote .Requests , getReqU )
802
804
}
803
- record .stripped = ! t . MustAcquireExclusiveLock
804
- record .transformed = t . MustAcquireExclusiveLock
805
+ record .stripped = ! lockRequired
806
+ record .transformed = lockRequired
805
807
806
808
case * kvpb.GetRequest :
807
809
// If the key is in the buffer, we must serve the read from the buffer.
808
810
// The actual serving of the read will happen on the response path though.
809
811
stripped := false
810
- _ , served := twb .maybeServeRead (t .Key , t .Sequence )
812
+ _ , lockStr , served := twb .maybeServeRead (t .Key , t .Sequence )
811
813
if served {
812
- if t .KeyLockingStrength != lock . None {
814
+ if t .KeyLockingStrength > lockStr {
813
815
// Even though the Get request must be served from the buffer, as the
814
816
// transaction performed a previous write to the key, we still need to
815
- // acquire a lock at the leaseholder. As a result, we can't strip the
816
- // request from the remote batch.
817
- //
818
- // TODO(arul): we could eschew sending this request if we knew there
819
- // was a sufficiently strong lock already present on the key.
820
- log .VEventf (ctx , 2 , "locking %s on key %s must be sent to the server" , t .Method (), t .Key )
817
+ // acquire a lock at the leaseholder because we don't have a known
818
+ // lock of a sufficient strength on this key. As a result, we can't
819
+ // strip the request from the remote batch.
820
+ log .VEventf (ctx , 2 , "%s(Locking=%s) on key %s must be sent to the server" ,
821
+ t .Method (), t .KeyLockingStrength , t .Key )
821
822
baRemote .Requests = append (baRemote .Requests , ru )
822
823
} else {
823
824
// We'll synthesize the response from the buffer on the response path;
824
- // eschew sending the request to the KV layer as we don't need to
825
- // acquire a lock .
825
+ // eschew sending the request to the KV layer since we already have a
826
+ // lock of a sufficient strength on this key .
826
827
stripped = true
827
828
log .VEventf (
828
- ctx , 2 , "non-locking %s on key %s can be fully served by the client; not sending to KV" , t .Method (), t .Key ,
829
+ ctx , 2 , "%s(Locking=%s) on key %s can be fully served by the client; not sending to KV" ,
830
+ t .Method (), t .KeyLockingStrength , t .Key ,
829
831
)
830
832
}
831
833
} else {
@@ -866,14 +868,18 @@ func (twb *txnWriteBuffer) seekItemForSpan(key, endKey roachpb.Key) *bufferedWri
866
868
// deletion tombstone on the key is present in the buffer. Additionally, a
867
869
// boolean indicating whether the read request was served or not is also
868
870
// returned.
871
+ //
872
+ // The returned locked strength is the highest lock strength known to be held at
873
+ // the given sequence number.
869
874
func (twb * txnWriteBuffer ) maybeServeRead (
870
875
key roachpb.Key , seq enginepb.TxnSeq ,
871
- ) (* roachpb.Value , bool ) {
876
+ ) (* roachpb.Value , lock. Strength , bool ) {
872
877
it := twb .buffer .MakeIter ()
873
878
seek := twb .seekItemForSpan (key , nil )
874
879
it .FirstOverlap (seek )
875
880
if it .Valid () {
876
881
bufferedVals := it .Cur ().vals
882
+ lockStr := it .Cur ().heldStr (seq )
877
883
// In the common case, we're reading the most recently buffered write. That
878
884
// is, the sequence number we're reading at is greater than or equal to the
879
885
// sequence number of the last write that was buffered. The list of buffered
@@ -885,15 +891,15 @@ func (twb *txnWriteBuffer) maybeServeRead(
885
891
// using a binary search here instead.
886
892
for i := len (bufferedVals ) - 1 ; i >= 0 ; i -- {
887
893
if seq >= bufferedVals [i ].seq {
888
- return bufferedVals [i ].valPtr (), true
894
+ return bufferedVals [i ].valPtr (), lockStr , true
889
895
}
890
896
}
891
897
// We've iterated through the buffer, but it seems like our sequence number
892
898
// is smaller than any buffered write performed by our transaction. We can't
893
899
// serve the read locally.
894
- return nil , false
900
+ return nil , lockStr , false
895
901
}
896
- return nil , false
902
+ return nil , lock . None , false
897
903
}
898
904
899
905
// mergeWithScanResp takes a ScanRequest, that was sent to the KV layer, and the
@@ -994,7 +1000,7 @@ func (twb *txnWriteBuffer) mergeBufferAndResp(
994
1000
case - 1 :
995
1001
// The key in the buffer is less than the next key in the server's
996
1002
// response, so we prefer it.
997
- val , served := twb .maybeServeRead (it .Cur ().key , respIter .seq ())
1003
+ val , _ , served := twb .maybeServeRead (it .Cur ().key , respIter .seq ())
998
1004
if served && val .IsPresent () {
999
1005
// NB: Only include a buffered value in the response if it hasn't been
1000
1006
// deleted by the transaction previously. This matches the behaviour
@@ -1007,7 +1013,7 @@ func (twb *txnWriteBuffer) mergeBufferAndResp(
1007
1013
case 0 :
1008
1014
// The key exists in the buffer. We must serve the read from the buffer,
1009
1015
// assuming it is visible to the sequence number of the request.
1010
- val , served := twb .maybeServeRead (it .Cur ().key , respIter .seq ())
1016
+ val , _ , served := twb .maybeServeRead (it .Cur ().key , respIter .seq ())
1011
1017
if served {
1012
1018
if val .IsPresent () {
1013
1019
// NB: Only include a buffered value in the response if it hasn't been
@@ -1039,7 +1045,7 @@ func (twb *txnWriteBuffer) mergeBufferAndResp(
1039
1045
respIter .next ()
1040
1046
}
1041
1047
for it .Valid () {
1042
- val , served := twb .maybeServeRead (it .Cur ().key , respIter .seq ())
1048
+ val , _ , served := twb .maybeServeRead (it .Cur ().key , respIter .seq ())
1043
1049
if served && val .IsPresent () {
1044
1050
// Like above, we'll only include the value in the response if the Scan's
1045
1051
// sequence number requires us to see it and it isn't a deletion
@@ -1130,7 +1136,7 @@ func (rr requestRecord) toResp(
1130
1136
1131
1137
var val * roachpb.Value
1132
1138
var served bool
1133
- val , served = twb .maybeServeRead (req .Key , req .Sequence )
1139
+ val , _ , served = twb .maybeServeRead (req .Key , req .Sequence )
1134
1140
if ! served {
1135
1141
// We only use the response from KV if there wasn't already a
1136
1142
// buffered value for this key that our transaction wrote
@@ -1151,7 +1157,7 @@ func (rr requestRecord) toResp(
1151
1157
}
1152
1158
1153
1159
var lei * lockAcquisition
1154
- if exclusionTimestampRequired {
1160
+ if rr . transformed && exclusionTimestampRequired {
1155
1161
lei = & lockAcquisition {
1156
1162
str : lock .Exclusive ,
1157
1163
seq : req .Sequence ,
@@ -1166,7 +1172,7 @@ func (rr requestRecord) toResp(
1166
1172
1167
1173
case * kvpb.PutRequest :
1168
1174
var lei * lockAcquisition
1169
- if req . MustAcquireExclusiveLock && exclusionTimestampRequired {
1175
+ if rr . transformed && exclusionTimestampRequired {
1170
1176
lei = & lockAcquisition {
1171
1177
str : lock .Exclusive ,
1172
1178
seq : req .Sequence ,
@@ -1180,14 +1186,13 @@ func (rr requestRecord) toResp(
1180
1186
// To correctly populate FoundKey in the response, we must prefer any
1181
1187
// buffered values (if they exist).
1182
1188
var foundKey bool
1183
- val , served := twb .maybeServeRead (req .Key , req .Sequence )
1189
+ val , _ , served := twb .maybeServeRead (req .Key , req .Sequence )
1184
1190
if served {
1185
1191
log .VEventf (ctx , 2 , "serving read portion of %s on key %s from the buffer" , req .Method (), req .Key )
1186
1192
foundKey = val .IsPresent ()
1187
- } else if req . MustAcquireExclusiveLock {
1193
+ } else if rr . transformed {
1188
1194
// We sent a GetRequest to the KV layer to acquire an exclusive lock
1189
- // on the key, regardless of whether the key already exists or not.
1190
- // Populate FoundKey using the response.
1195
+ // on the key, populate FoundKey using the response.
1191
1196
getResp := br .GetInner ().(* kvpb.GetResponse )
1192
1197
if log .ExpensiveLogEnabled (ctx , 2 ) {
1193
1198
log .Eventf (ctx , "synthesizing DeleteResponse from GetResponse: %#v" , getResp )
@@ -1208,7 +1213,7 @@ func (rr requestRecord) toResp(
1208
1213
}
1209
1214
1210
1215
var lei * lockAcquisition
1211
- if req . MustAcquireExclusiveLock && exclusionTimestampRequired {
1216
+ if rr . transformed && exclusionTimestampRequired {
1212
1217
lei = & lockAcquisition {
1213
1218
str : lock .Exclusive ,
1214
1219
seq : req .Sequence ,
@@ -1222,7 +1227,7 @@ func (rr requestRecord) toResp(
1222
1227
twb .addToBuffer (req .Key , roachpb.Value {}, req .Sequence , req .KVNemesisSeq , lei )
1223
1228
1224
1229
case * kvpb.GetRequest :
1225
- val , served := twb .maybeServeRead (req .Key , req .Sequence )
1230
+ val , _ , served := twb .maybeServeRead (req .Key , req .Sequence )
1226
1231
if served {
1227
1232
getResp := & kvpb.GetResponse {}
1228
1233
if val .IsPresent () {
@@ -1527,6 +1532,13 @@ func (bw *bufferedWrite) exclusionExpectedSinceTimestamp() hlc.Timestamp {
1527
1532
return hlc.Timestamp {}
1528
1533
}
1529
1534
1535
+ func (bw * bufferedWrite ) heldStr (seq enginepb.TxnSeq ) lock.Strength {
1536
+ if bw .lki == nil {
1537
+ return lock .None
1538
+ }
1539
+ return bw .lki .heldStr (seq )
1540
+ }
1541
+
1530
1542
func (bw * bufferedWrite ) rollbackLockInfo (seq enginepb.TxnSeq ) {
1531
1543
if bw .lki == nil {
1532
1544
return
@@ -1754,6 +1766,19 @@ func (li *lockedKeyInfo) heldGE(str lock.Strength) bool {
1754
1766
return false
1755
1767
}
1756
1768
1769
+ // heldStr returns the strongest lock held at the given sequence number.
1770
+ func (li * lockedKeyInfo ) heldStr (seq enginepb.TxnSeq ) lock.Strength {
1771
+ for _ , str := range unreplicatedHolderStrengths {
1772
+ minSeq := li.heldStrengths [heldStrengthToIndexMap [str ]]
1773
+ if minSeq != notHeldSentinel {
1774
+ if minSeq <= seq {
1775
+ return str
1776
+ }
1777
+ }
1778
+ }
1779
+ return lock .None
1780
+ }
1781
+
1757
1782
// rollbackSequence rolls back the given sequence number. It returns false if
1758
1783
// the lock is no longer held at any sequence number.
1759
1784
func (li * lockedKeyInfo ) rollbackSequence (seq enginepb.TxnSeq ) bool {
@@ -1774,6 +1799,10 @@ func (li *lockedKeyInfo) rollbackSequence(seq enginepb.TxnSeq) bool {
1774
1799
return stillHeld
1775
1800
}
1776
1801
1802
+ func lockExcludesWrites (str lock.Strength ) bool {
1803
+ return str >= lock .Shared
1804
+ }
1805
+
1777
1806
// getKey reads the key for the next KV from a slice of BatchResponses field of
1778
1807
// {,Reverse}ScanResponse. The KV is encoded in the following format:
1779
1808
//
0 commit comments