Skip to content

Commit 67a692a

Browse files
craig[bot]mw5hRaduBerinde
committed
144520: optbuilder: disable mutation of vector indexes while backfilling r=mw5h a=mw5h Merging concurrent writes on vector indexes while backfill is in progress is somewhat more complicated problem than can be solved in the 25.2 timeframe. In order to ensure data integrity, we will disable mutation on vector indexes while backfill is in progress. Informs: #144443 Release note: None 144570: sqlproxyccl/acl: use newer btree r=RaduBerinde a=RaduBerinde Use `BtreeG` in the acl watcher code. Informs: #144504 Release note: None Co-authored-by: Matt White <[email protected]> Co-authored-by: Radu Berinde <[email protected]>
3 parents 1538e9b + 44df6bf + 439c35a commit 67a692a

File tree

17 files changed

+179
-28
lines changed

17 files changed

+179
-28
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ require (
3434
github.com/golang/mock v1.6.0
3535
github.com/golang/protobuf v1.5.4
3636
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e
37-
github.com/google/btree v1.1.3 // indirect
37+
github.com/google/btree v1.1.3
3838
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7
3939
github.com/google/uuid v1.6.0 // indirect
4040
google.golang.org/api v0.114.0

pkg/ccl/sqlproxyccl/acl/BUILD.bazel

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ go_library(
2121
"//pkg/util/syncutil",
2222
"//pkg/util/timeutil",
2323
"@com_github_cockroachdb_errors//:errors",
24+
"@com_github_google_btree//:btree",
2425
"@com_github_pires_go_proxyproto//:go-proxyproto",
2526
"@com_github_pires_go_proxyproto//tlvparse",
26-
"@com_github_raduberinde_btree//:btree",
2727
"@in_gopkg_yaml_v2//:yaml_v2",
2828
],
2929
)
@@ -50,7 +50,6 @@ go_test(
5050
"@com_github_cockroachdb_errors//:errors",
5151
"@com_github_pires_go_proxyproto//:go-proxyproto",
5252
"@com_github_pires_go_proxyproto//tlvparse",
53-
"@com_github_raduberinde_btree//:btree",
5453
"@com_github_stretchr_testify//require",
5554
"@in_gopkg_yaml_v2//:yaml_v2",
5655
],

pkg/ccl/sqlproxyccl/acl/watcher.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ import (
99
"context"
1010
"time"
1111

12-
"github.com/RaduBerinde/btree" // TODO(#144504): switch to the newer btree
1312
"github.com/cockroachdb/cockroach/pkg/util/log"
1413
"github.com/cockroachdb/cockroach/pkg/util/metric"
1514
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
1615
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
16+
"github.com/google/btree"
1717
)
1818

