Skip to content

Commit 223fdb4

Browse files
committed
sql: enable TestUpsertFastPath with buffered writes
We're able to hit 1PC in more cases when buffered writes are enabled. Teach TestUpsertFastPath about it. Epic: none Release note: None
1 parent 0868f32 commit 223fdb4

File tree

1 file changed

+139
-118
lines changed

1 file changed

+139
-118
lines changed

pkg/sql/upsert_test.go

Lines changed: 139 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@ import (
1515

1616
"github.com/cockroachdb/cockroach/pkg/base"
1717
"github.com/cockroachdb/cockroach/pkg/keys"
18+
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
1819
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1920
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
2021
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
2122
"github.com/cockroachdb/cockroach/pkg/roachpb"
23+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2224
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
2325
"github.com/cockroachdb/cockroach/pkg/sql/mutations"
26+
"github.com/cockroachdb/cockroach/pkg/testutils"
2427
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2528
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
2629
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -35,135 +38,153 @@ func TestUpsertFastPath(t *testing.T) {
3538
defer leaktest.AfterTest(t)()
3639
defer log.Scope(t).Close(t)
3740

38-
if mutations.MaxBatchSize(false /* forceProductionMaxBatchSize */) == 1 {
39-
// The fast path requires that the max batch size is at least 2, so
40-
// we'll skip the test.
41-
skip.UnderMetamorphic(t)
42-
}
41+
testutils.RunTrueAndFalse(t, "buffered-writes-enabled", func(t *testing.T, bufferedWritesEnabled bool) {
42+
st := cluster.MakeTestingClusterSettings()
43+
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, bufferedWritesEnabled)
4344

44-
// This filter increments scans and endTxn for every ScanRequest and
45-
// EndTxnRequest that hits user table data.
46-
var gets uint64
47-
var scans uint64
48-
var endTxn uint64
49-
var codecValue atomic.Value
50-
filter := func(filterArgs kvserverbase.FilterArgs) *kvpb.Error {
51-
codec := codecValue.Load()
52-
if codec == nil {
53-
return nil
45+
if mutations.MaxBatchSize(false /* forceProductionMaxBatchSize */) == 1 {
46+
// The fast path requires that the max batch size is at least 2, so
47+
// we'll skip the test.
48+
skip.UnderMetamorphic(t)
5449
}
55-
if bytes.Compare(filterArgs.Req.Header().Key, bootstrap.TestingUserTableDataMin(codec.(keys.SQLCodec))) >= 0 {
56-
switch filterArgs.Req.Method() {
57-
case kvpb.Scan:
58-
atomic.AddUint64(&scans, 1)
59-
case kvpb.Get:
60-
atomic.AddUint64(&gets, 1)
61-
case kvpb.EndTxn:
62-
if filterArgs.Hdr.Txn.Status == roachpb.STAGING {
63-
// Ignore async explicit commits.
64-
return nil
50+
51+
// This filter increments scans and endTxn for every ScanRequest and
52+
// EndTxnRequest that hits user table data.
53+
var gets uint64
54+
var scans uint64
55+
var endTxn uint64
56+
var codecValue atomic.Value
57+
filter := func(filterArgs kvserverbase.FilterArgs) *kvpb.Error {
58+
codec := codecValue.Load()
59+
if codec == nil {
60+
return nil
61+
}
62+
if bytes.Compare(filterArgs.Req.Header().Key, bootstrap.TestingUserTableDataMin(codec.(keys.SQLCodec))) >= 0 {
63+
switch filterArgs.Req.Method() {
64+
case kvpb.Scan:
65+
atomic.AddUint64(&scans, 1)
66+
case kvpb.Get:
67+
atomic.AddUint64(&gets, 1)
68+
case kvpb.EndTxn:
69+
if filterArgs.Hdr.Txn.Status == roachpb.STAGING {
70+
// Ignore async explicit commits.
71+
return nil
72+
}
73+
atomic.AddUint64(&endTxn, 1)
6574
}
66-
atomic.AddUint64(&endTxn, 1)
6775
}
76+
return nil
6877
}
69-
return nil
70-
}
7178

72-
srv, conn, _ := serverutils.StartServer(t, base.TestServerArgs{
73-
Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
74-
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
75-
TestingEvalFilter: filter,
76-
},
77-
}},
78-
})
79-
defer srv.Stopper().Stop(context.Background())
80-
codecValue.Store(srv.ApplicationLayer().Codec())
81-
sqlDB := sqlutils.MakeSQLRunner(conn)
82-
sqlDB.Exec(t, `CREATE DATABASE d`)
83-
sqlDB.Exec(t, `CREATE TABLE d.kv (k INT PRIMARY KEY, v INT)`)
79+
srv, conn, _ := serverutils.StartServer(t, base.TestServerArgs{
80+
Settings: st,
81+
Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
82+
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
83+
TestingEvalFilter: filter,
84+
},
85+
}},
86+
})
87+
defer srv.Stopper().Stop(context.Background())
88+
codecValue.Store(srv.ApplicationLayer().Codec())
89+
sqlDB := sqlutils.MakeSQLRunner(conn)
90+
sqlDB.Exec(t, `CREATE DATABASE d`)
91+
sqlDB.Exec(t, `CREATE TABLE d.kv (k INT PRIMARY KEY, v INT)`)
8492

85-
// This should hit the fast path.
86-
atomic.StoreUint64(&gets, 0)
87-
atomic.StoreUint64(&scans, 0)
88-
atomic.StoreUint64(&endTxn, 0)
89-
sqlDB.Exec(t, `UPSERT INTO d.kv VALUES (1, 1)`)
90-
if s := atomic.LoadUint64(&scans); s != 0 {
91-
t.Errorf("expected no scans (the upsert fast path) but got %d", s)
92-
}
93-
if s := atomic.LoadUint64(&endTxn); s != 0 {
94-
t.Errorf("expected no end-txn (1PC) but got %d", s)
95-
}
93+
// This should hit the fast path.
94+
atomic.StoreUint64(&gets, 0)
95+
atomic.StoreUint64(&scans, 0)
96+
atomic.StoreUint64(&endTxn, 0)
97+
sqlDB.Exec(t, `UPSERT INTO d.kv VALUES (1, 1)`)
98+
if s := atomic.LoadUint64(&scans); s != 0 {
99+
t.Errorf("expected no scans (the upsert fast path) but got %d", s)
100+
}
101+
if s := atomic.LoadUint64(&endTxn); s != 0 {
102+
t.Errorf("expected no end-txn (1PC) but got %d", s)
103+
}
96104

97-
// This could hit the fast path, but doesn't right now because of #14482.
98-
atomic.StoreUint64(&gets, 0)
99-
atomic.StoreUint64(&scans, 0)
100-
atomic.StoreUint64(&endTxn, 0)
101-
sqlDB.Exec(t, `INSERT INTO d.kv VALUES (1, 1) ON CONFLICT (k) DO UPDATE SET v=excluded.v`)
102-
if s := atomic.LoadUint64(&gets); s != 1 {
103-
t.Errorf("expected 1 get (no upsert fast path) but got %d", s)
104-
}
105-
if s := atomic.LoadUint64(&scans); s != 0 {
106-
t.Errorf("expected 0 scans but got %d", s)
107-
}
108-
if s := atomic.LoadUint64(&endTxn); s != 0 {
109-
t.Errorf("expected no end-txn (1PC) but got %d", s)
110-
}
105+
// This could hit the fast path, but doesn't right now because of #14482.
106+
atomic.StoreUint64(&gets, 0)
107+
atomic.StoreUint64(&scans, 0)
108+
atomic.StoreUint64(&endTxn, 0)
109+
sqlDB.Exec(t, `INSERT INTO d.kv VALUES (1, 1) ON CONFLICT (k) DO UPDATE SET v=excluded.v`)
110+
if s := atomic.LoadUint64(&gets); s != 1 {
111+
t.Errorf("expected 1 get (no upsert fast path) but got %d", s)
112+
}
113+
if s := atomic.LoadUint64(&scans); s != 0 {
114+
t.Errorf("expected 0 scans but got %d", s)
115+
}
116+
if s := atomic.LoadUint64(&endTxn); s != 0 {
117+
t.Errorf("expected no end-txn (1PC) but got %d", s)
118+
}
111119

112-
// This should not hit the fast path because it doesn't set every column.
113-
atomic.StoreUint64(&gets, 0)
114-
atomic.StoreUint64(&scans, 0)
115-
atomic.StoreUint64(&endTxn, 0)
116-
sqlDB.Exec(t, `UPSERT INTO d.kv (k) VALUES (1)`)
117-
if s := atomic.LoadUint64(&gets); s != 1 {
118-
t.Errorf("expected 1 get (no upsert fast path) but got %d", s)
119-
}
120-
if s := atomic.LoadUint64(&scans); s != 0 {
121-
t.Errorf("expected 0 scans but got %d", s)
122-
}
123-
if s := atomic.LoadUint64(&endTxn); s != 0 {
124-
t.Errorf("expected no end-txn (1PC) but got %d", s)
125-
}
120+
// This should not hit the fast path because it doesn't set every column.
121+
atomic.StoreUint64(&gets, 0)
122+
atomic.StoreUint64(&scans, 0)
123+
atomic.StoreUint64(&endTxn, 0)
124+
sqlDB.Exec(t, `UPSERT INTO d.kv (k) VALUES (1)`)
125+
if s := atomic.LoadUint64(&gets); s != 1 {
126+
t.Errorf("expected 1 get (no upsert fast path) but got %d", s)
127+
}
128+
if s := atomic.LoadUint64(&scans); s != 0 {
129+
t.Errorf("expected 0 scans but got %d", s)
130+
}
131+
if s := atomic.LoadUint64(&endTxn); s != 0 {
132+
t.Errorf("expected no end-txn (1PC) but got %d", s)
133+
}
126134

127-
// This should hit the fast path, but won't be a 1PC because of the explicit
128-
// transaction.
129-
atomic.StoreUint64(&gets, 0)
130-
atomic.StoreUint64(&scans, 0)
131-
atomic.StoreUint64(&endTxn, 0)
132-
tx, err := conn.Begin()
133-
if err != nil {
134-
t.Fatal(err)
135-
}
136-
if _, err := tx.Exec(`UPSERT INTO d.kv VALUES (1, 1)`); err != nil {
137-
t.Fatal(err)
138-
}
139-
if err := tx.Commit(); err != nil {
140-
t.Fatal(err)
141-
}
142-
if s := atomic.LoadUint64(&gets); s != 0 {
143-
t.Errorf("expected no gets (the upsert fast path) but got %d", s)
144-
}
145-
if s := atomic.LoadUint64(&scans); s != 0 {
146-
t.Errorf("expected no scans (the upsert fast path) but got %d", s)
147-
}
148-
if s := atomic.LoadUint64(&endTxn); s != 1 {
149-
t.Errorf("expected 1 end-txn (no 1PC) but got %d", s)
150-
}
135+
// This should hit the fast path. 1PC is contingent on the use of buffered
136+
// writes because we're executing an explicit transaction.
137+
atomic.StoreUint64(&gets, 0)
138+
atomic.StoreUint64(&scans, 0)
139+
atomic.StoreUint64(&endTxn, 0)
140+
tx, err := conn.Begin()
141+
if err != nil {
142+
t.Fatal(err)
143+
}
144+
if _, err := tx.Exec(`UPSERT INTO d.kv VALUES (1, 1)`); err != nil {
145+
t.Fatal(err)
146+
}
147+
if err := tx.Commit(); err != nil {
148+
t.Fatal(err)
149+
}
151150

152-
// This should not hit the fast path because kv has a secondary index.
153-
sqlDB.Exec(t, `CREATE INDEX vidx ON d.kv (v)`)
154-
atomic.StoreUint64(&gets, 0)
155-
atomic.StoreUint64(&scans, 0)
156-
atomic.StoreUint64(&endTxn, 0)
157-
sqlDB.Exec(t, `UPSERT INTO d.kv VALUES (1, 1)`)
158-
if s := atomic.LoadUint64(&gets); s != 1 {
159-
t.Errorf("expected 1 get (no upsert fast path) but got %d", s)
160-
}
161-
if s := atomic.LoadUint64(&scans); s != 0 {
162-
t.Errorf("expected 0 scans (no upsert fast path) but got %d", s)
163-
}
164-
if s := atomic.LoadUint64(&endTxn); s != 0 {
165-
t.Errorf("expected no end-txn (1PC) but got %d", s)
166-
}
151+
expGets := uint64(0)
152+
expEndTxns := uint64(1)
153+
if bufferedWritesEnabled {
154+
// When buffered writes are enabled, the Put is decomposed into a locking
155+
// Get and a buffered Put.
156+
expGets = 1
157+
// When buffered writes are enabled, we're able to hit 1PC even though
158+
// we're executing an explicit transaction.
159+
expEndTxns = 0
160+
}
161+
162+
if s := atomic.LoadUint64(&gets); s != expGets {
163+
t.Errorf("expected %d gets (the upsert fast path) but got %d", expGets, s)
164+
}
165+
if s := atomic.LoadUint64(&scans); s != 0 {
166+
t.Errorf("expected no scans (the upsert fast path) but got %d", s)
167+
}
168+
if s := atomic.LoadUint64(&endTxn); s != expEndTxns {
169+
t.Errorf("expected %d end-txn (no 1PC) but got %d", expEndTxns, s)
170+
}
171+
172+
// This should not hit the fast path because kv has a secondary index.
173+
sqlDB.Exec(t, `CREATE INDEX vidx ON d.kv (v)`)
174+
atomic.StoreUint64(&gets, 0)
175+
atomic.StoreUint64(&scans, 0)
176+
atomic.StoreUint64(&endTxn, 0)
177+
sqlDB.Exec(t, `UPSERT INTO d.kv VALUES (1, 1)`)
178+
if s := atomic.LoadUint64(&gets); s != 1 {
179+
t.Errorf("expected 1 get (no upsert fast path) but got %d", s)
180+
}
181+
if s := atomic.LoadUint64(&scans); s != 0 {
182+
t.Errorf("expected 0 scans (no upsert fast path) but got %d", s)
183+
}
184+
if s := atomic.LoadUint64(&endTxn); s != 0 {
185+
t.Errorf("expected no end-txn (1PC) but got %d", s)
186+
}
187+
})
167188
}
168189

169190
func TestConcurrentUpsert(t *testing.T) {

0 commit comments

Comments
 (0)