Skip to content

Commit 4fbc6ec

Browse files
committed
kvserver,storage: add ExpectExclusionSince to Get
This adds ExpectExclusionSince to Get requests. This option works semantically the same as the ExpectExclusionSince option on Put and Del requests. The implementation, however, is slightly different: it adjusts the read timestamp used by storage.MVCCGet and then configures the scanner to return an ExclusionViolationError rather than a WriteTooOld error. Informs #142977. Release note: None
1 parent d70ca10 commit 4fbc6ec

File tree

11 files changed

+386
-26
lines changed

11 files changed

+386
-26
lines changed

pkg/kv/kvpb/api.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2530,6 +2530,13 @@ func (r *ConditionalPutRequest) Validate(_ Header) error {
25302530
return nil
25312531
}
25322532

2533+
// Validate returns an assertion failure error if ExpectExclusionSince is set on
// a GetRequest whose KeyLockingStrength is lock.None, and nil otherwise. The
// Header argument is not consulted.
func (r *GetRequest) Validate(_ Header) error {
2534+
if !r.ExpectExclusionSince.IsEmpty() && r.KeyLockingStrength == lock.None {
2535+
return errors.AssertionFailedf("invalid GetRequest: ExpectExclusionSince is non-empty for non-locking request")
2536+
}
2537+
return nil
2538+
}
2539+
25332540
func (r *PutRequest) Validate(bh Header) error {
25342541
if err := validateExclusionTimestampForBatch(r.ExpectExclusionSince, bh); err != nil {
25352542
return errors.NewAssertionErrorWithWrappedErrf(err, "invalid PutRequest")

pkg/kv/kvpb/api.proto

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,13 @@ message GetRequest {
223223
// LockNonExisting indicates whether the Get request should acquire a lock
224224
// on keys that don't exist.
225225
bool lock_non_existing = 5;
226+
227+
// ExpectExclusionSince, if set, indicates that the request should return an
228+
// error if this key has been written to at a timestamp at or after the given
229+
// timestamp. This is different from the ReadTimestamp of the transaction in
230+
// that it allows a writer to verify that an unreplicated lock acquired at an
231+
// earlier timestamp has provided the required protection.
232+
util.hlc.Timestamp expect_exclusion_since = 6 [(gogoproto.nullable) = false];
226233
}
227234

228235
// A GetResponse is the return value from the Get() method.
@@ -2959,7 +2966,7 @@ message Header {
29592966

29602967
// HasBufferedAllPrecedingWrites, if set, indicates that the batch belongs to
29612968
// a transaction that has buffered all of its writes (from preceding batches)
2962-
// on the client.
2969+
// on the client.
29632970
//
29642971
// The server may use this field to omit checking the AbortSpan. Transactions
29652972
// use the AbortSpan to check whether they've been aborted or not. If they
@@ -2974,7 +2981,7 @@ message Header {
29742981
// read-your-own-writes. As such, they can eschew checking the AbortSpan on
29752982
// the server.
29762983
bool has_buffered_all_preceding_writes = 37;
2977-
2984+
29782985
reserved 7, 10, 12, 14, 20;
29792986

29802987
// Next ID: 38

pkg/kv/kvpb/api_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,20 @@ func TestFlagCombinations(t *testing.T) {
370370
}
371371
}
372372

373+
// TestGetValidate checks that GetRequest.Validate returns an error when
// ExpectExclusionSince is set on a Get with no locking strength, and returns no
// error when it is set on a Get with lock.Exclusive strength.
func TestGetValidate(t *testing.T) {
374+
t.Run("ExpectExclusionSinceOnNonLockingGet", func(t *testing.T) {
375+
getReq := &GetRequest{ExpectExclusionSince: hlc.Timestamp{WallTime: 1}}
376+
require.Error(t, getReq.Validate(Header{}))
377+
})
378+
t.Run("ExpectExclusionSinceOnLockingGet", func(t *testing.T) {
379+
getReq := &GetRequest{
380+
ExpectExclusionSince: hlc.Timestamp{WallTime: 1},
381+
KeyLockingStrength: lock.Exclusive,
382+
}
383+
require.NoError(t, getReq.Validate(Header{}))
384+
})
385+
}
386+
373387
func TestRequestHeaderRoundTrip(t *testing.T) {
374388
var seq kvnemesisutil.Container
375389
seq.Set(123)

pkg/kv/kvpb/batch.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,15 @@ func (ba *BatchRequest) EarliestActiveTimestamp() hlc.Timestamp {
108108
// NB: StartTime.Next() because StartTime is exclusive.
109109
ts.Backward(t.StartTime.Next())
110110
}
111+
case *GetRequest:
112+
// A GetRequest with ExpectExclusionSince set needs to be able to observe
113+
// MVCC versions from the specified time to correctly detect isolation
114+
// violations.
115+
//
116+
// See the example in RefreshRequest for more details.
117+
if !t.ExpectExclusionSince.IsEmpty() {
118+
ts.Backward(t.ExpectExclusionSince.Next())
119+
}
111120
case *PutRequest:
112121
// A PutRequest with ExpectExclusionSince set needs to be able to observe MVCC
113122
// versions from the specified time to correctly detect isolation

pkg/kv/kvserver/batcheval/cmd_get.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ func Get(
2929
h := cArgs.Header
3030
reply := resp.(*kvpb.GetResponse)
3131

32+
if err := args.Validate(h); err != nil {
33+
return result.Result{}, err
34+
}
35+
3236
var lockTableForSkipLocked storage.LockTableView
3337
if h.WaitPolicy == lock.WaitPolicy_SkipLocked {
3438
lockTableForSkipLocked = newRequestBoundLockTableView(
@@ -37,11 +41,25 @@ func Get(
3741
defer lockTableForSkipLocked.Close()
3842
}
3943

40-
getRes, err := storage.MVCCGet(ctx, readWriter, args.Key, h.Timestamp, storage.MVCCGetOptions{
44+
readTimestamp := h.Timestamp
45+
moreRecentPolicy := storage.IgnoreMoreRecent
46+
47+
// If ExpectExclusionSince is set, use it as the read timestamp and set the
48+
// MoreRecentPolicy to ExclusionViolationErrorOnMoreRecent to ensure that an
49+
// exclusion violation error is returned if a write has occurred since the
50+
// exclusion timestamp.
51+
if !args.ExpectExclusionSince.IsEmpty() {
52+
readTimestamp = args.ExpectExclusionSince
53+
moreRecentPolicy = storage.ExclusionViolationErrorOnMoreRecent
54+
} else if args.KeyLockingStrength != lock.None {
55+
moreRecentPolicy = storage.WriteTooOldErrorOnMoreRecent
56+
}
57+
58+
getRes, err := storage.MVCCGet(ctx, readWriter, args.Key, readTimestamp, storage.MVCCGetOptions{
4159
Inconsistent: h.ReadConsistency != kvpb.CONSISTENT,
4260
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
4361
Txn: h.Txn,
44-
FailOnMoreRecent: args.KeyLockingStrength != lock.None,
62+
MoreRecentPolicy: moreRecentPolicy,
4563
ScanStats: cArgs.ScanStats,
4664
Uncertainty: cArgs.Uncertainty,
4765
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),

pkg/kv/kvserver/batcheval/cmd_get_test.go

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ import (
1111
"testing"
1212

1313
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
15+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
1416
"github.com/cockroachdb/cockroach/pkg/roachpb"
1517
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1618
"github.com/cockroachdb/cockroach/pkg/storage"
19+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1720
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
21+
"github.com/cockroachdb/cockroach/pkg/util/log"
1822
"github.com/stretchr/testify/require"
1923
)
2024

@@ -106,3 +110,251 @@ func TestGetResumeSpan(t *testing.T) {
106110
})
107111
}
108112
}
113+
114+
// TestExpectExclusionSince tests evaluation of the ExpectExclusionSince option
115+
// on all commands that support it.
//
// Each subtest starts from a fresh in-memory engine holding one existing
// intent, committed write, or replicated lock on `key`, then issues a Get, Put,
// or Delete carrying an exclusion timestamp just before, equal to, or just
// after the existing write's timestamp.
116+
func TestExpectExclusionSince(t *testing.T) {
117+
defer leaktest.AfterTest(t)()
118+
defer log.Scope(t).Close(t)
119+
120+
ctx := context.Background()
121+
122+
var (
123+
key = roachpb.Key([]byte{'a'})
124+
value = roachpb.MakeValueFromString("woohoo")
125+
126+
clock = hlc.NewClockForTesting(nil)
127+
settings = cluster.MakeTestingClusterSettings()
128+
evalCtx = (&MockEvalCtx{Clock: clock, ClusterSettings: settings}).EvalContext()
129+
writeTS = clock.Now()
130+
)
131+
132+
// putWithTxn evaluates a Put of value at key with the given exclusion
// timestamp. The request timestamp is txn.ReadTimestamp when txn is non-nil
// and writeTS otherwise.
putWithTxn := func(key roachpb.Key, value roachpb.Value, exclusionTS hlc.Timestamp, db storage.Engine, txn *roachpb.Transaction) error {
133+
putResp := kvpb.PutResponse{}
134+
ts := writeTS
135+
if txn != nil {
136+
ts = txn.ReadTimestamp
137+
}
138+
_, err := Put(ctx, db, CommandArgs{
139+
EvalCtx: evalCtx,
140+
Header: kvpb.Header{
141+
Txn: txn,
142+
Timestamp: ts,
143+
},
144+
Args: &kvpb.PutRequest{
145+
ExpectExclusionSince: exclusionTS,
146+
RequestHeader: kvpb.RequestHeader{Key: key},
147+
Value: value,
148+
},
149+
}, &putResp)
150+
return err
151+
}
152+
153+
// getWithTxn evaluates a Get of key with the given exclusion timestamp,
// locking strength, and durability, always setting LockNonExisting. The
// request timestamp is chosen the same way as in putWithTxn.
getWithTxn := func(key roachpb.Key, exclusionTS hlc.Timestamp,
154+
str lock.Strength, dur lock.Durability, db storage.Engine, txn *roachpb.Transaction) error {
155+
getResp := kvpb.GetResponse{}
156+
ts := writeTS
157+
if txn != nil {
158+
ts = txn.ReadTimestamp
159+
}
160+
_, err := Get(ctx, db, CommandArgs{
161+
EvalCtx: evalCtx,
162+
Header: kvpb.Header{
163+
Txn: txn,
164+
Timestamp: ts,
165+
},
166+
Args: &kvpb.GetRequest{
167+
LockNonExisting: true,
168+
ExpectExclusionSince: exclusionTS,
169+
KeyLockingStrength: str,
170+
KeyLockingDurability: dur,
171+
RequestHeader: kvpb.RequestHeader{Key: key},
172+
},
173+
}, &getResp)
174+
return err
175+
}
176+
177+
// We'll set up each test in a new engine, placing a write or lock on `key` at
178+
// `writeTS`.
179+
type existingWriteType string
180+
const (
181+
intentWrite existingWriteType = "intent"
182+
committedWrite existingWriteType = "committed"
183+
replicatedLock existingWriteType = "replicated-lock"
184+
)
185+
186+
var (
187+
beforeExistingTS = writeTS.Prev()
188+
equalExistingTS = writeTS
189+
afterExistingTS = writeTS.Next()
190+
)
191+
192+
type testCase struct {
193+
writeType existingWriteType
194+
exclusionTS hlc.Timestamp
195+
expectExclusionViolation bool
196+
expectLockConflictError bool
197+
}
198+
199+
// setup creates a new in-memory engine and places the requested kind of
// existing write or lock on `key`. It returns the engine and the transaction
// that performed the write, which is nil for a committed write.
setup := func(t *testing.T, writeType existingWriteType) (storage.Engine, *roachpb.Transaction) {
200+
db := storage.NewDefaultInMemForTesting()
201+
202+
var txn *roachpb.Transaction
203+
if writeType == intentWrite || writeType == replicatedLock {
204+
txn1 := roachpb.MakeTransaction("test", nil, /* baseKey */
205+
isolation.Serializable,
206+
roachpb.NormalUserPriority, writeTS, 0, 0, 0, false /* omitInRangefeeds */)
207+
txn = &txn1
208+
}
209+
if writeType == replicatedLock {
210+
require.NoError(t, getWithTxn(key, hlc.Timestamp{}, lock.Exclusive, lock.Replicated, db, txn))
211+
} else {
212+
require.NoError(t, putWithTxn(key, value, hlc.Timestamp{}, db, txn))
213+
}
214+
return db, txn
215+
}
216+
217+
// ops are the operations that support sending an ExpectExclusionSince
218+
// timestamp.
219+
ops := []struct {
220+
name string
221+
request func(key roachpb.Key, exclusionTS hlc.Timestamp, db storage.Engine, txn *roachpb.Transaction) error
222+
}{
223+
{
224+
name: "Get",
225+
request: func(key roachpb.Key, exclusionTS hlc.Timestamp, db storage.Engine, txn *roachpb.Transaction) error {
226+
return getWithTxn(key, exclusionTS, lock.Exclusive, lock.Replicated, db, txn)
227+
},
228+
},
229+
{
230+
name: "Put",
231+
request: func(key roachpb.Key, exclusionTS hlc.Timestamp, db storage.Engine, txn *roachpb.Transaction) error {
232+
return putWithTxn(key, value, exclusionTS, db, txn)
233+
},
234+
},
235+
{
236+
name: "Delete",
237+
request: func(key roachpb.Key, exclusionTS hlc.Timestamp, db storage.Engine, txn *roachpb.Transaction) error {
238+
delResp := kvpb.DeleteResponse{}
239+
_, err := Delete(ctx, db, CommandArgs{
240+
EvalCtx: evalCtx,
241+
Header: kvpb.Header{
242+
Txn: txn,
243+
// NB: unlike the Put and Get helpers, txn is dereferenced here without
// a nil check, so this op must be called with a non-nil txn.
Timestamp: txn.ReadTimestamp,
244+
},
245+
Args: &kvpb.DeleteRequest{
246+
ExpectExclusionSince: exclusionTS,
247+
RequestHeader: kvpb.RequestHeader{Key: key},
248+
},
249+
}, &delResp)
250+
return err
251+
},
252+
},
253+
}
254+
255+
testCases := []testCase{
256+
// If an intent write exists, it doesn't matter what timestamp it is at. We
257+
// expect a lock conflict error and then wait on whoever violated our write
258+
// exclusion.
259+
{
260+
writeType: intentWrite,
261+
exclusionTS: beforeExistingTS,
262+
expectLockConflictError: true,
263+
},
264+
{
265+
writeType: intentWrite,
266+
exclusionTS: equalExistingTS,
267+
expectLockConflictError: true,
268+
},
269+
{
270+
writeType: intentWrite,
271+
exclusionTS: afterExistingTS,
272+
expectLockConflictError: true,
273+
},
274+
275+
// Committed writes are where we expect to see a write exclusion violation
276+
// error.
277+
{
278+
writeType: committedWrite,
279+
exclusionTS: beforeExistingTS,
280+
expectExclusionViolation: true,
281+
},
282+
{
283+
writeType: committedWrite,
284+
exclusionTS: equalExistingTS,
285+
expectExclusionViolation: true,
286+
},
287+
{
288+
writeType: committedWrite,
289+
exclusionTS: afterExistingTS,
290+
},
291+
292+
// For replicatedLocks, we get a lock conflict error in all cases just like
293+
// an intent write.
294+
{
295+
writeType: replicatedLock,
296+
exclusionTS: beforeExistingTS,
297+
expectLockConflictError: true,
298+
},
299+
{
300+
writeType: replicatedLock,
301+
exclusionTS: equalExistingTS,
302+
expectLockConflictError: true,
303+
},
304+
{
305+
writeType: replicatedLock,
306+
exclusionTS: afterExistingTS,
307+
expectLockConflictError: true,
308+
},
309+
}
310+
311+
for _, op := range ops {
312+
for _, tc := range testCases {
313+
// Derive a readable label for the exclusion timestamp to use in the
// subtest name.
exclusionTSString := "unknown"
314+
if tc.exclusionTS.Equal(equalExistingTS) {
315+
exclusionTSString = "equal"
316+
} else if tc.exclusionTS.Equal(beforeExistingTS) {
317+
exclusionTSString = "before"
318+
} else if tc.exclusionTS.Equal(afterExistingTS) {
319+
exclusionTSString = "after"
320+
}
321+
322+
for _, sameTxn := range []bool{true, false} {
323+
// Committed writes don't have an associated transaction so the sameTxn
324+
// variant doesn't make sense here.
325+
if sameTxn && tc.writeType == committedWrite {
326+
continue
327+
}
328+
329+
name := fmt.Sprintf("%s/%s/exclusion_ts=%s/sameTxn=%v",
330+
op.name, tc.writeType, exclusionTSString, sameTxn)
331+
332+
t.Run(name, func(t *testing.T) {
333+
db, txn := setup(t, tc.writeType)
334+
defer db.Close()
335+
336+
// Either issue the request from a brand-new transaction, or reuse the
// transaction returned by setup with a bumped sequence number and read
// timestamp.
if !sameTxn {
337+
txn1 := roachpb.MakeTransaction("test", nil, /* baseKey */
338+
isolation.Serializable,
339+
roachpb.NormalUserPriority, clock.Now(), 0, 0, 0, false /* omitInRangefeeds */)
340+
txn = &txn1
341+
} else {
342+
txn.Sequence++
343+
txn.BumpReadTimestamp(clock.Now())
344+
}
345+
346+
err := op.request(key, tc.exclusionTS, db, txn)
347+
// A request from the same transaction that placed the existing intent
// or lock is expected to succeed regardless of the exclusion timestamp.
if sameTxn {
348+
require.NoError(t, err, "expected no error for write in the same txn")
349+
} else if tc.expectExclusionViolation {
350+
require.ErrorContains(t, err, "write exclusion on key")
351+
} else if tc.expectLockConflictError {
352+
require.ErrorContains(t, err, "conflicting locks on")
353+
} else {
354+
require.NoError(t, err)
355+
}
356+
})
357+
}
358+
}
359+
}
360+
}

0 commit comments

Comments
 (0)