Skip to content

Commit 12e6e15

Browse files
committed
sql/delegate: show changefeed jobs option for db-level changefeeds to
show all watched tables SHOW CHANGEFEED JOBS currently returns in a field all of the tables that a changefeed job is watching. With db-level changefeeds, often there could be a large number of tables watched in the database, so we want to default to not listing any of the tables the db-level changefeed watches. Adding a new option WITH WATCHED_TABLES to cause the full list of tables to be returned. In addition, a new field, database_name, is added to SHOW CHANGEFEED JOBS to show the name of the database, in the case of a db-level changefeed. It will be null for table-level changefeeds. Epic: CRDB-1421 Fixes: #148023 Release note (sql change): A new field is added to SHOW CHANGEFEED JOBS. database_name gives the name of the database that a db-level changefeed is on. For a table-level changefeed, it will be null. SHOW CHANGEFEED JOBS field full_table_names is changed for db-level changefeeds. By default, the list of watched tables in full_table_names will be empty for a db-level changefeed, giving the total number of watched tables. A new option WITH WATCHED_TABLES is added that will make SHOW CHANGEFEED JOBS return the full list of target tables, like with non-db level changefeeds.
1 parent 4c6b4ce commit 12e6e15

File tree

8 files changed

+227
-19
lines changed

8 files changed

+227
-19
lines changed

docs/generated/sql/bnf/show_jobs.bnf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ show_jobs_stmt ::=
33
| 'SHOW' 'JOBS'
44
| 'SHOW' 'JOBS' 'WITH' show_job_options_list
55
| 'SHOW' 'CHANGEFEED' 'JOBS'
6+
| 'SHOW' 'CHANGEFEED' 'JOBS' 'WITH' 'WATCHED_TABLES'
67
| 'SHOW' 'JOBS' select_stmt
78
| 'SHOW' 'JOBS' select_stmt 'WITH' show_job_options_list
89
| 'SHOW' 'JOBS' 'WHEN' 'COMPLETE' select_stmt
910
| 'SHOW' 'JOBS' for_schedules_clause
1011
| 'SHOW' 'CHANGEFEED' 'JOBS' select_stmt
12+
| 'SHOW' 'CHANGEFEED' 'JOBS' select_stmt 'WITH' 'WATCHED_TABLES'
1113
| 'SHOW' 'JOB' job_id
1214
| 'SHOW' 'JOB' job_id 'WITH' show_job_options_list
1315
| 'SHOW' 'CHANGEFEED' 'JOB' job_id
16+
| 'SHOW' 'CHANGEFEED' 'JOB' job_id 'WITH' 'WATCHED_TABLES'
1417
| 'SHOW' 'JOB' 'WHEN' 'COMPLETE' job_id

docs/generated/sql/bnf/stmt_block.bnf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,14 +934,17 @@ show_jobs_stmt ::=
934934
| 'SHOW' 'JOBS'
935935
| 'SHOW' 'JOBS' 'WITH' show_job_options_list
936936
| 'SHOW' 'CHANGEFEED' 'JOBS'
937+
| 'SHOW' 'CHANGEFEED' 'JOBS' 'WITH' 'WATCHED_TABLES'
937938
| 'SHOW' 'JOBS' select_stmt
938939
| 'SHOW' 'JOBS' select_stmt 'WITH' show_job_options_list
939940
| 'SHOW' 'JOBS' 'WHEN' 'COMPLETE' select_stmt
940941
| 'SHOW' 'JOBS' for_schedules_clause
941942
| 'SHOW' 'CHANGEFEED' 'JOBS' select_stmt
943+
| 'SHOW' 'CHANGEFEED' 'JOBS' select_stmt 'WITH' 'WATCHED_TABLES'
942944
| 'SHOW' 'JOB' a_expr
943945
| 'SHOW' 'JOB' a_expr 'WITH' show_job_options_list
944946
| 'SHOW' 'CHANGEFEED' 'JOB' a_expr
947+
| 'SHOW' 'CHANGEFEED' 'JOB' a_expr 'WITH' 'WATCHED_TABLES'
945948
| 'SHOW' 'JOB' 'WHEN' 'COMPLETE' a_expr
946949

