Skip to content

Commit 1a882ca

Browse files
craig[bot] and mw5h committed
Merge #154074
154074: vecbench: add flag to control index creation timing r=mw5h a=mw5h Previously, the vecbench SQL provider would create its test table with a vector index. This patch gives us the option to retain that behavior (the default) or to import into an empty table without a vector index and then use CREATE INDEX to build the index after data is inserted. Add the --index-after flag to provide a choice between two index creation workflows: - Default: Create table with index, then import data - Flag set: Create table, import data, then create index Flag validation prevents use with --memstore since this only applies to the SQL provider. Also adds a progress-monitoring interface with a CheckIndexCreationStatus() method for tracking async index creation progress. Informs: #146691 Release note: none Co-authored-by: Matt White <[email protected]>
2 parents 62d08bf + f18cde0 commit 1a882ca

File tree

4 files changed

+156
-14
lines changed

4 files changed

+156
-14
lines changed

pkg/cmd/vecbench/main.go

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
)
4949

5050
const (
51+
ClearLine = "\033[2K"
5152
HideCursor = "\033[?25l"
5253
ShowCursor = "\033[?25h"
5354
)
@@ -78,6 +79,8 @@ var flagDisableAdaptiveSearch = flag.Bool(
7879
var flagMemStore = flag.Bool("memstore", false, "Use in-memory store instead of CockroachDB")
7980
var flagDBConnStr = flag.String("db", "postgresql://root@localhost:26257",
8081
"Database connection string (when not using --memstore)")
82+
var flagCreateIndexAfterImport = flag.Bool("index-after", false,
83+
"Create vector index after data import instead of during table creation (SQL provider only)")
8184

8285
// vecbench benchmarks vector index in-memory build and search performance on a
8386
// variety of datasets. Datasets are downloaded from the
@@ -125,7 +128,7 @@ func main() {
125128
// Hide the cursor, but ensure it's restored on exit, including Ctrl+C.
126129
c := make(chan os.Signal, 1)
127130
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
128-
stopper.RunAsyncTask(ctx, "Ctrl+C", func(context.Context) {
131+
err := stopper.RunAsyncTask(ctx, "Ctrl+C", func(context.Context) {
129132
select {
130133
case <-c:
131134
fmt.Print(ShowCursor)
@@ -136,12 +139,17 @@ func main() {
136139
break
137140
}
138141
})
142+
if err != nil {
143+
fmt.Printf("Failed to install Ctrl-C handler: %v\n", err)
144+
}
139145
fmt.Print(HideCursor)
140146
defer fmt.Print(ShowCursor)
141147

142-
// Start pprof server at http://localhost:8080/debug/pprof/
148+
// Start pprof server at http://localhost:8080/debug/pprof/.
143149
go func() {
144-
http.ListenAndServe("localhost:8080", nil)
150+
if err := http.ListenAndServe("localhost:8080", nil); err != nil {
151+
fmt.Printf("Failed to start pprof server: %v\n", err)
152+
}
145153
}()
146154

147155
switch flag.Arg(0) {
@@ -340,6 +348,14 @@ func (vb *vectorBench) BuildIndex() {
340348
panic(err)
341349
}
342350

351+
// Create index immediately if flag is not set (default behavior).
352+
if !*flagCreateIndexAfterImport {
353+
err = vb.provider.CreateIndex(vb.ctx)
354+
if err != nil {
355+
panic(err)
356+
}
357+
}
358+
343359
// Compute percentile latencies.
344360
estimator := NewPercentileEstimator(1000)
345361

@@ -368,7 +384,7 @@ func (vb *vectorBench) BuildIndex() {
368384
var lastInserted int
369385
batchSize := *flagBatchSize
370386

371-
// Reset the dataset to start from the beginning
387+
// Reset the dataset to start from the beginning.
372388
vb.data.Reset()
373389

374390
for {
@@ -384,7 +400,7 @@ func (vb *vectorBench) BuildIndex() {
384400
trainBatch := vb.data.Train
385401
insertedBefore := int(insertCount.Load())
386402

387-
// Create primary keys for this batch
403+
// Create primary keys for this batch.
388404
primaryKeys := make([]cspann.KeyBytes, trainBatch.Count)
389405
keyBuf := make(cspann.KeyBytes, trainBatch.Count*4)
390406
for i := range trainBatch.Count {
@@ -456,6 +472,50 @@ func (vb *vectorBench) BuildIndex() {
456472
}
457473
}
458474

475+
// Create index after import if flag is set.
476+
if *flagCreateIndexAfterImport {
477+
if !*flagHideProgress {
478+
fmt.Println()
479+
}
480+
startIndexTime := timeutil.Now()
481+
482+
// Start index creation in a goroutine to track progress.
483+
done := make(chan error, 1)
484+
go func() {
485+
done <- vb.provider.CreateIndex(vb.ctx)
486+
}()
487+
488+
// Track progress until CreateIndex returns.
489+
ticker := time.NewTicker(time.Second)
490+
defer ticker.Stop()
491+
492+
for {
493+
select {
494+
case err := <-done:
495+
// CreateIndex returned, stop tracking.
496+
if err != nil {
497+
panic(err)
498+
}
499+
fmt.Printf(ClearLine+White+"\rVector index creation completed in %v\n"+Reset,
500+
roundDuration(timeutil.Since(startIndexTime)))
501+
goto indexCreated
502+
case <-ticker.C:
503+
if *flagHideProgress {
504+
break
505+
}
506+
// Check progress.
507+
progress, err := vb.provider.CheckIndexCreationStatus(vb.ctx)
508+
if err != nil {
509+
fmt.Printf(Red+"Error checking index status: %v\n"+Reset, err)
510+
} else {
511+
fmt.Printf(ClearLine+White+"\rIndex creation progress: %.1f%% in %v"+Reset,
512+
progress*100, timeutil.Since(startIndexTime).Truncate(time.Second))
513+
}
514+
}
515+
}
516+
indexCreated:
517+
}
518+
459519
fmt.Printf(White+"\nBuilt index in %v\n"+Reset, roundDuration(startAt.Elapsed()))
460520

461521
// Ensure that index is persisted so it can be reused.
@@ -494,6 +554,11 @@ func (vb *vectorBench) ensureDataset(ctx context.Context) {
494554
func newVectorProvider(
495555
stopper *stop.Stopper, datasetName string, dims int, distanceMetric vecpb.DistanceMetric,
496556
) (VectorProvider, error) {
557+
// Validate flag compatibility.
558+
if *flagCreateIndexAfterImport && *flagMemStore {
559+
return nil, errors.New("--index-after flag cannot be used with --memstore")
560+
}
561+
497562
options := cspann.IndexOptions{
498563
MinPartitionSize: minPartitionSize,
499564
MaxPartitionSize: maxPartitionSize,

pkg/cmd/vecbench/mem_provider.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (m *MemProvider) Load(ctx context.Context) (bool, error) {
110110
return true, nil
111111
}
112112

113-
// New implements the VectorProvider interface
113+
// New implements the VectorProvider interface.
114114
func (m *MemProvider) New(ctx context.Context) error {
115115
// Clear any existing state.
116116
m.Close()
@@ -149,6 +149,18 @@ func (m *MemProvider) InsertVectors(
149149
})
150150
}
151151

152+
// CreateIndex implements the VectorProvider interface.
153+
func (m *MemProvider) CreateIndex(ctx context.Context) error {
154+
// No-op for in-memory provider as index is built incrementally during insertion.
155+
return nil
156+
}
157+
158+
// CheckIndexCreationStatus implements the VectorProvider interface.
159+
func (m *MemProvider) CheckIndexCreationStatus(ctx context.Context) (float64, error) {
160+
// Always return 100% complete for in-memory provider since index is built incrementally.
161+
return 1.0, nil
162+
}
163+
152164
// SetupSearch implements the VectorProvider interface.
153165
func (m *MemProvider) SetupSearch(
154166
ctx context.Context, maxResults int, beamSize int,

pkg/cmd/vecbench/sql_provider.go

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type SQLProvider struct {
4848
options cspann.IndexOptions
4949
pool *pgxpool.Pool
5050
tableName string
51+
indexName string
5152
retryCount atomic.Uint64
5253
}
5354

@@ -85,6 +86,7 @@ func NewSQLProvider(
8586

8687
// Create sanitized table and index names.
8788
tableName := fmt.Sprintf("vecbench_%s", sanitizeIdentifier(datasetName))
89+
indexName := fmt.Sprintf("vecbench_%s_embedding_idx", sanitizeIdentifier(datasetName))
8890

8991
return &SQLProvider{
9092
datasetName: datasetName,
@@ -93,6 +95,7 @@ func NewSQLProvider(
9395
options: options,
9496
pool: pool,
9597
tableName: tableName,
98+
indexName: indexName,
9699
}, nil
97100
}
98101

@@ -136,6 +139,20 @@ func (s *SQLProvider) New(ctx context.Context) error {
136139
return errors.Wrap(err, "dropping table")
137140
}
138141

142+
// Enable vector indexes if not already enabled.
143+
_, err = s.pool.Exec(ctx, "SET CLUSTER SETTING feature.vector_index.enabled = true")
144+
if err != nil {
145+
return errors.Wrap(err, "enabling vector indexes")
146+
}
147+
148+
// Create table without vector index. Index creation is handled separately.
149+
_, err = s.pool.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s (
150+
id BYTES PRIMARY KEY,
151+
embedding VECTOR(%d))`, s.tableName, s.dims))
152+
return errors.Wrap(err, "creating table")
153+
}
154+
155+
func (s *SQLProvider) CreateIndex(ctx context.Context) error {
139156
var opClass string
140157
switch s.distMetric {
141158
case vecpb.CosineDistance:
@@ -144,13 +161,52 @@ func (s *SQLProvider) New(ctx context.Context) error {
144161
opClass = " vector_ip_ops"
145162
}
146163

147-
_, err = s.pool.Exec(ctx, fmt.Sprintf(`
148-
CREATE TABLE %s (
149-
id BYTES PRIMARY KEY,
150-
embedding VECTOR(%d),
151-
VECTOR INDEX (embedding%s)
152-
)`, s.tableName, s.dims, opClass))
153-
return errors.Wrap(err, "creating table")
164+
_, err := s.pool.Exec(ctx, fmt.Sprintf(
165+
"CREATE VECTOR INDEX %s ON %s (embedding%s)",
166+
s.indexName,
167+
s.tableName,
168+
opClass,
169+
))
170+
return errors.Wrap(err, "creating index")
171+
}
172+
173+
// CheckIndexCreationStatus implements the VectorProvider interface.
174+
func (s *SQLProvider) CheckIndexCreationStatus(ctx context.Context) (float64, error) {
175+
var status string
176+
var fractionCompleted float64
177+
178+
// Query the most recent schema change job whose description mentions vecbench.
179+
rows, err := s.pool.Query(ctx, `
180+
SELECT status, fraction_completed
181+
FROM [SHOW JOBS]
182+
WHERE description LIKE '%%vecbench%%'
183+
AND job_type = 'NEW SCHEMA CHANGE'
184+
ORDER BY created DESC
185+
LIMIT 1`)
186+
if err != nil {
187+
return 0, errors.Wrap(err, "querying job progress")
188+
}
189+
defer rows.Close()
190+
191+
if !rows.Next() {
192+
if err := rows.Err(); err != nil {
193+
return 0, errors.Wrap(err, "querying job progress")
194+
}
195+
return 0.0, nil
196+
}
197+
198+
if err := rows.Scan(&status, &fractionCompleted); err != nil {
199+
return 0, errors.Wrap(err, "scanning job progress")
200+
}
201+
202+
if status == "succeeded" {
203+
return 1.0, nil
204+
} else if status == "failed" || status == "canceled" {
205+
return fractionCompleted, errors.Newf("index creation job failed with status: %s", status)
206+
}
207+
208+
// Job is still running.
209+
return fractionCompleted, nil
154210
}
155211

156212
// InsertVectors implements the VectorProvider interface.
@@ -296,7 +352,7 @@ func (s *SQLProvider) FormatStats() string {
296352
return ""
297353
}
298354

299-
// sanitizeIdentifier makes a string safe to use as a SQL identifier
355+
// sanitizeIdentifier makes a string safe to use as a SQL identifier.
300356
func sanitizeIdentifier(s string) string {
301357
// Replace non-alphanumeric characters with underscores.
302358
return strings.Map(func(r rune) rune {

pkg/cmd/vecbench/vector_provider.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ type VectorProvider interface {
5454
// identified by a key.
5555
InsertVectors(ctx context.Context, keys []cspann.KeyBytes, vectors vector.Set) error
5656

57+
// CreateIndex creates a vector index on the data. It is called either right
58+
// after table/store creation or, with --index-after, once data import is done.
59+
CreateIndex(ctx context.Context) error
60+
61+
// CheckIndexCreationStatus returns the fraction complete (0.0-1.0) of index
62+
// creation and any error. For providers that don't support async index creation,
63+
// this should return 1.0, nil.
64+
CheckIndexCreationStatus(ctx context.Context) (float64, error)
65+
5766
// SetupSearch allows the provider to perform expensive up-front steps in
5867
// preparation for many calls to Search. It returns provider-specific state
5968
// that will be passed to Search.

0 commit comments

Comments
 (0)