Skip to content

Commit aa79556

Browse files
committed
changefeedccl: change db-level feed default to no initial scan
This change the default initial scan behavior for db-level feeds to not do an initial scan if no option is specified. When an initial scan is specified, that is respected. We do not allow db-level changefeeds with initial_scan='only'. This includes a rewrite of existing db-level feeds tests which rely on default initial scans in their testing. Epic: CRDB-1421 Informs: #156484 Release note: None
1 parent 7e8456c commit aa79556

File tree

3 files changed

+360
-188
lines changed

3 files changed

+360
-188
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,18 @@ func changefeedPlanHook(
230230
return nil, nil, false, err
231231
}
232232

233-
// Treat all tables inside a database as if "split_column_families" is set.
234233
if changefeedStmt.Level == tree.ChangefeedLevelDatabase {
234+
// Treat all tables inside a database as if "split_column_families" is set.
235235
rawOpts[changefeedbase.OptSplitColumnFamilies] = `yes`
236+
237+
// The default behavior for a database-level changefeed is
238+
// NOT to perform an initial scan, unlike table-level changefeeds.
239+
_, initialScanSet := rawOpts[changefeedbase.OptInitialScan]
240+
_, initialScanOnlySet := rawOpts[changefeedbase.OptInitialScanOnly]
241+
_, noInitialScanSet := rawOpts[changefeedbase.OptNoInitialScan]
242+
if !initialScanOnlySet && !noInitialScanSet && !initialScanSet {
243+
rawOpts[changefeedbase.OptInitialScan] = `no`
244+
}
236245
}
237246
opts := changefeedbase.MakeStatementOptions(rawOpts)
238247

@@ -1462,6 +1471,16 @@ func validateDetailsAndOptions(
14621471
if err := opts.ValidateForCreateChangefeed(details.Select != ""); err != nil {
14631472
return err
14641473
}
1474+
if isDBLevelChangefeed(details) {
1475+
scanType, err := opts.GetInitialScanType()
1476+
if err != nil {
1477+
return err
1478+
}
1479+
if scanType == changefeedbase.OnlyInitialScan {
1480+
return errors.Errorf(
1481+
`cannot specify %s on a database level changefeed`, changefeedbase.OptInitialScanOnly)
1482+
}
1483+
}
14651484
if opts.HasEndTime() {
14661485
scanType, err := opts.GetInitialScanType()
14671486
if err != nil {
@@ -2123,6 +2142,14 @@ func getQualifiedTableNameObj(
21232142
return tbName, nil
21242143
}
21252144

2145+
func isDBLevelChangefeed(details jobspb.ChangefeedDetails) bool {
2146+
targetSpecs := details.TargetSpecifications
2147+
if len(targetSpecs) == 0 {
2148+
return false
2149+
}
2150+
return targetSpecs[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE
2151+
}
2152+
21262153
// getChangefeedTargetName gets a table name with or without the dots
21272154
func getChangefeedTargetName(
21282155
ctx context.Context,

0 commit comments

Comments
 (0)