Skip to content

Commit c0fbeb3

Browse files
craig[bot]arulajmanirafissYevgeniy Miretskiy
committed
108179: concurrency: use lock modes to detect conflicts during optimistic eval r=nvanbenschoten a=arulajmani This patch switches over conflict resolution performed by optimistic evaluation to use lock modes instead of ad-hoc logic. As a result of this, optimistic evaluation is able to handle shared locks. We add a test to show this. Closes cockroachdb#108142 Release note: None 108503: sql: do not evaluate AOST timestamp in session migrations r=rafiss a=rafiss fixes https://github.com/cockroachlabs/support/issues/2510 refs cockroachdb#108305 Release note (bug fix): Fixed a bug where a session migration performed by SHOW TRANSFER STATE would not handle prepared statements that used the AS OF SYSTEM TIME clause. Users who encountered this bug would see errors such as `expected 1 or 0 for number of format codes, got N`. This bug was present since v22.2.0. 108523: roachtest: Reset job load attempt when loading cdc job r=miretskiy a=miretskiy Fixes cockroachdb#108433 Release note: None Co-authored-by: Arul Ajmani <[email protected]> Co-authored-by: Rafi Shamim <[email protected]> Co-authored-by: Yevgeniy Miretskiy <[email protected]>
4 parents 1f8fa96 + 90ba1ef + 5befaa7 + fcc95cf commit c0fbeb3

File tree

6 files changed

+209
-27
lines changed

6 files changed

+209
-27
lines changed

