Skip to content

Commit 32b9329

Browse files
craig[bot]stevendanna
andcommitted
Merge #148364
148364: kvserver,storage: add ExpectExclusionSince to Get r=arulajmani a=stevendanna This adds ExpectExclusionSince to Get requests. This option semantically works the same as the ExpectExclusionSince option on Put and Del requests. The implementation, however, is slightly different. It adjusts the read timestamp used by storage.MVCCGet and then configures the scanner to return an ExclusionViolationError rather than a WriteTooOld error. Informs #142977 Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents b464ea9 + b688fc6 commit 32b9329

File tree

7 files changed

+324
-11
lines changed

7 files changed

+324
-11
lines changed

pkg/kv/kvpb/api.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2530,6 +2530,13 @@ func (r *ConditionalPutRequest) Validate(_ Header) error {
25302530
return nil
25312531
}
25322532

2533+
func (r *GetRequest) Validate(_ Header) error {
2534+
if !r.ExpectExclusionSince.IsEmpty() && r.KeyLockingStrength == lock.None {
2535+
return errors.AssertionFailedf("invalid GetRequest: ExpectExclusionSince is non-empty for non-locking request")
2536+
}
2537+
return nil
2538+
}
2539+
25332540
func (r *PutRequest) Validate(bh Header) error {
25342541
if err := validateExclusionTimestampForBatch(r.ExpectExclusionSince, bh); err != nil {
25352542
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid PutRequest")

pkg/kv/kvpb/api.proto

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,15 @@ message GetRequest {
223223
// LockNonExisting indicates whether the Get request should acquire a lock
224224
// on keys that don't exist.
225225
bool lock_non_existing = 5;
226+
227+
// ExpectExclusionSince, if set, indicates that the request should return an
228+
// error if this key has been written to at a timestamp at or after the given
229+
// timestamp. This allows the request to verify that a previously acquired
230+
// unreplicated lock has provided the required protection. Note that it does
231+
// not check for the existence of the lock. It checks for a write that the
232+
// lock should have prevented. If such a write is found an
233+
// ExclusionViolationError is returned.
234+
util.hlc.Timestamp expect_exclusion_since = 6 [(gogoproto.nullable) = false];
226235
}
227236

228237
// A GetResponse is the return value from the Get() method.
@@ -2965,7 +2974,7 @@ message Header {
29652974

29662975
// HasBufferedAllPrecedingWrites, if set, indicates that the batch belongs to
29672976
// a transaction that has buffered all of its writes (from preceding batches)
2968-
// on the client.
2977+
// on the client.
29692978
//
29702979
// The server may use this field to omit checking the AbortSpan. Transactions
29712980
// use the AbortSpan to check whether they've been aborted or not. If they
@@ -2980,7 +2989,7 @@ message Header {
29802989
// read-your-own-writes. As such, they can eschew checking the AbortSpan on
29812990
// the server.
29822991
bool has_buffered_all_preceding_writes = 37;
2983-
2992+
29842993
reserved 7, 10, 12, 14, 20;
29852994

29862995
// Next ID: 38

pkg/kv/kvpb/api_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,20 @@ func TestFlagCombinations(t *testing.T) {
370370
}
371371
}
372372

373+
func TestGetValidate(t *testing.T) {
374+
t.Run("ExpectExclusionSinceOnNonLockingGet", func(t *testing.T) {
375+
getReq := &GetRequest{ExpectExclusionSince: hlc.Timestamp{WallTime: 1}}
376+
require.Error(t, getReq.Validate(Header{}))
377+
})
378+
t.Run("ExpectExclusionSinceOnLockingGet", func(t *testing.T) {
379+
getReq := &GetRequest{
380+
ExpectExclusionSince: hlc.Timestamp{WallTime: 1},
381+
KeyLockingStrength: lock.Exclusive,
382+
}
383+
require.NoError(t, getReq.Validate(Header{}))
384+
})
385+
}
386+
373387
func TestRequestHeaderRoundTrip(t *testing.T) {
374388
var seq kvnemesisutil.Container
375389
seq.Set(123)

pkg/kv/kvpb/batch.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,15 @@ func (ba *BatchRequest) EarliestActiveTimestamp() hlc.Timestamp {
108108
// NB: StartTime.Next() because StartTime is exclusive.
109109
ts.Backward(t.StartTime.Next())
110110
}
111+
case *GetRequest:
112+
// A GetRequest with ExpectExclusionSince set need to be able to observe
113+
// MVCC versions from the specified time to correctly detect isolation
114+
// violations.
115+
//
116+
// See the example in RefreshRequest for more details.
117+
if !t.ExpectExclusionSince.IsEmpty() {
118+
ts.Backward(t.ExpectExclusionSince.Next())
119+
}
111120
case *PutRequest:
112121
// A PutRequest with ExpectExclusionSince set need to be able to observe MVCC
113122
// versions from the specified time to correctly detect isolation

pkg/kv/kvpb/data.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,6 @@ func PrepareTransactionForRetry(
112112
return roachpb.Transaction{}, errors.AssertionFailedf(
113113
"unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail())
114114
case *ExclusionViolationError:
115-
// With a fixed read snapshot, we should never see this error as WriteTooOld will take priority.
116-
if !txn.IsoLevel.PerStatementReadSnapshot() {
117-
return roachpb.Transaction{}, errors.AssertionFailedf(
118-
"unexpected exclusion violation error in %s transaction: %s should be transformed into retry error",
119-
txn.IsoLevel,
120-
pErr)
121-
}
122115
// An exclusion violation error always requires a restart.
123116
txn.WriteTimestamp.Forward(tErr.RetryTimestamp())
124117
errorAlwaysRequireRestart = true

pkg/kv/kvserver/batcheval/cmd_get.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/storage"
1616
"github.com/cockroachdb/cockroach/pkg/storage/fs"
1717
"github.com/cockroachdb/cockroach/pkg/util/log"
18+
"github.com/cockroachdb/errors"
1819
)
1920

2021
func init() {
@@ -29,6 +30,10 @@ func Get(
2930
h := cArgs.Header
3031
reply := resp.(*kvpb.GetResponse)
3132

33+
if err := args.Validate(h); err != nil {
34+
return result.Result{}, err
35+
}
36+
3237
var lockTableForSkipLocked storage.LockTableView
3338
if h.WaitPolicy == lock.WaitPolicy_SkipLocked {
3439
lockTableForSkipLocked = newRequestBoundLockTableView(
@@ -37,11 +42,24 @@ func Get(
3742
defer lockTableForSkipLocked.Close()
3843
}
3944

40-
getRes, err := storage.MVCCGet(ctx, readWriter, args.Key, h.Timestamp, storage.MVCCGetOptions{
45+
readTimestamp := h.Timestamp
46+
failOnMoreRecent := args.KeyLockingStrength != lock.None
47+
expectExclusionSince := !args.ExpectExclusionSince.IsEmpty()
48+
49+
// If ExpectExclusionSince is set, use it as the read timestamp and set the
50+
// MoreRecentPolicy to ExclusionViolationErrorOnMoreRecent to ensure that an
51+
// exclusion violation error is returned if a write has occurred since the
52+
// exclusion timestamp.
53+
if expectExclusionSince {
54+
readTimestamp = args.ExpectExclusionSince
55+
failOnMoreRecent = true
56+
}
57+
58+
getRes, err := storage.MVCCGet(ctx, readWriter, args.Key, readTimestamp, storage.MVCCGetOptions{
4159
Inconsistent: h.ReadConsistency != kvpb.CONSISTENT,
4260
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
4361
Txn: h.Txn,
44-
FailOnMoreRecent: args.KeyLockingStrength != lock.None,
62+
FailOnMoreRecent: failOnMoreRecent,
4563
ScanStats: cArgs.ScanStats,
4664
Uncertainty: cArgs.Uncertainty,
4765
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
@@ -54,6 +72,17 @@ func Get(
5472
ReturnRawMVCCValues: args.ReturnRawMVCCValues,
5573
})
5674
if err != nil {
75+
// If the user has set ExpectExclusionSince, transform any WriteTooOld error
76+
// into an ExclusionViolationError.
77+
if expectExclusionSince {
78+
if wtoErr := (*kvpb.WriteTooOldError)(nil); errors.As(err, &wtoErr) {
79+
err = kvpb.NewExclusionViolationError(
80+
readTimestamp,
81+
wtoErr.ActualTimestamp.Prev(),
82+
args.Key,
83+
)
84+
}
85+
}
5786
return result.Result{}, err
5887
}
5988
reply.ResumeSpan = getRes.ResumeSpan

0 commit comments

Comments
 (0)