
Commit 31baea9

craig[bot] committed with DrewKimball, kev-cao, rickystewart, and fqazi

149115: opt: add tests for tpc-ds r=yuzefovich a=DrewKimball

#### workload/tpcds: remove remaining queries that can't be parsed

This commit removes some queries from `QueriesByNumber` that can't be parsed because of the `rollup()` function.

Epic: None
Release note: None

#### opt: add tests for tpc-ds

This commit adds a set of optimizer tests for the TPC-DS benchmark. Currently unsupported queries are left commented out. There is a version with stats (collected with scale factor = 10) and one without stats.

Epic: None
Release note: None

150169: backup: fix race condition in starting compaction job r=msbutler a=kev-cao

In #145930, scheduled compactions are blocked from running if another compaction job is running for the schedule. However, it is currently possible for a race condition to leave a compaction job unable to find an incremental backup. Take the following circumstance:

1. Compaction job A starts.
2. A scheduled backup B completes and begins considering whether a compaction job should run. It fetches the current chain up to its end time and finds that it should run a compaction.
3. Compaction job A completes.
4. B starts a transaction to create the compaction job. Because A has completed, it does not block the job from being created.
5. B creates a compaction job C whose start time is now skipped due to A's completion.
6. When C is picked up by the job system, it resolves the backup chain again; the chain no longer contains C's start time, and C fails.

This is resolved by opening a transaction to check for an already-running compaction job ID before fetching the backup chain.

Fixes: #149867, #147264
Release note: None

150418: teamcity: add `s390x` unit tests build configuration r=rishabh7m a=rickystewart

Epic: CRDB-21133
Release note: None

150435: ccl/cdc: change when drop column is detected in schema feed r=fqazi a=fqazi

Previously, before the automatic schema_locked toggle was added to the declarative schema changer, we had a guarantee that schema_locked would be unset before any column mutations were added. Unfortunately, the automatic toggle logic disables schema_locked at the same time as the column is removed, which doesn't play nicely with the existing schema feed logic. To address this, this patch detects the mutation once the new primary index is backfilled, so that detection is separate from schema_locked being disabled. This is still valid behavior because the schema change is still in flight at this point and doesn't have a usable new primary index.

Informs: #150003
Release note (bug fix): Addressed a bug on schema_locked tables where a column is dropped and schema_locked is toggled for the user.

Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: Kevin Cao <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
5 parents 48c636e + 66872a9 + 52dc3a3 + a7e57b5 + ed66505 commit 31baea9
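
The race described in 150169 boils down to an ordering problem: the backup chain was resolved before the compaction lock was checked. Below is a minimal, self-contained Go sketch of the corrected ordering. All names here are hypothetical stand-ins: a sync.Mutex plays the role of the isql.Txn scope, and checkCompactionLock mirrors the new getScheduledExecutionArgsAndCheckCompactionLock helper; this is not the actual CockroachDB API.

package main

import (
	"fmt"
	"sync"
)

// schedule is a hypothetical stand-in for a backup schedule row; compactionJobID
// plays the role of args.CompactionJobID (0 means no compaction is running).
type schedule struct {
	mu              sync.Mutex
	compactionJobID int64
	chain           []string // stand-in for the resolved backup chain
}

// checkCompactionLock mirrors the new helper: error out if a compaction job is
// already recorded for this schedule.
func (s *schedule) checkCompactionLock() error {
	if s.compactionJobID != 0 {
		return fmt.Errorf("compaction job %d already running", s.compactionJobID)
	}
	return nil
}

// startCompaction applies the fixed ordering: the lock is checked *before* the
// chain is read, so a compaction that completes concurrently can no longer
// invalidate the start/end times picked from the chain.
func (s *schedule) startCompaction() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.checkCompactionLock(); err != nil {
		return err
	}
	start, end := s.chain[0], s.chain[len(s.chain)-1]
	fmt.Printf("compacting backups from %s to %s\n", start, end)
	s.compactionJobID = 1 // record the job, blocking later attempts
	return nil
}

func main() {
	s := &schedule{chain: []string{"full", "incr1", "incr2"}}
	fmt.Println(s.startCompaction()) // <nil>
	fmt.Println(s.startCompaction()) // compaction job 1 already running
}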

File tree

29 files changed: +84,590 −36 lines
Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+
+# Copyright 2022 The Cockroach Authors.
+#
+# Use of this software is governed by the CockroachDB Software License
+# included in the /LICENSE file.
+
+
+set -euo pipefail
+
+dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"
+
+source "$dir/teamcity-support.sh"  # For $root
+source "$dir/teamcity-bazel-support.sh"  # For run_bazel
+
+tc_start_block "Run unit tests"
+BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e TC_BUILD_BRANCH -e GITHUB_API_TOKEN -e BUILD_VCS_NUMBER -e TC_BUILD_ID -e TC_SERVER_URL -e TC_BUILDTYPE_ID -e GITHUB_REPO" run_bazel build/teamcity/cockroach/ci/tests-ibm-cloud-linux-s390x/unit_tests_impl.sh
+tc_end_block "Run unit tests"
Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+# Copyright 2024 The Cockroach Authors.
+#
+# Use of this software is governed by the CockroachDB Software License
+# included in the /LICENSE file.
+
+
+set -euo pipefail
+
+dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"
+
+source "$dir/teamcity-support.sh"  # for 'tc_release_branch'
+
+bazel build //pkg/cmd/bazci
+
+# Omit the ui_test as it depends on Javascript stuff; we don't have nodejs stuff
+# for s390x and it's expensive to pull in anyway.
+TESTS=$(bazel query 'kind(go_test, pkg/...) except attr("tags", "[\[ ]integration[,\]]", kind(go_test, pkg/...))' | grep -v ui_test)
+
+set -x
+
+$(bazel info bazel-bin)/pkg/cmd/bazci/bazci_/bazci -- test --config=ci --config=dev \
+    $TESTS \
+    --profile=/artifacts/profile.gz

pkg/backup/compaction_job.go

Lines changed: 55 additions & 12 deletions

@@ -99,6 +99,28 @@ func maybeStartCompactionJob(
 		user,
 	)

+	// A race condition can occur where a compaction job ends after we fetch the
+	// backup chain but before we open a transaction to write the record. As a
+	// result, the written record is based on a chain that did not include the
+	// newly completed compaction job. In this scenario, it is possible that the
+	// chosen times for this compaction job actually no longer exist in the chain
+	// because they were compacted away. To avoid this, we need to check for the
+	// lock before fetching the backup chain.
+	//
+	// Note: _Technically_, this isn't entirely alleviated, as a compaction job
+	// could start and finish in between the time we grab the backup chain and
+	// the time we write the job record. However, this would require the schedule
+	// to have `on_previous_running=start`, and realistically speaking, no
+	// compaction job would complete that quickly.
+	if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		_, err := getScheduledExecutionArgsAndCheckCompactionLock(
+			ctx, txn, env, triggerJob.ScheduleID,
+		)
+		return err
+	}); err != nil {
+		return 0, err
+	}
+
 	chain, _, _, _, err := getBackupChain(
 		ctx, execCfg, user, triggerJob.Destination, triggerJob.EncryptionOptions,
 		triggerJob.EndTime, &kmsEnv,
@@ -115,23 +137,14 @@
 		return 0, err
 	}
 	startTS, endTS := chain[start].StartTime, chain[end-1].EndTime
-	log.Infof(ctx, "compacting backups from %s to %s", startTS, endTS)

 	var jobID jobspb.JobID
 	err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		_, args, err := getScheduledBackupExecutionArgsFromSchedule(
-			ctx, env, jobs.ScheduledJobTxn(txn), triggerJob.ScheduleID,
+		args, err := getScheduledExecutionArgsAndCheckCompactionLock(
+			ctx, txn, env, triggerJob.ScheduleID,
 		)
 		if err != nil {
-			return errors.Wrapf(
-				err, "failed to get scheduled backup execution args for schedule %d", triggerJob.ScheduleID,
-			)
-		}
-		if args.CompactionJobID != 0 {
-			return errors.Newf(
-				"compaction job %d already running for schedule %d",
-				args.CompactionJobID, triggerJob.ScheduleID,
-			)
+			return err
 		}
 		datums, err := txn.QueryRowEx(
 			ctx,
@@ -171,6 +184,9 @@
 		)
 		return scheduledJob.Update(ctx, backupSchedule)
 	})
+	if err == nil {
+		log.Infof(ctx, "compacting backups from %s to %s", startTS, endTS)
+	}
 	return jobID, err
 }

@@ -931,6 +947,33 @@ func maybeWriteBackupCheckpoint(
 	return true, nil
 }

+// getScheduledExecutionArgsAndCheckCompactionLock retrieves the scheduled
+// backup execution args and also checks whether a compaction job is already
+// running. If we fail to fetch the args, or if a compaction job is already
+// running for this schedule, an error is returned.
+func getScheduledExecutionArgsAndCheckCompactionLock(
+	ctx context.Context,
+	txn isql.Txn,
+	env scheduledjobs.JobSchedulerEnv,
+	scheduleID jobspb.ScheduleID,
+) (*backuppb.ScheduledBackupExecutionArgs, error) {
+	_, args, err := getScheduledBackupExecutionArgsFromSchedule(
+		ctx, env, jobs.ScheduledJobTxn(txn), scheduleID,
+	)
+	if err != nil {
+		return nil, errors.Wrapf(
+			err, "failed to get scheduled backup execution args for schedule %d", scheduleID,
+		)
+	}
+	if args.CompactionJobID != 0 {
+		return nil, errors.Newf(
+			"compaction job %d already running for schedule %d",
+			args.CompactionJobID, scheduleID,
+		)
+	}
+	return args, nil
+}
+
 func init() {
 	builtins.StartCompactionJob = StartCompactionJob
 }
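
Note the design choice here: the lock check now runs twice, once in its own transaction before the chain is resolved, and again inside the transaction that writes the job record, where getScheduledExecutionArgsAndCheckCompactionLock replaces the previously inlined check. The code comment above concedes the residual window (a compaction starting and finishing between the chain fetch and the record write), which only matters for schedules with on_previous_running=start.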

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 50 additions & 2 deletions

@@ -2180,6 +2180,54 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
 	})
 }

+func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamiliesWithManualSchemaLocked(
+	t *testing.T,
+) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+
+		sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
+		sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
+
+		var args []any
+		if _, ok := f.(*webhookFeedFactory); ok {
+			args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
+		}
+		// Open up the changefeed.
+		cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`, args...)
+		defer closeFeed(t, cf)
+		assertPayloads(t, cf, []string{
+			`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
+			`hasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`,
+		})
+
+		// Check that dropping a watched column will backfill the changefeed.
+		sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=false)`)
+		sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
+		sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=true)`)
+		assertPayloads(t, cf, []string{
+			`hasfams.id_a: [0]->{"after": {"id": 0}}`,
+		})
+
+		// Check that dropping another watched column will backfill the changefeed.
+		sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=false)`)
+		sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
+		sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=true)`)
+		assertPayloads(t, cf, []string{
+			`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
+		})
+	}
+
+	runWithAndWithoutRegression141453(t, testFn, func(t *testing.T, testFn cdcTestFn) {
+		cdcTest(t, testFn)
+	})
+}
+
 func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -3203,7 +3251,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
 			`drop_column: [2]->{"after": {"a": 2, "b": "2"}}`,
 		})
 		sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`)
-		ts := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `drop_column`, 2)
+		ts := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `drop_column`, 6)

 		// Backfill for DROP COLUMN b.
 		assertPayloads(t, dropColumn, []string{
@@ -3256,7 +3304,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
 		// the 7th step (version 15). Finally, when adding column d, it goes from 17->25 with the schema change
 		// being visible at the 7th step (version 23).
 		// TODO(#142936): Investigate if this descriptor version hardcoding is sound.
-		dropTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 2)
+		dropTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 6)
 		addTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 15)
 		addTS2 := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 23)
pkg/ccl/changefeedccl/schemafeed/table_event_filter.go

Lines changed: 34 additions & 2 deletions

@@ -188,6 +188,26 @@ func shouldFilterAddColumnEvent(e TableEvent, targets changefeedbase.Targets) (b
 	return !watched, nil
 }

+// notDeclarativeOrHasMergedIndex returns true if there is no declarative
+// schema change in progress, or if its new primary index has been backfilled.
+func notDeclarativeOrHasMergedIndex(desc catalog.TableDescriptor) bool {
+	// If there are no declarative schema changes, then this will always be
+	// true.
+	if desc.GetDeclarativeSchemaChangerState() == nil {
+		return true
+	}
+	// For declarative schema changes, detect when a new primary index becomes
+	// WRITE_ONLY (i.e. the backfill has been completed).
+	for idx, target := range desc.GetDeclarativeSchemaChangerState().Targets {
+		if target.GetPrimaryIndex() != nil &&
+			target.TargetStatus == scpb.Status_PUBLIC &&
+			desc.GetDeclarativeSchemaChangerState().CurrentStatuses[idx] == scpb.Status_WRITE_ONLY {
+			return true
+		}
+	}
+	return false
+}
+
 // Returns true if the changefeed targets a column which has a drop mutation inside the table event.
 func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool, error) {
 	// If no column families are specified, then all columns are targeted.
@@ -212,7 +232,13 @@ func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool,
 		if m.AsColumn() == nil || m.AsColumn().IsHidden() {
 			continue
 		}
-		if m.Dropped() && m.WriteAndDeleteOnly() && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) {
+		// For dropped columns, wait for WriteAndDeleteOnly to be hit. When using
+		// the declarative schema changer, we wait until a bit later in the plan
+		// to publish the dropped column, since disabling schema_locked and the
+		// column becoming write-and-delete-only happen at the same stage. Since
+		// the schema change is still in progress, there is a gray area in terms
+		// of when the change should be visible.
+		if m.Dropped() && m.WriteAndDeleteOnly() && notDeclarativeOrHasMergedIndex(e.After) && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) {
 			return true, nil
 		}
 	}
@@ -273,7 +299,13 @@ func dropVisibleColumnMutationExists(desc catalog.TableDescriptor) bool {
 		if m.AsColumn() == nil || m.AsColumn().IsHidden() {
 			continue
 		}
-		if m.Dropped() && m.WriteAndDeleteOnly() {
+		// For dropped columns, wait for WriteAndDeleteOnly to be hit. When using
+		// the declarative schema changer, we wait until a bit later in the plan
+		// to publish the dropped column, since disabling schema_locked and the
+		// column becoming write-and-delete-only happen at the same stage. Since
+		// the schema change is still in progress, there is a gray area in terms
+		// of when the change should be visible.
+		if m.Dropped() && m.WriteAndDeleteOnly() && notDeclarativeOrHasMergedIndex(desc) {
 			return true
 		}
 	}
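
To make the new gating concrete, here is a rough, self-contained Go model of the predicate. The status enum and descriptor struct are simplified stand-ins for the scpb statuses and catalog.TableDescriptor, not the real API.

package main

import "fmt"

type status int

const (
	statusAbsent status = iota
	statusWriteOnly
	statusPublic
)

// target is a simplified stand-in for a declarative schema changer target.
type target struct {
	isPrimaryIndex bool
	targetStatus   status
	currentStatus  status
}

// tableDesc is a simplified stand-in for a table descriptor; declarativeTargets
// is nil when no declarative schema change is running.
type tableDesc struct {
	declarativeTargets []target
}

// notDeclarativeOrHasMergedIndex mirrors the schema feed helper: true when no
// declarative schema change is in progress, or when a new primary index
// targeted at PUBLIC has reached WRITE_ONLY (backfill complete).
func notDeclarativeOrHasMergedIndex(desc tableDesc) bool {
	if desc.declarativeTargets == nil {
		return true
	}
	for _, t := range desc.declarativeTargets {
		if t.isPrimaryIndex && t.targetStatus == statusPublic && t.currentStatus == statusWriteOnly {
			return true
		}
	}
	return false
}

func main() {
	// Mid-schema-change: the new primary index is still backfilling, so the
	// drop is not yet surfaced to the changefeed.
	backfilling := tableDesc{declarativeTargets: []target{
		{isPrimaryIndex: true, targetStatus: statusPublic, currentStatus: statusAbsent},
	}}
	fmt.Println(notDeclarativeOrHasMergedIndex(backfilling)) // false

	// Backfill finished: the drop becomes visible.
	merged := tableDesc{declarativeTargets: []target{
		{isPrimaryIndex: true, targetStatus: statusPublic, currentStatus: statusWriteOnly},
	}}
	fmt.Println(notDeclarativeOrHasMergedIndex(merged)) // true
}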

pkg/ccl/changefeedccl/schemafeed/testdata/drop_column

Lines changed: 2 additions & 2 deletions

@@ -12,11 +12,11 @@ ALTER TABLE t DROP COLUMN j;

 pop f=1
 ----
-t 1->2: DropColumn
+t 1->2: Unknown
 t 2->3: Unknown
 t 3->4: Unknown
 t 4->5: Unknown
-t 5->6: Unknown
+t 5->6: DropColumn
 t 6->7: PrimaryKeyChange (no column changes)
 t 7->8: Unknown
 t 8->9: AddHiddenColumn
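
This reordering matches the schema feed change above: with the declarative schema changer, the DropColumn event is surfaced only once the new primary index has been backfilled (the 5->6 version step) rather than at the first descriptor version where the mutation appears.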