947950
show_locality_stmt ::=
@@ -1593,6 +1596,7 @@ unreserved_keyword ::=
15931596
| 'VISIBILITY'
15941597
| 'VOLATILE'
15951598
| 'VOTERS'
1599+
| 'WATCHED_TABLES'
15961600
| 'WITHIN'
15971601
| 'WITHOUT'
15981602
| 'WRITE'
@@ -4596,6 +4600,7 @@ bare_label_keywords ::=
45964600
| 'VISIBILITY'
45974601
| 'VOLATILE'
45984602
| 'VOTERS'
4603+
| 'WATCHED_TABLES'
45994604
| 'WHEN'
46004605
| 'WORK'
46014606
| 'WRITE'

pkg/ccl/changefeedccl/show_changefeed_jobs_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,102 @@ func (d *fakeResumer) CollectProfile(context.Context, interface{}) error {
5757
return nil
5858
}
5959

60+
func TestShowChangefeedJobsDatabaseLevel(t *testing.T) {
61+
defer leaktest.AfterTest(t)()
62+
defer log.Scope(t).Close(t)
63+
64+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
65+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
66+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
67+
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
68+
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)
69+
sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'initial')`)
70+
71+
tcf := feed(t, f, `CREATE CHANGEFEED FOR d.foo, d.bar`)
72+
defer closeFeed(t, tcf)
73+
assertPayloads(t, tcf, []string{
74+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
75+
`bar: [1]->{"after": {"a": 1, "b": "initial"}}`,
76+
})
77+
waitForJobState(sqlDB, t, tcf.(cdctest.EnterpriseTestFeed).JobID(), jobs.StateRunning)
78+
79+
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`)
80+
defer closeFeed(t, dbcf)
81+
assertPayloads(t, dbcf, []string{
82+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
83+
`bar: [1]->{"after": {"a": 1, "b": "initial"}}`,
84+
})
85+
waitForJobState(sqlDB, t, dbcf.(cdctest.EnterpriseTestFeed).JobID(), jobs.StateRunning)
86+
87+
t.Run("without watched tables", func(t *testing.T) {
88+
var numRows int
89+
sqlDB.QueryRow(t, `select count(*) from [SHOW CHANGEFEED JOBS]`).Scan(&numRows)
90+
require.Equal(t, 2, numRows)
91+
rowResults := sqlDB.Query(t, `select job_id, full_table_names, database_name from [SHOW CHANGEFEED JOBS]`)
92+
for rowResults.Next() {
93+
err := rowResults.Err()
94+
if err != nil {
95+
t.Fatal(err)
96+
}
97+
var jobID jobspb.JobID
98+
var fullTableNames []uint8
99+
var databaseName gosql.NullString
100+
err = rowResults.Scan(&jobID, &fullTableNames, &databaseName)
101+
if err != nil {
102+
t.Fatal(err)
103+
}
104+
if jobID == dbcf.(cdctest.EnterpriseTestFeed).JobID() {
105+
require.Equal(t, "{}", string(fullTableNames))
106+
require.True(t, databaseName.Valid)
107+
require.Equal(t, "d", databaseName.String)
108+
} else if jobID == tcf.(cdctest.EnterpriseTestFeed).JobID() {
109+
if string(fullTableNames) != "{d.public.foo,d.public.bar}" && string(fullTableNames) != "{d.public.bar,d.public.foo}" {
110+
t.Fatalf("Unexpected full table names: %s", string(fullTableNames))
111+
}
112+
require.False(t, databaseName.Valid)
113+
} else {
114+
t.Fatalf("Unexpected job ID: %d", jobID)
115+
}
116+
}
117+
})
118+
119+
t.Run("with watched tables", func(t *testing.T) {
120+
var numRows int
121+
sqlDB.QueryRow(t, `select count(*) from [SHOW CHANGEFEED JOBS WITH WATCHED_TABLES]`).Scan(&numRows)
122+
require.Equal(t, 2, numRows)
123+
rowResults := sqlDB.Query(t, `select job_id, full_table_names, database_name from [SHOW CHANGEFEED JOBS WITH WATCHED_TABLES]`)
124+
for rowResults.Next() {
125+
err := rowResults.Err()
126+
if err != nil {
127+
t.Fatal(err)
128+
}
129+
var jobID jobspb.JobID
130+
var fullTableNames []uint8
131+
var databaseName gosql.NullString
132+
err = rowResults.Scan(&jobID, &fullTableNames, &databaseName)
133+
if err != nil {
134+
t.Fatal(err)
135+
}
136+
if jobID == dbcf.(cdctest.EnterpriseTestFeed).JobID() || jobID == tcf.(cdctest.EnterpriseTestFeed).JobID() {
137+
if string(fullTableNames) != "{d.public.foo,d.public.bar}" && string(fullTableNames) != "{d.public.bar,d.public.foo}" {
138+
t.Fatalf("Unexpected full table names: %s", string(fullTableNames))
139+
}
140+
} else {
141+
t.Fatalf("Unexpected job ID: %d", jobID)
142+
}
143+
if jobID == dbcf.(cdctest.EnterpriseTestFeed).JobID() {
144+
require.True(t, databaseName.Valid)
145+
require.Equal(t, "d", databaseName.String)
146+
} else if jobID == tcf.(cdctest.EnterpriseTestFeed).JobID() {
147+
require.False(t, databaseName.Valid)
148+
} else {
149+
t.Fatalf("Unexpected job ID: %d", jobID)
150+
}
151+
}
152+
})
153+
}
154+
cdcTest(t, testFn, feedTestEnterpriseSinks)
155+
}
60156
func TestShowChangefeedJobsBasic(t *testing.T) {
61157
defer leaktest.AfterTest(t)()
62158
defer log.Scope(t).Close(t)

pkg/sql/delegate/show_changefeed_jobs.go

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package delegate
88
import (
99
"fmt"
1010

11+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1112
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
1213
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
1314
)
@@ -29,6 +30,56 @@ WITH payload AS (
2930
FROM
3031
crdb_internal.system_jobs
3132
WHERE job_type = 'CHANGEFEED'%s
33+
),
34+
spec_details AS (
35+
SELECT
36+
id,
37+
json_array_elements(changefeed_details->'target_specifications') ? 'type' AND (json_array_elements(changefeed_details->'target_specifications')->'type')::int = %d AS is_db, -- database level changefeed
38+
json_array_elements(changefeed_details->'target_specifications')->>'descriptor_id' AS descriptor_id,
39+
changefeed_details->'tables' AS tables
40+
FROM payload
41+
),
42+
db_targets AS (
43+
SELECT
44+
sd.id,
45+
CASE
46+
WHEN %t
47+
THEN array_agg(concat(t.database_name,'.',t.schema_name,'.',t.name))
48+
ELSE
49+
ARRAY[]::STRING[]
50+
END AS names
51+
FROM spec_details sd
52+
INNER JOIN crdb_internal.tables t ON sd.descriptor_id::int = t.parent_id
53+
WHERE sd.is_db
54+
GROUP BY sd.id
55+
),
56+
table_targets AS (
57+
SELECT
58+
sd.id,
59+
array_agg(distinct concat(t.database_name,'.',t.schema_name, '.', t.name)) AS names
60+
FROM spec_details sd
61+
CROSS JOIN LATERAL json_each(sd.tables) AS j(table_id, val)
62+
INNER JOIN crdb_internal.tables t ON t.table_id = j.table_id::INT
63+
WHERE NOT sd.is_db
64+
GROUP BY sd.id
65+
),
66+
targets AS (
67+
SELECT
68+
id,
69+
names
70+
FROM db_targets
71+
UNION ALL
72+
SELECT
73+
id,
74+
names
75+
FROM table_targets
76+
),
77+
database as (
78+
SELECT
79+
sd.id,
80+
name
81+
FROM spec_details sd
82+
INNER JOIN crdb_internal.databases d ON sd.descriptor_id::int = d.id
3283
)
3384
SELECT
3485
job_id,
@@ -37,34 +88,27 @@ SELECT
3788
status,
3889
running_status,
3990
created,
40-
created as started,
91+
created AS started,
4192
finished,
4293
modified,
4394
high_water_timestamp,
44-
hlc_to_timestamp(high_water_timestamp) as readable_high_water_timestamptz,
95+
hlc_to_timestamp(high_water_timestamp) AS readable_high_water_timestamptz,
4596
error,
4697
replace(
4798
changefeed_details->>'sink_uri',
4899
'\u0026', '&'
49100
) AS sink_uri,
50-
ARRAY (
51-
SELECT
52-
concat(
53-
database_name, '.', schema_name, '.',
54-
name
55-
)
56-
FROM
57-
crdb_internal.tables
58-
WHERE
59-
table_id = ANY (SELECT key::INT FROM json_each(changefeed_details->'tables'))
60-
) AS full_table_names,
101+
targets.names AS full_table_names,
61102
changefeed_details->'opts'->>'topics' AS topics,
62-
COALESCE(changefeed_details->'opts'->>'format','json') AS format
103+
COALESCE(changefeed_details->'opts'->>'format','json') AS format,
104+
database.name AS database_name
63105
FROM
64106
crdb_internal.jobs
65-
INNER JOIN payload ON id = job_id`
107+
INNER JOIN targets ON job_id = targets.id
108+
INNER JOIN payload ON job_id = payload.id
109+
LEFT JOIN database ON job_id = database.id
110+
`
66111
)
67-
68112
var whereClause, innerWhereClause, orderbyClause string
69113
if n.Jobs == nil {
70114
// The query intends to present:
@@ -80,7 +124,7 @@ FROM
80124
innerWhereClause = fmt.Sprintf(` AND id in (%s)`, n.Jobs.String())
81125
}
82126

