Skip to content

Commit 8c91603

Browse files
committed
changefeedccl: support CREATE DATABASE CHANGEFEED syntax
Add basic syntax support for database-level changefeeds. CREATE DATABASE CHANGEFEED for foo This PR only allows the aforementioned statement to be parsed; no changefeed is created. Epic: CRDB-1421 Resolves: #147369 Release note: None
1 parent c63a85b commit 8c91603

File tree

14 files changed

+202
-90
lines changed

14 files changed

+202
-90
lines changed

docs/generated/sql/bnf/create_changefeed_stmt.bnf

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
create_changefeed_stmt ::=
2-
'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
3-
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
4-
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
5-
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
6-
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink
2+
'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_target ( ( ',' changefeed_table_target ) )* 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
3+
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_target ( ( ',' changefeed_table_target ) )* 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
4+
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_target ( ( ',' changefeed_table_target ) )* 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
5+
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_target ( ( ',' changefeed_table_target ) )* 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
6+
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_target ( ( ',' changefeed_table_target ) )* 'INTO' sink
7+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
8+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
9+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
10+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
11+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink
712
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
813
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
914
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause

docs/generated/sql/bnf/create_schedule_for_changefeed_stmt.bnf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
create_schedule_for_changefeed_stmt ::=
2-
'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' changefeed_target ( ( ',' changefeed_target ) )* ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'RECURRING' crontab 'WITH' 'SCHEDULE' 'OPTIONS' schedule_option
3-
| 'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' changefeed_target ( ( ',' changefeed_target ) )* ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'RECURRING' crontab 'WITH' 'SCHEDULE' 'OPTIONS' '(' schedule_option ')'
4-
| 'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' changefeed_target ( ( ',' changefeed_target ) )* ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'RECURRING' crontab
2+
'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' changefeed_table_target ( ( ',' changefeed_table_target ) )* ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'RECURRING' crontab 'WITH' 'SCHEDULE' 'OPTIONS' schedule_option
3+
| 'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' changefeed_table_target ( ( ',' changefeed_table_target ) )* ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'RECURRING' crontab 'WITH' 'SCHEDULE' 'OPTIONS' '(' schedule_option ')'
4+
| 'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' changefeed_table_target ( ( ',' changefeed_table_target ) )* ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'RECURRING' crontab
55
| 'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'AS' 'SELECT' target_list 'FROM' insert_target where_clause 'RECURRING' crontab 'WITH' 'SCHEDULE' 'OPTIONS' schedule_option
66
| 'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'AS' 'SELECT' target_list 'FROM' insert_target where_clause 'RECURRING' crontab 'WITH' 'SCHEDULE' 'OPTIONS' '(' schedule_option ')'
77
| 'CREATE' 'SCHEDULE' ( 'IF NOT EXISTS' | ) schedule_label 'FOR' 'CHANGEFEED' ( 'INTO' changefeed_sink ) ( | 'WITH' changefeed_option ( ',' changefeed_option )* ) 'AS' 'SELECT' target_list 'FROM' insert_target where_clause 'RECURRING' crontab

docs/generated/sql/bnf/stmt_block.bnf

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,8 @@ create_stats_stmt ::=
632632
'CREATE' 'STATISTICS' statistics_name opt_stats_columns 'FROM' create_stats_target opt_create_stats_options
633633

634634
create_changefeed_stmt ::=
635-
'CREATE' 'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options
635+
'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_targets opt_changefeed_sink opt_with_options
636+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_name opt_changefeed_sink opt_with_options
636637
| 'CREATE' 'CHANGEFEED' opt_changefeed_sink opt_with_options 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
637638

638639
create_extension_stmt ::=
@@ -1922,8 +1923,8 @@ opt_create_stats_options ::=
19221923
| 'WITH' 'OPTIONS' create_stats_option_list
19231924
|
19241925

1925-
changefeed_targets ::=
1926-
( changefeed_target ) ( ( ',' changefeed_target ) )*
1926+
changefeed_table_targets ::=
1927+
( changefeed_table_target ) ( ( ',' changefeed_table_target ) )*
19271928

19281929
opt_changefeed_sink ::=
19291930
'INTO' string_or_placeholder
@@ -1949,7 +1950,7 @@ opt_logical_replication_create_table_options ::=
19491950
|
19501951