1919
// Watcher maintains a list of connections waiting for changes to the
@@ -33,7 +33,7 @@ type Watcher struct {
3333
options *aclOptions
3434

3535
// All of the listeners waiting for changes to the access control list.
36-
listeners *btree.BTree
36+
listeners *btree.BTreeG[*listener]
3737

3838
// These control whether or not a connection is allowd based on it's
3939
// ConnectionTags.
@@ -128,7 +128,7 @@ func NewWatcher(ctx context.Context, opts ...Option) (*Watcher, error) {
128128
opt(options)
129129
}
130130
w := &Watcher{
131-
listeners: btree.New(8),
131+
listeners: btree.NewG[*listener](8, func(a, b *listener) bool { return a.id < b.id }),
132132
options: options,
133133
controllers: make([]AccessController, 0),
134134
}
@@ -235,7 +235,7 @@ func (w *Watcher) addAccessController(
235235
func (w *Watcher) updateAccessController(
236236
ctx context.Context, index int, controller AccessController,
237237
) {
238-
var copy *btree.BTree
238+
var copy *btree.BTreeG[*listener]
239239
var controllers []AccessController
240240
func() {
241241
w.mu.Lock()
@@ -304,14 +304,10 @@ func (w *Watcher) removeListener(l *listener) {
304304
w.listeners.Delete(l)
305305
}
306306

307-
// Less implements the btree.Item interface for listener.
308-
func (l *listener) Less(than btree.Item) bool {
309-
return l.id < than.(*listener).id
310-
}
311-
312-
func checkListeners(ctx context.Context, listeners *btree.BTree, controllers []AccessController) {
313-
listeners.Ascend(func(i btree.Item) bool {
314-
lst := i.(*listener)
307+
func checkListeners(
308+
ctx context.Context, listeners *btree.BTreeG[*listener], controllers []AccessController,
309+
) {
310+
listeners.Ascend(func(lst *listener) bool {
315311
if err := checkConnection(ctx, lst.connection, controllers); err != nil {
316312
lst.mu.Lock()
317313
defer lst.mu.Unlock()

pkg/ccl/sqlproxyccl/acl/watcher_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"testing"
1212
"time"
1313

14-
"github.com/RaduBerinde/btree" // TODO(#144504): switch to the newer btree
1514
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
1615
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr"
1716
"github.com/cockroachdb/cockroach/pkg/ccl/testutilsccl"
@@ -403,8 +402,8 @@ func TestACLWatcher(t *testing.T) {
403402
require.Nil(t, err)
404403

405404
var l *listener
406-
watcher.listeners.Ascend(func(i btree.Item) bool {
407-
l = i.(*listener)
405+
watcher.listeners.Ascend(func(lst *listener) bool {
406+
l = lst
408407
return false
409408
})
410409
require.NotNil(t, l)

pkg/sql/alter_primary_key.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ func (p *planner) AlterPrimaryKey(
5454
alterPKNode tree.AlterTableAlterPrimaryKey,
5555
alterPrimaryKeyLocalitySwap *alterPrimaryKeyLocalitySwap,
5656
) error {
57+
// Check if sql_safe_updates is enabled and the table has vector indexes
58+
if p.EvalContext().SessionData().SafeUpdates {
59+
for _, idx := range tableDesc.AllIndexes() {
60+
if idx.GetType() == idxtype.VECTOR {
61+
return pgerror.DangerousStatementf("ALTER PRIMARY KEY on a table with vector indexes will disable writes to the table while the index is being rebuilt")
62+
}
63+
}
64+
}
65+
5766
if err := paramparse.ValidateUniqueConstraintParams(
5867
alterPKNode.StorageParams,
5968
paramparse.UniqueConstraintParamContext{

pkg/sql/backfill/backfill_test.go

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package backfill_test
77

88
import (
99
"context"
10+
"sync"
1011
"testing"
1112

1213
"github.com/cockroachdb/cockroach/pkg/base"
@@ -54,16 +55,18 @@ func TestVectorColumnAndIndexBackfill(t *testing.T) {
5455
sqlDB.Exec(t, `
5556
CREATE TABLE vectors (
5657
id INT PRIMARY KEY,
57-
vec VECTOR(3)
58+
vec VECTOR(3),
59+
data INT
5860
)
5961
`)
6062

6163
// Insert 200 rows with random vector data
6264
sqlDB.Exec(t, `
63-
INSERT INTO vectors (id, vec)
65+
INSERT INTO vectors (id, vec, data)
6466
SELECT
6567
generate_series(1, 200) as id,
66-
ARRAY[random(), random(), random()]::vector(3) as vec
68+
ARRAY[random(), random(), random()]::vector(3) as vec,
69+
generate_series(1, 200) as data
6770
`)
6871

6972
// Create a vector index on the vector column
@@ -88,3 +91,87 @@ func TestVectorColumnAndIndexBackfill(t *testing.T) {
8891
// be 200 in most cases.
8992
require.Greater(t, matchCount, 190, "Expected to find at least 190 similar vectors")
9093
}
94+
95+
func TestConcurrentOperationsDuringVectorIndexCreation(t *testing.T) {
96+
defer leaktest.AfterTest(t)()
97+
defer log.Scope(t).Close(t)
98+
99+
// Channel to block the backfill process
100+
blockBackfill := make(chan struct{})
101+
backfillBlocked := make(chan struct{})
102+
103+
ctx := context.Background()
104+
srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{
105+
Knobs: base.TestingKnobs{
106+
DistSQL: &execinfra.TestingKnobs{
107+
// Block the backfill process after the first batch
108+
RunAfterBackfillChunk: func() {
109+
backfillBlocked <- struct{}{}
110+
<-blockBackfill
111+
},
112+
},
113+
},
114+
})
115+
defer srv.Stopper().Stop(ctx)
116+
sqlDB := sqlutils.MakeSQLRunner(db)
117+
118+
// Create a table with a vector column
119+
sqlDB.Exec(t, `
120+
CREATE TABLE vectors (
121+
id INT PRIMARY KEY,
122+
vec VECTOR(3),
123+
data INT
124+
)
125+
`)
126+
127+
// Insert some initial data
128+
sqlDB.Exec(t, `
129+
INSERT INTO vectors (id, vec, data) VALUES
130+
(1, '[1, 2, 3]', 100),
131+
(2, '[4, 5, 6]', 200),
132+
(3, '[7, 8, 9]', 300)
133+
`)
134+
135+
wg := sync.WaitGroup{}
136+
wg.Add(1)
137+
138+
// Start creating the vector index in a goroutine
139+
var createIndexErr error
140+
go func() {
141+
defer wg.Done()
142+
_, createIndexErr = db.ExecContext(ctx, `CREATE VECTOR INDEX vec_idx ON vectors (vec)`)
143+
}()
144+
145+
// Wait for the backfill to be blocked
146+
<-backfillBlocked
147+
148+
// Attempt concurrent operations while the index is being created
149+
// These should fail with the appropriate error
150+
_, err := db.ExecContext(ctx, `UPDATE vectors SET vec = '[10, 11, 12]', data = 150 WHERE id = 1`)
151+
require.Error(t, err)
152+
require.Contains(t, err.Error(), "Cannot write to a vector index while it is being built")
153+
154+
_, err = db.ExecContext(ctx, `INSERT INTO vectors (id, vec, data) VALUES (4, '[13, 14, 15]', 400)`)
155+
require.Error(t, err)
156+
require.Contains(t, err.Error(), "Cannot write to a vector index while it is being built")
157+
158+
_, err = db.ExecContext(ctx, `DELETE FROM vectors WHERE id = 2`)
159+
require.Error(t, err)
160+
require.Contains(t, err.Error(), "Cannot write to a vector index while it is being built")
161+
162+
// This update should succeed since it only modifies the data column
163+
_, err = db.ExecContext(ctx, `UPDATE vectors SET data = 250 WHERE id = 2`)
164+
require.NoError(t, err, "Updating only the data column should succeed during index creation")
165+
166+
// Unblock the backfill process
167+
close(blockBackfill)
168+
169+
wg.Wait()
170+
// Wait for the index creation to complete
171+
require.NoError(t, createIndexErr)
172+
173+
// Verify the index was created successfully
174+
var id int
175+
sqlDB.QueryRow(t, `SELECT id FROM vectors@vec_idx ORDER BY vec <-> '[1, 2, 3]' LIMIT 1`).Scan(&id)
176+
require.Equal(t, 1, id)
177+
}

pkg/sql/catalog/tabledesc/index.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -464,12 +464,7 @@ func (w index) CreatedAt() time.Time {
464464
return timeutil.Unix(0, w.desc.CreatedAtNanos)
465465
}
466466

467-
// IsTemporaryIndexForBackfill returns true iff the index is
468-
// an index being used as the temporary index being used by an
469-
// in-progress index backfill.
470-
//
471-
// TODO(ssd): This could be its own boolean or we could store the ID
472-
// of the index it is a temporary index for.
467+
// IsTemporaryIndexForBackfill implements the cat.Index interface.
473468
func (w index) IsTemporaryIndexForBackfill() bool {
474469
return w.desc.UseDeletePreservingEncoding
475470
}

pkg/sql/create_index.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ func (p *planner) CreateIndex(ctx context.Context, n *tree.CreateIndex) (planNod
5757
); err != nil {
5858
return nil, err
5959
}
60+
61+
// Check if sql_safe_updates is enabled and this is a vector index
62+
if n.Type == idxtype.VECTOR && p.EvalContext().SessionData().SafeUpdates {
63+
return nil, pgerror.DangerousStatementf("CREATE VECTOR INDEX will disable writes to the table while the index is being built")
64+
}
65+
6066
_, tableDesc, err := p.ResolveMutableTableDescriptor(
6167
ctx, &n.Table, true /*required*/, tree.ResolveRequireTableOrViewDesc,
6268
)

pkg/sql/logictest/testdata/logic_test/vector_index

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,30 @@
22
# CREATE TABLE/INDEX tests.
33
# ------------------------------------------------------------------------------
44

5+
# Test guardrail for vector index creation.
6+
# TODO(mw5h): remove these two statements once online modifications are supported.
7+
statement ok
8+
SET sql_safe_updates = true
9+
510
# Simple vector index.
611
statement ok
712
CREATE TABLE simple (
813
a INT PRIMARY KEY,
14+
b INT NOT NULL,
915
vec1 VECTOR(3),
1016
VECTOR INDEX (vec1),
1117
FAMILY (a, vec1)
1218
)
1319

20+
statement error pgcode 01000 pq: rejected \(sql_safe_updates = true\): CREATE VECTOR INDEX will disable writes to the table while the index is being built
21+
CREATE VECTOR INDEX ON simple (vec1)
22+
23+
statement error pgcode 01000 pq: rejected \(sql_safe_updates = true\): ALTER PRIMARY KEY on a table with vector indexes will disable writes to the table while the index is being rebuilt
24+
ALTER TABLE simple ALTER PRIMARY KEY USING COLUMNS (b)
25+
26+
statement ok
27+
SET sql_safe_updates = false
28+
1429
statement ok
1530
CREATE VECTOR INDEX ON simple (vec1)
1631

@@ -23,12 +38,13 @@ SHOW CREATE TABLE simple
2338
----
2439
simple CREATE TABLE public.simple (
2540
a INT8 NOT NULL,
41+
b INT8 NOT NULL,
2642
vec1 VECTOR(3) NULL,
2743
CONSTRAINT simple_pkey PRIMARY KEY (a ASC),
2844
VECTOR INDEX simple_vec1_idx (vec1),
2945
VECTOR INDEX simple_vec1_idx1 (vec1),
3046
VECTOR INDEX simple_vec1_idx2 (vec1),
31-
FAMILY fam_0_a_vec1 (a, vec1)
47+
FAMILY fam_0_a_vec1_b (a, vec1, b)
3248
)
3349

3450
statement ok

pkg/sql/opt/cat/index.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,10 @@ type Index interface {
204204
// Partition returns the ith PARTITION BY LIST partition within the index
205205
// definition, where i < PartitionCount.
206206
Partition(i int) Partition
207+
208+
// IsTemporaryIndexForBackfill returns true iff the index is an index being
209+
// used as the temporary index being used by an in-progress index backfill.
210+
IsTemporaryIndexForBackfill() bool
207211
}
208212

209213
// IndexColumn describes a single column that is part of an index definition.

0 commit comments

Comments
 (0)