pkg/ccl/testccl/sqlccl/show_transfer_state_test.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ func TestShowTransferState(t *testing.T) {
4343
require.NoError(t, err)
4444
_, err = tenantDB.Exec("CREATE TYPE typ AS ENUM ('foo', 'bar')")
4545
require.NoError(t, err)
46+
_, err = tenantDB.Exec("CREATE TABLE tab (a INT4, b typ)")
47+
require.NoError(t, err)
48+
_, err = tenantDB.Exec("INSERT INTO tab VALUES (1, 'foo')")
49+
require.NoError(t, err)
50+
_, err = tenantDB.Exec("GRANT SELECT ON tab TO testuser")
51+
require.NoError(t, err)
4652

4753
testUserConn := tenant.SQLConnForUser(t, username.TestUser, "")
4854

@@ -90,12 +96,18 @@ func TestShowTransferState(t *testing.T) {
9096
defer func() { _ = conn.Close(ctx) }()
9197

9298
// Add a prepared statement to make sure SHOW TRANSFER STATE handles it.
93-
_, err = conn.Prepare(ctx, "prepared_stmt", "SELECT $1::INT4, 'foo'::typ WHERE 1 = 1")
99+
_, err = conn.Prepare(ctx, "prepared_stmt_const", "SELECT $1::INT4, 'foo'::typ WHERE 1 = 1")
100+
require.NoError(t, err)
101+
_, err = conn.Prepare(ctx, "prepared_stmt_aost", "SELECT a, b FROM tab AS OF SYSTEM TIME '-1us'")
94102
require.NoError(t, err)
95103

96104
var intResult int
97105
var enumResult string
98-
err = conn.QueryRow(ctx, "prepared_stmt", 1).Scan(&intResult, &enumResult)
106+
err = conn.QueryRow(ctx, "prepared_stmt_const", 1).Scan(&intResult, &enumResult)
107+
require.NoError(t, err)
108+
require.Equal(t, 1, intResult)
109+
require.Equal(t, "foo", enumResult)
110+
err = conn.QueryRow(ctx, "prepared_stmt_aost").Scan(&intResult, &enumResult)
99111
require.NoError(t, err)
100112
require.Equal(t, 1, intResult)
101113
require.Equal(t, "foo", enumResult)
@@ -171,14 +183,26 @@ func TestShowTransferState(t *testing.T) {
171183
// session.
172184
result := conn.PgConn().ExecPrepared(
173185
ctx,
174-
"prepared_stmt",
186+
"prepared_stmt_const",
175187
[][]byte{{0, 0, 0, 2}}, // binary representation of 2
176188
[]int16{1}, // paramFormats - 1 means binary
177-
[]int16{1}, // resultFormats - 1 means binary
189+
[]int16{1, 1}, // resultFormats - 1 means binary
178190
).Read()
191+
require.NoError(t, result.Err)
179192
require.Equal(t, [][][]byte{{
180193
{0, 0, 0, 2}, {0x66, 0x6f, 0x6f}, // binary representation of 2, 'foo'
181194
}}, result.Rows)
195+
result = conn.PgConn().ExecPrepared(
196+
ctx,
197+
"prepared_stmt_aost",
198+
[][]byte{}, // paramValues
199+
[]int16{}, // paramFormats
200+
[]int16{1, 1}, // resultFormats - 1 means binary
201+
).Read()
202+
require.NoError(t, result.Err)
203+
require.Equal(t, [][][]byte{{
204+
{0, 0, 0, 1}, {0x66, 0x6f, 0x6f}, // binary representation of 1, 'foo'
205+
}}, result.Rows)
182206
})
183207

184208
// Errors should be displayed as a SQL value.

pkg/cmd/roachtest/tests/cdc_bench.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,8 @@ func waitForChangefeed(
548548
) (changefeedInfo, error) {
549549
ticker := time.NewTicker(5 * time.Second)
550550
defer ticker.Stop()
551-
for attempt := 0; ; attempt++ {
551+
const maxLoadJobAttempts = 5
552+
for loadJobAttempt := 0; ; loadJobAttempt++ {
552553
select {
553554
case <-ticker.C:
554555
case <-ctx.Done():
@@ -557,9 +558,9 @@ func waitForChangefeed(
557558

558559
info, err := getChangefeedInfo(conn, jobID)
559560
if err != nil {
560-
logger.Errorf("error getting changefeed info: %v (attempt %d)", err, attempt+1)
561-
if attempt > 5 {
562-
return changefeedInfo{}, errors.Wrap(err, "failed 5 attempts to get changefeed info")
561+
logger.Errorf("error getting changefeed info: %v (attempt %d)", err, loadJobAttempt+1)
562+
if loadJobAttempt > 5 {
563+
return changefeedInfo{}, errors.Wrapf(err, "failed %d attempts to get changefeed info", maxLoadJobAttempts)
563564
}
564565
continue
565566
} else if info.errMsg != "" {
@@ -570,6 +571,7 @@ func waitForChangefeed(
570571
} else if ok {
571572
return *info, nil
572573
}
574+
loadJobAttempt = 0
573575
}
574576
}
575577

pkg/kv/kvserver/concurrency/lock_table.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(
667667
ltRange := &lockState{key: startKey, endKey: span.EndKey}
668668
for iter.FirstOverlap(ltRange); iter.Valid(); iter.NextOverlap(ltRange) {
669669
l := iter.Cur()
670-
if !l.isNonConflictingLock(g, g.curStrength()) {
670+
if !l.isNonConflictingLock(g) {
671671
return false
672672
}
673673
}
@@ -2359,7 +2359,7 @@ func (l *lockState) claimBeforeProceeding(g *lockTableGuardImpl) {
23592359
panic("lock table bug: did not find enqueued request")
23602360
}
23612361

2362-
func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strength) bool {
2362+
func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl) bool {
23632363
l.mu.Lock()
23642364
defer l.mu.Unlock()
23652365

@@ -2368,13 +2368,12 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strengt
23682368
return true
23692369
}
23702370
// Lock is not empty.
2371-
lockHolderTxn, lockHolderTS := l.getLockHolder()
2372-
if lockHolderTxn == nil {
2373-
// Transactions that have claimed the lock, but have not acquired it yet,
2374-
// are considered non-conflicting.
2375-
//
2376-
// Optimistic evaluation may call into this function with or without holding
2377-
// latches. It's worth considering both these cases separately:
2371+
if !l.isHeld() {
2372+
// If the lock is neither empty nor held it must be the case that another
2373+
// transaction has claimed the lock. Locks that have been claimed, but have
2374+
// not been acquired yet, are considered non-conflicting. Optimistic
2375+
// evaluation may call into this function with or without holding latches.
2376+
// It's worth considering both these cases separately:
23782377
//
23792378
// 1. If Optimistic evaluation is holding latches, then there cannot be a
23802379
// conflicting request that has claimed (but not acquired) the lock that is
@@ -2397,19 +2396,29 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strengt
23972396
// claimed the lock will know what happened and what to do about it.
23982397
return true
23992398
}
2399+
lockHolderTxn, _ := l.getLockHolder()
24002400
if g.isSameTxn(lockHolderTxn) {
2401-
// Already locked by this txn.
2401+
// NB: Unlike the pessimistic (normal) evaluation code path, we do not need
2402+
// to check the lock's strength if it is already held by this transaction --
2403+
// it's non-conflicting. There's two cases to consider:
2404+
//
2405+
// 1. If the lock is held with the same/higher lock strength on this key
2406+
// then this optimistic evaluation attempt already has all the protection it
2407+
// needs.
2408+
//
2409+
// 2. If the lock is held with a weaker lock strength other transactions may
2410+
// be able to acquire a lock on this key that conflicts with this optimistic
2411+
// evaluation attempt. This is okay, as we'll detect such cases -- however,
2412+
// the weaker lock in itself is not conflicting with the optimistic
2413+
// evaluation attempt.
24022414
return true
24032415
}
2416+
24042417
// NB: We do not look at the txnStatusCache in this optimistic evaluation
24052418
// path. A conflict with a finalized txn will be noticed when retrying
24062419
// pessimistically.
24072420

2408-
if str == lock.None && g.ts.Less(lockHolderTS) {
2409-
return true
2410-
}
2411-
// Conflicts.
2412-
return false
2421+
return !lock.Conflicts(l.getLockMode(), g.curLockMode(), &g.lt.settings.SV) // non-conflicting
24132422
}
24142423

24152424
// Acquires this lock. Any requests that are waiting in the lock's wait queues

pkg/kv/kvserver/concurrency/testdata/lock_table/optimistic

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,125 @@ num=2
132132
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
133133
lock: "g"
134134
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
135+
136+
137+
# ------------------------------------------------------------------------------
138+
# Test that optimistic evaluation succeeds if the lock is held by our own
139+
# transaction, regardless of lock strengths.
140+
# ------------------------------------------------------------------------------
141+
142+
clear
143+
----
144+
num=0
145+
146+
new-request r=req5 txn=txn1 ts=10,1 spans=shared@a
147+
----
148+
149+
scan r=req5
150+
----
151+
start-waiting: false
152+
153+
should-wait r=req5
154+
----
155+
false
156+
157+
acquire r=req5 k=a durability=u strength=shared
158+
----
159+
num=1
160+
lock: "a"
161+
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]
162+
163+
# Ensure a optimistic evaluation attempt from the same transaction that covers
164+
# key "a" succeeds -- both with lower and higher lock strengths than the
165+
# strength of the lock already held (shared).
166+
167+
new-request r=req6 txn=txn1 ts=10,1 spans=exclusive@a,c
168+
----
169+
170+
scan-opt r=req6
171+
----
172+
start-waiting: false
173+
174+
should-wait r=req6
175+
----
176+
false
177+
178+
check-opt-no-conflicts r=req6 spans=exclusive@a,c
179+
----
180+
no-conflicts: true
181+
182+
dequeue r=req6
183+
----
184+
num=1
185+
lock: "a"
186+
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]
187+
188+
new-request r=req7 txn=txn1 ts=10,1 spans=none@a,c
189+
----
190+
191+
scan-opt r=req7
192+
----
193+
start-waiting: false
194+
195+
should-wait r=req7
196+
----
197+
false
198+
199+
check-opt-no-conflicts r=req7 spans=none@a,c
200+
----
201+
no-conflicts: true
202+
203+
dequeue r=req7
204+
----
205+
num=1
206+
lock: "a"
207+
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]
208+
209+
# ------------------------------------------------------------------------------
210+
# Test that optimistic evaluation works with SHARED locking strength -- if a
211+
# shared lock is held, another transaction should be able to perform optimistic
212+
# evaluation with shared locking strength and not conflict; optimistic evaluation
213+
# should conflict if run with exclusive lock strength.
214+
# ------------------------------------------------------------------------------
215+
216+
new-request r=req8 txn=txn2 ts=10,1 spans=none@a,c
217+
----
218+
219+
scan-opt r=req8
220+
----
221+
start-waiting: false
222+
223+
should-wait r=req8
224+
----
225+
false
226+
227+
check-opt-no-conflicts r=req8 spans=shared@a,c
228+
----
229+
no-conflicts: true
230+
231+
dequeue r=req8
232+
----
233+
num=1
234+
lock: "a"
235+
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]
236+
237+
new-request r=req9 txn=txn2 ts=10,1 spans=exclusive@a,c
238+
----
239+
240+
scan-opt r=req9
241+
----
242+
start-waiting: false
243+
244+
should-wait r=req9
245+
----
246+
false
247+
248+
check-opt-no-conflicts r=req9 spans=exclusive@a,c
249+
----
250+
no-conflicts: false
251+
252+
dequeue r=req9
253+
----
254+
num=1
255+
lock: "a"
256+
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]

pkg/sql/conn_executor_prepare.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,12 @@ func (ex *connExecutor) prepare(
288288
}
289289

290290
// Use the existing transaction.
291-
if err := prepare(ctx, ex.state.mu.txn); err != nil && origin != PreparedStatementOriginSessionMigration {
292-
return nil, err
291+
if err := prepare(ctx, ex.state.mu.txn); err != nil {
292+
if origin != PreparedStatementOriginSessionMigration {
293+
return nil, err
294+
} else {
295+
log.Warningf(ctx, "could not prepare statement during session migration: %v", err)
296+
}
293297
}
294298

295299
// Account for the memory used by this prepared statement.
@@ -320,8 +324,15 @@ func (ex *connExecutor) populatePrepared(
320324
return 0, err
321325
}
322326
p.extendedEvalCtx.PrepareOnly = true
323-
if err := ex.handleAOST(ctx, p.stmt.AST); err != nil {
324-
return 0, err
327+
// If the statement is being prepared by a session migration, then we should
328+
// not evaluate the AS OF SYSTEM TIME timestamp. During session migration,
329+
// there is no way for the statement being prepared to be executed in this
330+
// transaction, so there's no need to fix the timestamp, unlike how we must
331+
// for pgwire- or SQL-level prepared statements.
332+
if origin != PreparedStatementOriginSessionMigration {
333+
if err := ex.handleAOST(ctx, p.stmt.AST); err != nil {
334+
return 0, err
335+
}
325336
}
326337

327338
// PREPARE has a limited subset of statements it can be run with. Postgres

pkg/sql/testdata/session_migration/prepared_statements

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ wire_prepare s4
3333
INSERT INTO t2 VALUES($1, $2)
3434
----
3535

36+
# Regression test for transferring statements with AOST.
37+
wire_prepare s5
38+
SELECT a, b FROM t2 AS OF SYSTEM TIME '-2us'
39+
----
40+
3641
wire_prepare s_empty
3742
;
3843
----
@@ -102,6 +107,15 @@ SELECT * FROM t2
102107
----
103108
1 cat
104109

110+
query
111+
SELECT pg_sleep(0.1)
112+
----
113+
true
114+
115+
wire_query s5
116+
----
117+
1 cat
118+
105119
reset
106120
----
107121

0 commit comments

Comments
 (0)