83-
selectClause := fmt.Sprintf(baseSelectClause, innerWhereClause)
127+
selectClause := fmt.Sprintf(baseSelectClause, innerWhereClause, jobspb.ChangefeedTargetSpecification_DATABASE, n.IncludeWatchedTables)
84128
sqlStmt := fmt.Sprintf("%s %s %s", selectClause, whereClause, orderbyClause)
85129

86130
return d.parse(sqlStmt)

pkg/sql/parser/sql.y

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1099,7 +1099,7 @@ func (u *sqlSymUnion) changefeedFilterOption() *tree.ChangefeedFilterOption {
10991099
%token <str> VIEWCLUSTERSETTING VIRTUAL VISIBLE INVISIBLE VISIBILITY VOLATILE VOTERS
11001100
%token <str> VIRTUAL_CLUSTER_NAME VIRTUAL_CLUSTER
11011101

1102-
%token <str> WHEN WHERE WINDOW WITH WITHIN WITHOUT WORK WRITE
1102+
%token <str> WATCHED_TABLES WHEN WHERE WINDOW WITH WITHIN WITHOUT WORK WRITE
11031103

11041104
%token <str> YEAR
11051105

@@ -9492,6 +9492,10 @@ show_jobs_stmt:
94929492
{
94939493
$$.val = &tree.ShowChangefeedJobs{}
94949494
}
9495+
| SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
9496+
{
9497+
$$.val = &tree.ShowChangefeedJobs{IncludeWatchedTables: true}
9498+
}
94959499
| SHOW AUTOMATIC JOBS error // SHOW HELP: SHOW JOBS
94969500
| SHOW JOBS error // SHOW HELP: SHOW JOBS
94979501
| SHOW CHANGEFEED JOBS error // SHOW HELP: SHOW JOBS
@@ -9518,6 +9522,13 @@ show_jobs_stmt:
95189522
{
95199523
$$.val = &tree.ShowChangefeedJobs{Jobs: $4.slct()}
95209524
}
9525+
| SHOW CHANGEFEED JOBS select_stmt WITH WATCHED_TABLES
9526+
{
9527+
$$.val = &tree.ShowChangefeedJobs{
9528+
Jobs: $4.slct(),
9529+
IncludeWatchedTables: true,
9530+
}
9531+
}
95219532
| SHOW JOBS select_stmt error // SHOW HELP: SHOW JOBS
95229533
| SHOW JOB a_expr
95239534
{
@@ -9544,6 +9555,15 @@ show_jobs_stmt:
95449555
},
95459556
}
95469557
}
9558+
| SHOW CHANGEFEED JOB a_expr WITH WATCHED_TABLES
9559+
{
9560+
$$.val = &tree.ShowChangefeedJobs{
9561+
Jobs: &tree.Select{
9562+
Select: &tree.ValuesClause{Rows: []tree.Exprs{tree.Exprs{$4.expr()}}},
9563+
},
9564+
IncludeWatchedTables: true,
9565+
}
9566+
}
95479567
| SHOW JOB WHEN COMPLETE a_expr
95489568
{
95499569
$$.val = &tree.ShowJobs{
@@ -18933,6 +18953,7 @@ unreserved_keyword:
1893318953
| VISIBILITY
1893418954
| VOLATILE
1893518955
| VOTERS
18956+
| WATCHED_TABLES
1893618957
| WITHIN
1893718958
| WITHOUT
1893818959
| WRITE
@@ -19547,6 +19568,7 @@ bare_label_keywords:
1954719568
| VISIBILITY
1954819569
| VOLATILE
1954919570
| VOTERS
19571+
| WATCHED_TABLES
1955019572
| WHEN
1955119573
| WORK
1955219574
| WRITE

pkg/sql/parser/testdata/changefeed

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,4 @@ CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO '*****' -- normal
221221
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO ('*****') -- fully parenthesized
222222
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO '_' -- literals removed
223223
CREATE CHANGEFEED FOR DATABASE _ INCLUDE TABLES _, _ INTO '*****' -- identifiers removed
224-
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO 'foo' -- passwords exposed
224+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO 'foo' -- passwords exposed

pkg/sql/parser/testdata/show

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,22 @@ EXPLAIN SHOW CHANGEFEED JOBS VALUES ((1234)) -- fully parenthesized
721721
EXPLAIN SHOW CHANGEFEED JOBS VALUES (_) -- literals removed
722722
EXPLAIN SHOW CHANGEFEED JOBS VALUES (1234) -- identifiers removed
723723

724+
parse
725+
SHOW CHANGEFEED JOB 1234 WITH WATCHED_TABLES
726+
----
727+
SHOW CHANGEFEED JOBS VALUES (1234) WITH WATCHED_TABLES -- normalized!
728+
SHOW CHANGEFEED JOBS VALUES ((1234)) WITH WATCHED_TABLES -- fully parenthesized
729+
SHOW CHANGEFEED JOBS VALUES (_) WITH WATCHED_TABLES -- literals removed
730+
SHOW CHANGEFEED JOBS VALUES (1234) WITH WATCHED_TABLES -- identifiers removed
731+
732+
parse
733+
EXPLAIN SHOW CHANGEFEED JOB 1234 WITH WATCHED_TABLES
734+
----
735+
EXPLAIN SHOW CHANGEFEED JOBS VALUES (1234) WITH WATCHED_TABLES -- normalized!
736+
EXPLAIN SHOW CHANGEFEED JOBS VALUES ((1234)) WITH WATCHED_TABLES -- fully parenthesized
737+
EXPLAIN SHOW CHANGEFEED JOBS VALUES (_) WITH WATCHED_TABLES -- literals removed
738+
EXPLAIN SHOW CHANGEFEED JOBS VALUES (1234) WITH WATCHED_TABLES -- identifiers removed
739+
724740
parse
725741
SHOW CHANGEFEED JOBS
726742
----
@@ -737,6 +753,22 @@ EXPLAIN SHOW CHANGEFEED JOBS -- fully parenthesized
737753
EXPLAIN SHOW CHANGEFEED JOBS -- literals removed
738754
EXPLAIN SHOW CHANGEFEED JOBS -- identifiers removed
739755

756+
parse
757+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
758+
----
759+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
760+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- fully parenthesized
761+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- literals removed
762+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- identifiers removed
763+
764+
parse
765+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
766+
----
767+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
768+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- fully parenthesized
769+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- literals removed
770+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- identifiers removed
771+
740772
parse
741773
SHOW CLUSTER STATEMENTS
742774
----

0 commit comments

Comments
 (0)