Commit d31f0f4

craig[bot], kev-cao, mgartner, and andy-kimball committed

144063: roachtest: add compaction to backup fixtures r=msbutler a=kev-cao

This commit enables compaction in the generation of backup fixtures.

Epic: None
Release note: None

144111: sql: move BackupRestoreTestingKnobs to exec_util_backup.go r=mgartner a=mgartner

`BackupRestoreTestingKnobs` has been moved to a new file, `exec_util_backup.go`, and `CODEOWNERS` has been updated.

Epic: None
Release note: None

145274: vecindex: fix bugs found during testing of vector indexes r=drewkimball a=andy-kimball

#### cspann: do not overwrite root level when searching targets

When the root partition is in a non-Ready state, we need to search its target partitions. Previously, we were overwriting the root partition's level, which should be the same as the target partition's level. However, if there's a bug, it can be different, and overriding the root level causes cascading problems downstream, including a panic that can take down the server. This commit adds code to check the level and assert that it equals the root partition level rather than silently overwriting it.

#### vecindex: clone treeKey when creating fixups

Previously, the C-SPANN index assumed that it owned the memory for the vector index prefix value (called the "tree key" in C-SPANN). However, the backfiller assumed the opposite and reuses the memory between SearchForInsert calls. This commit resolves the conflict by always cloning the tree key when it's enqueued as part of a fixup, so that callers can reuse the memory.

Fixes: #145261

#### cspann: pass tree key to delete vector fixup

Previously, the delete vector fixup did not take a tree key parameter, which meant it always searched the default tree for the vector to delete. This commit fixes that by passing the tree key to the fixup.

Release justification: Vector indexing is a business priority. These changes are all in the vecindex package, for a feature that is protected by a default-off feature flag.

Co-authored-by: Kevin Cao <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Andrew Kimball <[email protected]>
4 parents ddf9096 + 749a612 + ded4059 + 1b8f532 commit d31f0f4

21 files changed (+626, -151 lines)
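A note on the treeKey fix in 145274: the change comes down to taking a defensive copy of the vector index prefix before a fixup is enqueued, so that a caller such as the backfiller can keep reusing its buffer between SearchForInsert calls. The sketch below is a minimal illustration of that ownership rule, not the cspann implementation; the fixup type and enqueueFixup helper are hypothetical names standing in for the real fixup queue.

package main

import "fmt"

// fixup is a hypothetical stand-in for a C-SPANN fixup record; the real type
// lives in the vecindex code and is not reproduced here.
type fixup struct {
    treeKey []byte // prefix identifying which tree the fixup targets
}

// enqueueFixup clones treeKey before storing it, so a caller that reuses its
// buffer between calls (as the backfiller does) cannot corrupt queued fixups.
func enqueueFixup(queue *[]fixup, treeKey []byte) {
    cloned := append([]byte(nil), treeKey...) // defensive copy
    *queue = append(*queue, fixup{treeKey: cloned})
}

func main() {
    var queue []fixup
    buf := []byte("customer/1") // reused prefix buffer
    enqueueFixup(&queue, buf)

    // The caller overwrites the buffer for its next insert; the queued fixup
    // still sees the original tree key because it owns a copy.
    copy(buf, "customer/2")
    fmt.Println(string(queue[0].treeKey)) // prints "customer/1"
}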

.github/CODEOWNERS

Lines changed: 4 additions & 2 deletions
@@ -73,6 +73,7 @@
 /pkg/sql/control_job* @cockroachdb/sql-queries-prs @cockroachdb/jobs-prs
 /pkg/sql/job_exec_context* @cockroachdb/sql-queries-prs @cockroachdb/jobs-prs
 /pkg/sql/delegate/*job*.go @cockroachdb/jobs-prs @cockroachdb/disaster-recovery
+#!/pkg/sql/BUILD.bazel @cockroachdb/sql-queries-noreview
 
 /pkg/sql/importer/ @cockroachdb/sql-queries-prs
 /pkg/sql/importer/export* @cockroachdb/cdc-prs
@@ -258,8 +259,9 @@
 /pkg/ccl/changefeedccl/ @cockroachdb/cdc-prs
 
 /pkg/crosscluster/ @cockroachdb/disaster-recovery
-/pkg/backup/ @cockroachdb/disaster-recovery
-/pkg//backup/*_job.go @cockroachdb/disaster-recovery @cockroachdb/jobs-prs
+/pkg/backup/ @cockroachdb/disaster-recovery
+/pkg//backup/*_job.go @cockroachdb/disaster-recovery @cockroachdb/jobs-prs
+/pkg/sql/exec_util_backup.go @cockroachdb/disaster-recovery
 /pkg/revert/ @cockroachdb/disaster-recovery
 /pkg/ccl/storageccl/ @cockroachdb/disaster-recovery
 /pkg/ccl/cloudccl/ @cockroachdb/disaster-recovery

pkg/cmd/roachtest/tests/backup_fixtures.go

Lines changed: 131 additions & 20 deletions
@@ -20,6 +20,8 @@ import (
     "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
     "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
     "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+    "github.com/cockroachdb/cockroach/pkg/jobs"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
     "github.com/cockroachdb/cockroach/pkg/roachprod"
     "github.com/cockroachdb/cockroach/pkg/roachprod/blobfixture"
     "github.com/cockroachdb/cockroach/pkg/roachprod/install"
@@ -35,6 +37,8 @@ type TpccFixture struct {
     WorkloadWarehouses int
     MinutesPerIncremental int
     IncrementalChainLength int
+    CompactionThreshold int
+    CompactionWindow int
     RestoredSizeEstimate string
 }
 
@@ -45,6 +49,8 @@ var TinyFixture = TpccFixture{
     ImportWarehouses: 10,
     WorkloadWarehouses: 10,
     IncrementalChainLength: 4,
+    CompactionThreshold: 3,
+    CompactionWindow: 2,
     RestoredSizeEstimate: "700MiB",
 }
 
@@ -55,6 +61,8 @@ var SmallFixture = TpccFixture{
     ImportWarehouses: 5000,
     WorkloadWarehouses: 1000,
     IncrementalChainLength: 48,
+    CompactionThreshold: 3,
+    CompactionWindow: 2,
     RestoredSizeEstimate: "350GiB",
 }
 
@@ -65,6 +73,8 @@ var MediumFixture = TpccFixture{
     ImportWarehouses: 30000,
     WorkloadWarehouses: 5000,
     IncrementalChainLength: 400,
+    CompactionThreshold: 3,
+    CompactionWindow: 2,
     RestoredSizeEstimate: "2TiB",
 }
 
@@ -77,6 +87,8 @@ var LargeFixture = TpccFixture{
     ImportWarehouses: 300000,
     WorkloadWarehouses: 7500,
     IncrementalChainLength: 400,
+    CompactionThreshold: 3,
+    CompactionWindow: 2,
     RestoredSizeEstimate: "20TiB",
 }
 
@@ -205,37 +217,136 @@ func (bd *backupDriver) runWorkload(ctx context.Context) (func(), error) {
 // scheduleBackups begins the backup schedule.
 func (bd *backupDriver) scheduleBackups(ctx context.Context) {
     bd.t.L().Printf("creating backup schedule", bd.sp.fixture.WorkloadWarehouses)
-
-    createScheduleStatement := CreateScheduleStatement(bd.registry.URI(bd.fixture.DataPath))
     conn := bd.c.Conn(ctx, bd.t.L(), 1)
+    defer conn.Close()
+    if bd.sp.fixture.CompactionThreshold > 0 {
+        bd.t.L().Printf(
+            "enabling compaction with threshold %d and window size %d",
+            bd.sp.fixture.CompactionThreshold, bd.sp.fixture.CompactionWindow,
+        )
+        _, err := conn.Exec(fmt.Sprintf(
+            "SET CLUSTER SETTING backup.compaction.threshold = %d", bd.sp.fixture.CompactionThreshold,
+        ))
+        require.NoError(bd.t, err)
+        _, err = conn.Exec(fmt.Sprintf(
+            "SET CLUSTER SETTING backup.compaction.window_size = %d", bd.sp.fixture.CompactionWindow,
+        ))
+        require.NoError(bd.t, err)
+    }
+    createScheduleStatement := CreateScheduleStatement(bd.registry.URI(bd.fixture.DataPath))
     _, err := conn.Exec(createScheduleStatement)
     require.NoError(bd.t, err)
 }
 
 // monitorBackups pauses the schedule once the target number of backups in the
 // chain have been taken.
-func (bd *backupDriver) monitorBackups(ctx context.Context) {
-    sql := sqlutils.MakeSQLRunner(bd.c.Conn(ctx, bd.t.L(), 1))
+func (bd *backupDriver) monitorBackups(ctx context.Context) error {
+    conn := bd.c.Conn(ctx, bd.t.L(), 1)
+    defer conn.Close()
+    sql := sqlutils.MakeSQLRunner(conn)
     fixtureURI := bd.registry.URI(bd.fixture.DataPath)
-    for {
+    const (
+        WaitingFirstFull = iota
+        RunningIncrementals
+        WaitingCompactions
+        Done
+    )
+    state := WaitingFirstFull
+    for state != Done {
         time.Sleep(1 * time.Minute)
-        var activeScheduleCount int
-        scheduleCountQuery := fmt.Sprintf(`SELECT count(*) FROM [SHOW SCHEDULES] WHERE label='%s' AND schedule_status='ACTIVE'`, scheduleLabel)
-        sql.QueryRow(bd.t, scheduleCountQuery).Scan(&activeScheduleCount)
-        if activeScheduleCount < 2 {
-            bd.t.L().Printf(`First full backup still running`)
-            continue
+        compSuccess, compRunning, compFailed, err := bd.compactionJobStates(sql)
+        if err != nil {
+            return err
+        }
+        switch state {
+        case WaitingFirstFull:
+            var activeScheduleCount int
+            scheduleCountQuery := fmt.Sprintf(
+                `SELECT count(*) FROM [SHOW SCHEDULES] WHERE label='%s' AND schedule_status='ACTIVE'`, scheduleLabel,
+            )
+            sql.QueryRow(bd.t, scheduleCountQuery).Scan(&activeScheduleCount)
+            if activeScheduleCount < 2 {
+                bd.t.L().Printf(`First full backup still running`)
+            } else {
+                state = RunningIncrementals
+            }
+        case RunningIncrementals:
+            var backupCount int
+            backupCountQuery := fmt.Sprintf(
+                `SELECT count(DISTINCT end_time) FROM [SHOW BACKUP FROM LATEST IN '%s']`, fixtureURI.String(),
+            )
+            sql.QueryRow(bd.t, backupCountQuery).Scan(&backupCount)
+            bd.t.L().Printf(`%d scheduled backups taken`, backupCount)
+
+            if bd.sp.fixture.CompactionThreshold > 0 {
+                bd.t.L().Printf("%d compaction jobs succeeded, %d running", len(compSuccess), len(compRunning))
+                if len(compFailed) > 0 {
+                    return errors.Newf("compaction jobs with ids %v failed", compFailed)
+                }
+            }
+
+            if backupCount >= bd.sp.fixture.IncrementalChainLength {
+                pauseSchedulesQuery := fmt.Sprintf(
+                    `PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = '%s'`, scheduleLabel,
+                )
+                sql.Exec(bd.t, pauseSchedulesQuery)
+                if len(compRunning) > 0 {
+                    state = WaitingCompactions
+                } else {
+                    state = Done
+                }
+            }
+        case WaitingCompactions:
+            if len(compFailed) > 0 {
+                return errors.Newf("compaction jobs with ids %v failed", compFailed)
+            } else if len(compRunning) > 0 {
+                bd.t.L().Printf("waiting for %d compaction jobs to finish", len(compRunning))
+            } else {
+                state = Done
+            }
+        }
+    }
+    return nil
+}
+
+// compactionJobStates returns the state of the compaction jobs, returning
+// the IDs of the jobs that succeeded, are running, and failed.
+func (bd *backupDriver) compactionJobStates(
+    sql *sqlutils.SQLRunner,
+) ([]jobspb.JobID, []jobspb.JobID, []jobspb.JobID, error) {
+    if bd.sp.fixture.CompactionThreshold == 0 {
+        return nil, nil, nil, nil
+    }
+    type Job struct {
+        jobID  jobspb.JobID
+        status jobs.State
+    }
+    compactionQuery := `SELECT job_id, status FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND
+        description ILIKE 'COMPACT BACKUPS%'`
+    rows := sql.Query(bd.t, compactionQuery)
+    defer rows.Close()
+    var compJobs []Job
+    for rows.Next() {
+        var job Job
+        if err := rows.Scan(&job.jobID, &job.status); err != nil {
+            return nil, nil, nil, errors.Wrapf(err, "error scanning compaction job")
         }
-        var backupCount int
-        backupCountQuery := fmt.Sprintf(`SELECT count(DISTINCT end_time) FROM [SHOW BACKUP FROM LATEST IN '%s']`, fixtureURI.String())
-        sql.QueryRow(bd.t, backupCountQuery).Scan(&backupCount)
-        bd.t.L().Printf(`%d scheduled backups taken`, backupCount)
-        if backupCount >= bd.sp.fixture.IncrementalChainLength {
-            pauseSchedulesQuery := fmt.Sprintf(`PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = '%s'`, scheduleLabel)
-            sql.Exec(bd.t, pauseSchedulesQuery)
-            break
+        compJobs = append(compJobs, job)
+    }
+    var successes, running, failures []jobspb.JobID
+    for _, job := range compJobs {
+        switch job.status {
+        case jobs.StateSucceeded:
+            successes = append(successes, job.jobID)
+        case jobs.StateRunning:
+            running = append(running, job.jobID)
+        case jobs.StateFailed:
+            failures = append(failures, job.jobID)
+        default:
+            bd.t.L().Printf(`unexpected compaction job %d in state %s`, job.jobID, job.status)
         }
     }
+    return successes, running, failures, nil
 }
 
 func fixtureDirectory() string {
@@ -357,7 +468,7 @@ func registerBackupFixtures(r registry.Registry) {
             require.NoError(t, err)
 
             bd.scheduleBackups(ctx)
-            bd.monitorBackups(ctx)
+            require.NoError(t, bd.monitorBackups(ctx))
 
             stopWorkload()
 
pkg/sql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -114,6 +114,7 @@ go_library(
         "exec_factory_util.go",
         "exec_log.go",
        "exec_util.go",
+        "exec_util_backup.go",
         "execute.go",
         "executor_statement_metrics.go",
         "explain_bundle.go",

pkg/sql/backfill/backfill_test.go

Lines changed: 60 additions & 0 deletions
@@ -181,3 +181,63 @@ func TestConcurrentOperationsDuringVectorIndexCreation(t *testing.T) {
     sqlDB.QueryRow(t, `SELECT id FROM vectors@vec_idx ORDER BY vec <-> '[1, 2, 3]' LIMIT 1`).Scan(&id)
     require.Equal(t, 1, id)
 }
+
+// Regression for issue #145261: vector index backfill with a prefix column
+// crashes the node.
+func TestVectorIndexWithPrefixBackfill(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+    srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+    defer srv.Stopper().Stop(ctx)
+    sqlDB := sqlutils.MakeSQLRunner(db)
+
+    // Enable vector indexes.
+    sqlDB.Exec(t, `SET CLUSTER SETTING feature.vector_index.enabled = true`)
+
+    // Create a table with a vector column + a prefix column.
+    sqlDB.Exec(t, `
+        CREATE TABLE items (
+            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+            customer_id INT NOT NULL,
+            name TEXT,
+            embedding VECTOR(3)
+        );
+    `)
+
+    // Generate 10 customer id's, each with 100 vectors.
+    sqlDB.Exec(t, `
+        INSERT INTO items (customer_id, name, embedding)
+        SELECT
+            (i % 10) + 1 AS customer_id,
+            'Item ' || i,
+            ARRAY[random(), random(), random()]::vector
+        FROM generate_series(1, 1000) AS s(i);
+    `)
+
+    // Create the vector index with a small partition size so that the trees
+    // have more levels and splits to get there.
+    sqlDB.Exec(t, `
+        CREATE VECTOR INDEX ON items (customer_id, embedding)
+        WITH (min_partition_size=2, max_partition_size=8, build_beam_size=2);
+    `)
+
+    // Ensure that each customer has 100 vectors.
+    for i := range 10 {
+        func() {
+            rows := sqlDB.Query(t, `
+                SELECT id FROM items
+                WHERE customer_id = $1
+                ORDER BY embedding <-> $2
+                LIMIT 200`, i+1, "[0, 0, 0]")
+            defer rows.Close()
+
+            count := 0
+            for rows.Next() {
+                count++
+            }
+            require.Equal(t, 100, count)
+        }()
+    }
+}

pkg/sql/exec_util.go

Lines changed: 0 additions & 63 deletions
@@ -21,7 +21,6 @@ import (
     "time"
 
     apd "github.com/cockroachdb/apd/v3"
-    "github.com/cockroachdb/cockroach/pkg/backup/backuppb"
     "github.com/cockroachdb/cockroach/pkg/base"
     "github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
     "github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -41,7 +40,6 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
     "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
     "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
-    "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
     "github.com/cockroachdb/cockroach/pkg/multitenant"
@@ -1901,67 +1899,6 @@ type SchemaTelemetryTestingKnobs struct {
 // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
 func (*SchemaTelemetryTestingKnobs) ModuleTestingKnobs() {}
 
-// BackupRestoreTestingKnobs contains knobs for backup and restore behavior.
-//
-// TODO (msbutler): move these to backup
-type BackupRestoreTestingKnobs struct {
-    // AfterBackupChunk is called after each chunk of a backup is completed.
-    AfterBackupChunk func()
-
-    // AfterBackupCheckpoint if set will be called after a BACKUP-CHECKPOINT
-    // is written.
-    AfterBackupCheckpoint func()
-
-    // AfterLoadingCompactionManifestOnResume is run once the backup manifest has been
-    // loaded/created on the resumption of a compaction job.
-    AfterLoadingCompactionManifestOnResume func(manifest *backuppb.BackupManifest)
-
-    // CaptureResolvedTableDescSpans allows for intercepting the spans which are
-    // resolved during backup planning, and will eventually be backed up during
-    // execution.
-    CaptureResolvedTableDescSpans func([]roachpb.Span)
-
-    // RunAfterSplitAndScatteringEntry allows blocking the RESTORE job after a
-    // single RestoreSpanEntry has been split and scattered.
-    RunAfterSplitAndScatteringEntry func(ctx context.Context)
-
-    // RunAfterProcessingRestoreSpanEntry allows blocking the RESTORE job after a
-    // single RestoreSpanEntry has been processed and added to the SSTBatcher.
-    RunAfterProcessingRestoreSpanEntry func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) error
-
-    // RunAfterExportingSpanEntry allows blocking the BACKUP job after a single
-    // span has been exported.
-    RunAfterExportingSpanEntry func(ctx context.Context, response *kvpb.ExportResponse)
-
-    // BackupMonitor is used to overwrite the monitor used by backup during
-    // testing. This is typically the bulk mem monitor if not
-    // specified here.
-    BackupMemMonitor *mon.BytesMonitor
-
-    RestoreDistSQLRetryPolicy *retry.Options
-
-    RunBeforeRestoreFlow func() error
-
-    RunAfterRestoreFlow func() error
-
-    BackupDistSQLRetryPolicy *retry.Options
-
-    RunBeforeBackupFlow func() error
-
-    RunAfterBackupFlow func() error
-
-    RunAfterRetryIteration func(err error) error
-
-    RunAfterRestoreProcDrains func()
-
-    RunBeforeResolvingCompactionDest func() error
-}
-
-var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
-
-// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
-func (*BackupRestoreTestingKnobs) ModuleTestingKnobs() {}
 
 // StreamingTestingKnobs contains knobs for streaming behavior.
 type StreamingTestingKnobs struct {
     // RunAfterReceivingEvent allows blocking the stream ingestion processor after