19511952
create_schedule_for_changefeed_stmt ::=
1952-
'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_targets changefeed_sink opt_with_options cron_expr opt_with_schedule_options
1953+
'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_table_targets changefeed_sink opt_with_options cron_expr opt_with_schedule_options
19531954
| 'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_sink opt_with_options 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause cron_expr opt_with_schedule_options
19541955

19551956
create_schedule_for_backup_stmt ::=
@@ -2822,7 +2823,7 @@ opt_policy_command ::=
28222823
create_stats_option_list ::=
28232824
( create_stats_option ) ( ( create_stats_option ) )*
28242825

2825-
changefeed_target ::=
2826+
changefeed_table_target ::=
28262827
opt_table_prefix table_name opt_changefeed_family
28272828

28282829
target_elem ::=
@@ -3227,8 +3228,8 @@ target_object_type ::=
32273228
| 'ROUTINES'
32283229

32293230
alter_changefeed_cmd ::=
3230-
'ADD' changefeed_targets opt_with_options
3231-
| 'DROP' changefeed_targets
3231+
'ADD' changefeed_table_targets opt_with_options
3232+
| 'DROP' changefeed_table_targets
32323233
| 'SET' kv_option_list
32333234
| 'UNSET' name_list
32343235

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func alterChangefeedPlanHook(
173173
}
174174
}
175175
}
176-
newChangefeedStmt.Targets = newTargets
176+
newChangefeedStmt.TableTargets = newTargets
177177

178178
if prevDetails.Select != "" {
179179
query, err := cdceval.ParseChangefeedExpression(prevDetails.Select)
@@ -394,27 +394,27 @@ func generateAndValidateNewTargets(
394394
prevProgress jobspb.Progress,
395395
sinkURI string,
396396
) (
397-
tree.ChangefeedTargets,
397+
tree.ChangefeedTableTargets,
398398
*jobspb.Progress,
399399
hlc.Timestamp,
400-
map[tree.ChangefeedTarget]jobspb.ChangefeedTargetSpecification,
400+
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
401401
error,
402402
) {
403403

404404
type targetKey struct {
405405
TableID descpb.ID
406406
FamilyName tree.Name
407407
}
408-
newTargets := make(map[targetKey]tree.ChangefeedTarget)
409-
droppedTargets := make(map[targetKey]tree.ChangefeedTarget)
408+
newTargets := make(map[targetKey]tree.ChangefeedTableTarget)
409+
droppedTargets := make(map[targetKey]tree.ChangefeedTableTarget)
410410
newTableDescs := make(map[descpb.ID]catalog.Descriptor)
411411

412412
// originalSpecs provides a mapping between tree.ChangefeedTargets that
413413
// existed prior to the alteration of the changefeed to their corresponding
414414
// jobspb.ChangefeedTargetSpecification. The purpose of this mapping is to ensure
415415
// that the StatementTimeName of the existing targets are not modified when the
416416
// name of the target was modified.
417-
originalSpecs := make(map[tree.ChangefeedTarget]jobspb.ChangefeedTargetSpecification)
417+
originalSpecs := make(map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification)
418418

419419
// We want to store the value of whether or not the original changefeed had
420420
// initial_scan set to only so that we only do an initial scan on an alter
@@ -486,7 +486,7 @@ func generateAndValidateNewTargets(
486486
return err
487487
}
488488

489-
newTarget := tree.ChangefeedTarget{
489+
newTarget := tree.ChangefeedTableTarget{
490490
TableName: tablePattern,
491491
FamilyName: tree.Name(targetSpec.FamilyName),
492492
}
@@ -658,7 +658,7 @@ func generateAndValidateNewTargets(
658658
}
659659
}
660660

661-
newTargetList := tree.ChangefeedTargets{}
661+
newTargetList := tree.ChangefeedTableTargets{}
662662

663663
for _, target := range newTargets {
664664
newTargetList = append(newTargetList, target)
@@ -688,7 +688,7 @@ func generateAndValidateNewTargets(
688688
func validateNewTargets(
689689
ctx context.Context,
690690
p sql.PlanHookState,
691-
newTargets tree.ChangefeedTargets,
691+
newTargets tree.ChangefeedTableTargets,
692692
jobProgress jobspb.Progress,
693693
jobStatementTime hlc.Timestamp,
694694
) error {

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func init() {
8888

8989
type annotatedChangefeedStatement struct {
9090
*tree.CreateChangefeed
91-
originalSpecs map[tree.ChangefeedTarget]jobspb.ChangefeedTargetSpecification
91+
originalSpecs map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification
9292
alterChangefeedAsOf hlc.Timestamp
9393
CreatedByInfo *jobs.CreatedByInfo
9494
}
@@ -226,6 +226,10 @@ func changefeedPlanHook(
226226
}
227227
opts := changefeedbase.MakeStatementOptions(rawOpts)
228228

229+
if changefeedStmt.Level == tree.ChangefeedLevelDatabase {
230+
return nil, nil, false, errors.UnimplementedError(errors.IssueLink{}, "database-level changefeed is not implemented")
231+
}
232+
229233
description, err := makeChangefeedDescription(ctx, changefeedStmt.CreateChangefeed, sinkURI, opts)
230234
if err != nil {
231235
return nil, nil, false, err
@@ -539,7 +543,7 @@ func createChangefeedJobRecord(
539543
}
540544

541545
tableOnlyTargetList := tree.BackupTargetList{}
542-
for _, t := range changefeedStmt.Targets {
546+
for _, t := range changefeedStmt.TableTargets {
543547
tableOnlyTargetList.Tables.TablePatterns = append(tableOnlyTargetList.Tables.TablePatterns, t.TableName)
544548
}
545549

@@ -558,8 +562,8 @@ func createChangefeedJobRecord(
558562
}
559563
}
560564

561-
targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, changefeedStmt.Targets,
562-
changefeedStmt.originalSpecs, opts.ShouldUseFullStatementTimeName(), sinkURI)
565+
targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, changefeedStmt.TableTargets,
566+
changefeedStmt.originalSpecs, opts.ShouldUseFullStatementTimeName())
563567

564568
if err != nil {
565569
return nil, err
@@ -940,14 +944,13 @@ func getTargetsAndTables(
940944
ctx context.Context,
941945
p sql.PlanHookState,
942946
targetDescs map[tree.TablePattern]catalog.Descriptor,
943-
rawTargets tree.ChangefeedTargets,
944-
originalSpecs map[tree.ChangefeedTarget]jobspb.ChangefeedTargetSpecification,
947+
rawTargets tree.ChangefeedTableTargets,
948+
originalSpecs map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
945949
fullTableName bool,
946-
sinkURI string,
947950
) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) {
948951
tables := make(jobspb.ChangefeedTargets, len(targetDescs))
949952
targets := make([]jobspb.ChangefeedTargetSpecification, len(rawTargets))
950-
seen := make(map[jobspb.ChangefeedTargetSpecification]tree.ChangefeedTarget)
953+
seen := make(map[jobspb.ChangefeedTargetSpecification]tree.ChangefeedTableTarget)
951954

952955
for i, ct := range rawTargets {
953956
desc, ok := targetDescs[ct.TableName]
@@ -1122,8 +1125,8 @@ func makeChangefeedDescription(
11221125
opts changefeedbase.StatementOptions,
11231126
) (string, error) {
11241127
c := &tree.CreateChangefeed{
1125-
Targets: changefeed.Targets,
1126-
Select: changefeed.Select,
1128+
TableTargets: changefeed.TableTargets,
1129+
Select: changefeed.Select,
11271130
}
11281131

11291132
if sinkURI != "" {

pkg/ccl/changefeedccl/scheduled_changefeed.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func makeScheduledChangefeedSpec(
275275
var err error
276276

277277
tablePatterns := make([]tree.TablePattern, 0)
278-
for _, target := range schedule.Targets {
278+
for _, target := range schedule.TableTargets {
279279
tablePatterns = append(tablePatterns, target.TableName)
280280
}
281281

@@ -284,12 +284,13 @@ func makeScheduledChangefeedSpec(
284284
return nil, errors.Wrap(err, "qualifying target tables")
285285
}
286286

287-
newTargets := make([]tree.ChangefeedTarget, 0)
287+
newTargets := make([]tree.ChangefeedTableTarget, 0)
288288
for i, table := range qualifiedTablePatterns {
289-
newTargets = append(newTargets, tree.ChangefeedTarget{TableName: table, FamilyName: schedule.Targets[i].FamilyName})
289+
target := schedule.TableTargets[i]
290+
newTargets = append(newTargets, tree.ChangefeedTableTarget{TableName: table, FamilyName: target.FamilyName})
290291
}
291292

292-
schedule.Targets = newTargets
293+
schedule.TableTargets = newTargets
293294

294295
// We need to change the TableExpr inside the Select clause to be fully
295296
// qualified. Otherwise, when we execute the schedule, we will parse the
@@ -626,10 +627,10 @@ func doCreateChangefeedSchedule(
626627
}
627628

628629
createChangefeedNode := &tree.CreateChangefeed{
629-
Targets: spec.Targets,
630-
SinkURI: tree.NewStrVal(*spec.evaluatedSinkURI),
631-
Options: spec.Options,
632-
Select: spec.Select,
630+
TableTargets: spec.TableTargets,
631+
SinkURI: tree.NewStrVal(*spec.evaluatedSinkURI),
632+
Options: spec.Options,
633+
Select: spec.Select,
633634
}
634635

635636
es, err := makeChangefeedSchedule(env, p.User(), scheduleLabel, recurrence, details, createChangefeedNode)

pkg/ccl/changefeedccl/scheduled_changefeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,7 @@ func TestFullyQualifyTables(t *testing.T) {
870870
defer cleanupPlanHook()
871871

872872
tablePatterns := make([]tree.TablePattern, 0)
873-
for _, target := range createChangeFeedStmt.Targets {
873+
for _, target := range createChangeFeedStmt.TableTargets {
874874
tablePatterns = append(tablePatterns, target.TableName)
875875
}
876876

pkg/cmd/docgen/diagrams.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -782,9 +782,10 @@ var specs = []stmtSpec{
782782
unlink: []string{"non_reserved_word_or_sconst", "signed_iconst", "encoding", "limit"},
783783
nosplit: true,
784784
},
785+
// TODO: Add new database level changefeed syntax here when it is ready to be released (#149347).
785786
{
786787
name: "create_changefeed_stmt",
787-
inline: []string{"changefeed_targets", "opt_changefeed_sink", "opt_with_options", "kv_option_list", "kv_option"},
788+
inline: []string{"changefeed_table_targets", "opt_changefeed_sink", "opt_with_options", "kv_option_list", "kv_option"},
788789
replace: map[string]string{
789790
"table_option": "table_name",
790791
"'INTO' string_or_placeholder": "'INTO' sink",
@@ -839,7 +840,7 @@ var specs = []stmtSpec{
839840
},
840841
{
841842
name: "create_schedule_for_changefeed_stmt",
842-
inline: []string{"opt_with_schedule_options", "changefeed_targets", "table_pattern", "opt_where_clause", "changefeed_target_expr", "cron_expr"},
843+
inline: []string{"opt_with_schedule_options", "changefeed_table_targets", "table_pattern", "opt_where_clause", "changefeed_target_expr", "cron_expr"},
843844
replace: map[string]string{
844845
"schedule_label_spec": "( 'IF NOT EXISTS' | ) schedule_label",
845846
"changefeed_sink": "( 'INTO' changefeed_sink )",

pkg/sql/parser/lexer.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func (l *lexer) Lex(lval *sqlSymType) int {
207207
}
208208
}
209209

210-
case NOT, WITH, AS, GENERATED, NULLS, RESET, ROLE, USER, ON, TENANT, CLUSTER, SET:
210+
case NOT, WITH, AS, GENERATED, NULLS, RESET, ROLE, USER, ON, TENANT, CLUSTER, SET, CREATE:
211211
nextToken := sqlSymType{}
212212
if l.lastPos+1 < len(l.tokens) {
213213
nextToken = l.tokens[l.lastPos+1]
@@ -289,6 +289,17 @@ func (l *lexer) Lex(lval *sqlSymType) int {
289289
case ALL:
290290
lval.id = CLUSTER_ALL
291291
}
292+
case CREATE:
293+
switch nextToken.id {
294+
case CHANGEFEED:
295+
switch secondToken.id {
296+
case FOR:
297+
switch thirdToken.id {
298+
case DATABASE:
299+
lval.id = CREATE_CHANGEFEED_FOR_DATABASE
300+
}
301+
}
302+
}
292303
case SET:
293304
switch nextToken.id {
294305
case TRACING:

0 commit comments

Comments
 (0)