Skip to content

Commit f01d098

Browse files
craig[bot]log-headdhartunian
committed
151131: sql/delegate: show changefeed jobs option for db-level changefeeds to r=andyyang890,KeithCh a=log-head SHOW CHANGEFEED JOBS currently returns in a field all of the tables that a changefeed job is watching. With db-level changefeeds, often there could be a large number of tables watched in the database, so we want to default to not listing any of the tables the db-level changefeed watches. Adding a new option WITH WATCHED_TABLES to cause the full list of tables to be returned. In addition, a new field, database_name, is added to SHOW CHANGEFEED JOBS to show the name of the database, in the case of a db-level changefeed. It will be null for table-level changefeeds. Epic: [CRDB-1421](https://cockroachlabs.atlassian.net/browse/CRDB-1421) Fixes: #148023 Release note (sql change): A new field is added to SHOW CHANGEFEED JOBS. database_name gives the name of the database that a db-level changefeed is on. For a table-level changefeed, it will be null. SHOW CHANGEFEED JOBS field full_table_names is changed for db-level changefeeds. By default, the list of watched tables in full_table_names will be empty for a db-level changefeed, giving the total number of watched tables. A new option WITH WATCHED_TABLES is added that will make SHOW CHANGEFEED JOBS return the full list of target tables, like with non-db level changefeeds. 154206: ui: add transaction diagnostics api client and page r=kyle-a-wong a=dhartunian This change introduces a transaction diagnostics request page that lists oustanding and completed requests along with the transaction IDs or fingerprints. If the bundle has not yet been captured, we can only show the transactionId, otherwise we'll have the fingeprint available to display as well. In both cases, the ID or fingerprint will link to the corresponding SQL Activity page for that Transaction. Resolves: [CRDB-53543](https://cockroachlabs.atlassian.net/browse/CRDB-53543) Epic: [CRDB-53541](https://cockroachlabs.atlassian.net/browse/CRDB-53541) Release note (db console change): The statement diagnostics page in Advanced Debug now also displays any active transaction diagnostics requests in a separate tab Co-authored-by: Matthew Lougheed <[email protected]> Co-authored-by: David Hartunian <[email protected]>
3 parents 8504d75 + 12e6e15 + d58f436 commit f01d098

File tree

22 files changed

+1610
-167
lines changed

22 files changed

+1610
-167
lines changed

docs/generated/sql/bnf/show_jobs.bnf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ show_jobs_stmt ::=
33
| 'SHOW' 'JOBS'
44
| 'SHOW' 'JOBS' 'WITH' show_job_options_list
55
| 'SHOW' 'CHANGEFEED' 'JOBS'
6+
| 'SHOW' 'CHANGEFEED' 'JOBS' 'WITH' 'WATCHED_TABLES'
67
| 'SHOW' 'JOBS' select_stmt
78
| 'SHOW' 'JOBS' select_stmt 'WITH' show_job_options_list
89
| 'SHOW' 'JOBS' 'WHEN' 'COMPLETE' select_stmt
910
| 'SHOW' 'JOBS' for_schedules_clause
1011
| 'SHOW' 'CHANGEFEED' 'JOBS' select_stmt
12+
| 'SHOW' 'CHANGEFEED' 'JOBS' select_stmt 'WITH' 'WATCHED_TABLES'
1113
| 'SHOW' 'JOB' job_id
1214
| 'SHOW' 'JOB' job_id 'WITH' show_job_options_list
1315
| 'SHOW' 'CHANGEFEED' 'JOB' job_id
16+
| 'SHOW' 'CHANGEFEED' 'JOB' job_id 'WITH' 'WATCHED_TABLES'
1417
| 'SHOW' 'JOB' 'WHEN' 'COMPLETE' job_id

docs/generated/sql/bnf/stmt_block.bnf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,14 +934,17 @@ show_jobs_stmt ::=
934934
| 'SHOW' 'JOBS'
935935
| 'SHOW' 'JOBS' 'WITH' show_job_options_list
936936
| 'SHOW' 'CHANGEFEED' 'JOBS'
937+
| 'SHOW' 'CHANGEFEED' 'JOBS' 'WITH' 'WATCHED_TABLES'
937938
| 'SHOW' 'JOBS' select_stmt
938939
| 'SHOW' 'JOBS' select_stmt 'WITH' show_job_options_list
939940
| 'SHOW' 'JOBS' 'WHEN' 'COMPLETE' select_stmt
940941
| 'SHOW' 'JOBS' for_schedules_clause
941942
| 'SHOW' 'CHANGEFEED' 'JOBS' select_stmt
943+
| 'SHOW' 'CHANGEFEED' 'JOBS' select_stmt 'WITH' 'WATCHED_TABLES'
942944
| 'SHOW' 'JOB' a_expr
943945
| 'SHOW' 'JOB' a_expr 'WITH' show_job_options_list
944946
| 'SHOW' 'CHANGEFEED' 'JOB' a_expr
947+
| 'SHOW' 'CHANGEFEED' 'JOB' a_expr 'WITH' 'WATCHED_TABLES'
945948
| 'SHOW' 'JOB' 'WHEN' 'COMPLETE' a_expr
946949

947950
show_locality_stmt ::=
@@ -1593,6 +1596,7 @@ unreserved_keyword ::=
15931596
| 'VISIBILITY'
15941597
| 'VOLATILE'
15951598
| 'VOTERS'
1599+
| 'WATCHED_TABLES'
15961600
| 'WITHIN'
15971601
| 'WITHOUT'
15981602
| 'WRITE'
@@ -4596,6 +4600,7 @@ bare_label_keywords ::=
45964600
| 'VISIBILITY'
45974601
| 'VOLATILE'
45984602
| 'VOTERS'
4603+
| 'WATCHED_TABLES'
45994604
| 'WHEN'
46004605
| 'WORK'
46014606
| 'WRITE'

pkg/ccl/changefeedccl/show_changefeed_jobs_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,102 @@ func (d *fakeResumer) CollectProfile(context.Context, interface{}) error {
5757
return nil
5858
}
5959

60+
func TestShowChangefeedJobsDatabaseLevel(t *testing.T) {
61+
defer leaktest.AfterTest(t)()
62+
defer log.Scope(t).Close(t)
63+
64+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
65+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
66+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
67+
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
68+
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)
69+
sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'initial')`)
70+
71+
tcf := feed(t, f, `CREATE CHANGEFEED FOR d.foo, d.bar`)
72+
defer closeFeed(t, tcf)
73+
assertPayloads(t, tcf, []string{
74+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
75+
`bar: [1]->{"after": {"a": 1, "b": "initial"}}`,
76+
})
77+
waitForJobState(sqlDB, t, tcf.(cdctest.EnterpriseTestFeed).JobID(), jobs.StateRunning)
78+
79+
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`)
80+
defer closeFeed(t, dbcf)
81+
assertPayloads(t, dbcf, []string{
82+
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
83+
`bar: [1]->{"after": {"a": 1, "b": "initial"}}`,
84+
})
85+
waitForJobState(sqlDB, t, dbcf.(cdctest.EnterpriseTestFeed).JobID(), jobs.StateRunning)
86+
87+
t.Run("without watched tables", func(t *testing.T) {
88+
var numRows int
89+
sqlDB.QueryRow(t, `select count(*) from [SHOW CHANGEFEED JOBS]`).Scan(&numRows)
90+
require.Equal(t, 2, numRows)
91+
rowResults := sqlDB.Query(t, `select job_id, full_table_names, database_name from [SHOW CHANGEFEED JOBS]`)
92+
for rowResults.Next() {
93+
err := rowResults.Err()
94+
if err != nil {
95+
t.Fatal(err)
96+
}
97+
var jobID jobspb.JobID
98+
var fullTableNames []uint8
99+
var databaseName gosql.NullString
100+
err = rowResults.Scan(&jobID, &fullTableNames, &databaseName)
101+
if err != nil {
102+
t.Fatal(err)
103+
}
104+
if jobID == dbcf.(cdctest.EnterpriseTestFeed).JobID() {
105+
require.Equal(t, "{}", string(fullTableNames))
106+
require.True(t, databaseName.Valid)
107+
require.Equal(t, "d", databaseName.String)
108+
} else if jobID == tcf.(cdctest.EnterpriseTestFeed).JobID() {
109+
if string(fullTableNames) != "{d.public.foo,d.public.bar}" && string(fullTableNames) != "{d.public.bar,d.public.foo}" {
110+
t.Fatalf("Unexpected full table names: %s", string(fullTableNames))
111+
}
112+
require.False(t, databaseName.Valid)
113+
} else {
114+
t.Fatalf("Unexpected job ID: %d", jobID)
115+
}
116+
}
117+
})
118+
119+
t.Run("with watched tables", func(t *testing.T) {
120+
var numRows int
121+
sqlDB.QueryRow(t, `select count(*) from [SHOW CHANGEFEED JOBS WITH WATCHED_TABLES]`).Scan(&numRows)
122+
require.Equal(t, 2, numRows)
123+
rowResults := sqlDB.Query(t, `select job_id, full_table_names, database_name from [SHOW CHANGEFEED JOBS WITH WATCHED_TABLES]`)
124+
for rowResults.Next() {
125+
err := rowResults.Err()
126+
if err != nil {
127+
t.Fatal(err)
128+
}
129+
var jobID jobspb.JobID
130+
var fullTableNames []uint8
131+
var databaseName gosql.NullString
132+
err = rowResults.Scan(&jobID, &fullTableNames, &databaseName)
133+
if err != nil {
134+
t.Fatal(err)
135+
}
136+
if jobID == dbcf.(cdctest.EnterpriseTestFeed).JobID() || jobID == tcf.(cdctest.EnterpriseTestFeed).JobID() {
137+
if string(fullTableNames) != "{d.public.foo,d.public.bar}" && string(fullTableNames) != "{d.public.bar,d.public.foo}" {
138+
t.Fatalf("Unexpected full table names: %s", string(fullTableNames))
139+
}
140+
} else {
141+
t.Fatalf("Unexpected job ID: %d", jobID)
142+
}
143+
if jobID == dbcf.(cdctest.EnterpriseTestFeed).JobID() {
144+
require.True(t, databaseName.Valid)
145+
require.Equal(t, "d", databaseName.String)
146+
} else if jobID == tcf.(cdctest.EnterpriseTestFeed).JobID() {
147+
require.False(t, databaseName.Valid)
148+
} else {
149+
t.Fatalf("Unexpected job ID: %d", jobID)
150+
}
151+
}
152+
})
153+
}
154+
cdcTest(t, testFn, feedTestEnterpriseSinks)
155+
}
60156
func TestShowChangefeedJobsBasic(t *testing.T) {
61157
defer leaktest.AfterTest(t)()
62158
defer log.Scope(t).Close(t)

pkg/sql/delegate/show_changefeed_jobs.go

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package delegate
88
import (
99
"fmt"
1010

11+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1112
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
1213
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
1314
)
@@ -29,6 +30,56 @@ WITH payload AS (
2930
FROM
3031
crdb_internal.system_jobs
3132
WHERE job_type = 'CHANGEFEED'%s
33+
),
34+
spec_details AS (
35+
SELECT
36+
id,
37+
json_array_elements(changefeed_details->'target_specifications') ? 'type' AND (json_array_elements(changefeed_details->'target_specifications')->'type')::int = %d AS is_db, -- database level changefeed
38+
json_array_elements(changefeed_details->'target_specifications')->>'descriptor_id' AS descriptor_id,
39+
changefeed_details->'tables' AS tables
40+
FROM payload
41+
),
42+
db_targets AS (
43+
SELECT
44+
sd.id,
45+
CASE
46+
WHEN %t
47+
THEN array_agg(concat(t.database_name,'.',t.schema_name,'.',t.name))
48+
ELSE
49+
ARRAY[]::STRING[]
50+
END AS names
51+
FROM spec_details sd
52+
INNER JOIN crdb_internal.tables t ON sd.descriptor_id::int = t.parent_id
53+
WHERE sd.is_db
54+
GROUP BY sd.id
55+
),
56+
table_targets AS (
57+
SELECT
58+
sd.id,
59+
array_agg(distinct concat(t.database_name,'.',t.schema_name, '.', t.name)) AS names
60+
FROM spec_details sd
61+
CROSS JOIN LATERAL json_each(sd.tables) AS j(table_id, val)
62+
INNER JOIN crdb_internal.tables t ON t.table_id = j.table_id::INT
63+
WHERE NOT sd.is_db
64+
GROUP BY sd.id
65+
),
66+
targets AS (
67+
SELECT
68+
id,
69+
names
70+
FROM db_targets
71+
UNION ALL
72+
SELECT
73+
id,
74+
names
75+
FROM table_targets
76+
),
77+
database as (
78+
SELECT
79+
sd.id,
80+
name
81+
FROM spec_details sd
82+
INNER JOIN crdb_internal.databases d ON sd.descriptor_id::int = d.id
3283
)
3384
SELECT
3485
job_id,
@@ -37,34 +88,27 @@ SELECT
3788
status,
3889
running_status,
3990
created,
40-
created as started,
91+
created AS started,
4192
finished,
4293
modified,
4394
high_water_timestamp,
44-
hlc_to_timestamp(high_water_timestamp) as readable_high_water_timestamptz,
95+
hlc_to_timestamp(high_water_timestamp) AS readable_high_water_timestamptz,
4596
error,
4697
replace(
4798
changefeed_details->>'sink_uri',
4899
'\u0026', '&'
49100
) AS sink_uri,
50-
ARRAY (
51-
SELECT
52-
concat(
53-
database_name, '.', schema_name, '.',
54-
name
55-
)
56-
FROM
57-
crdb_internal.tables
58-
WHERE
59-
table_id = ANY (SELECT key::INT FROM json_each(changefeed_details->'tables'))
60-
) AS full_table_names,
101+
targets.names AS full_table_names,
61102
changefeed_details->'opts'->>'topics' AS topics,
62-
COALESCE(changefeed_details->'opts'->>'format','json') AS format
103+
COALESCE(changefeed_details->'opts'->>'format','json') AS format,
104+
database.name AS database_name
63105
FROM
64106
crdb_internal.jobs
65-
INNER JOIN payload ON id = job_id`
107+
INNER JOIN targets ON job_id = targets.id
108+
INNER JOIN payload ON job_id = payload.id
109+
LEFT JOIN database ON job_id = database.id
110+
`
66111
)
67-
68112
var whereClause, innerWhereClause, orderbyClause string
69113
if n.Jobs == nil {
70114
// The query intends to present:
@@ -80,7 +124,7 @@ FROM
80124
innerWhereClause = fmt.Sprintf(` AND id in (%s)`, n.Jobs.String())
81125
}
82126

83-
selectClause := fmt.Sprintf(baseSelectClause, innerWhereClause)
127+
selectClause := fmt.Sprintf(baseSelectClause, innerWhereClause, jobspb.ChangefeedTargetSpecification_DATABASE, n.IncludeWatchedTables)
84128
sqlStmt := fmt.Sprintf("%s %s %s", selectClause, whereClause, orderbyClause)
85129

86130
return d.parse(sqlStmt)

pkg/sql/parser/sql.y

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1099,7 +1099,7 @@ func (u *sqlSymUnion) changefeedFilterOption() *tree.ChangefeedFilterOption {
10991099
%token <str> VIEWCLUSTERSETTING VIRTUAL VISIBLE INVISIBLE VISIBILITY VOLATILE VOTERS
11001100
%token <str> VIRTUAL_CLUSTER_NAME VIRTUAL_CLUSTER
11011101

1102-
%token <str> WHEN WHERE WINDOW WITH WITHIN WITHOUT WORK WRITE
1102+
%token <str> WATCHED_TABLES WHEN WHERE WINDOW WITH WITHIN WITHOUT WORK WRITE
11031103

11041104
%token <str> YEAR
11051105

@@ -9498,6 +9498,10 @@ show_jobs_stmt:
94989498
{
94999499
$$.val = &tree.ShowChangefeedJobs{}
95009500
}
9501+
| SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
9502+
{
9503+
$$.val = &tree.ShowChangefeedJobs{IncludeWatchedTables: true}
9504+
}
95019505
| SHOW AUTOMATIC JOBS error // SHOW HELP: SHOW JOBS
95029506
| SHOW JOBS error // SHOW HELP: SHOW JOBS
95039507
| SHOW CHANGEFEED JOBS error // SHOW HELP: SHOW JOBS
@@ -9524,6 +9528,13 @@ show_jobs_stmt:
95249528
{
95259529
$$.val = &tree.ShowChangefeedJobs{Jobs: $4.slct()}
95269530
}
9531+
| SHOW CHANGEFEED JOBS select_stmt WITH WATCHED_TABLES
9532+
{
9533+
$$.val = &tree.ShowChangefeedJobs{
9534+
Jobs: $4.slct(),
9535+
IncludeWatchedTables: true,
9536+
}
9537+
}
95279538
| SHOW JOBS select_stmt error // SHOW HELP: SHOW JOBS
95289539
| SHOW JOB a_expr
95299540
{
@@ -9550,6 +9561,15 @@ show_jobs_stmt:
95509561
},
95519562
}
95529563
}
9564+
| SHOW CHANGEFEED JOB a_expr WITH WATCHED_TABLES
9565+
{
9566+
$$.val = &tree.ShowChangefeedJobs{
9567+
Jobs: &tree.Select{
9568+
Select: &tree.ValuesClause{Rows: []tree.Exprs{tree.Exprs{$4.expr()}}},
9569+
},
9570+
IncludeWatchedTables: true,
9571+
}
9572+
}
95539573
| SHOW JOB WHEN COMPLETE a_expr
95549574
{
95559575
$$.val = &tree.ShowJobs{
@@ -18939,6 +18959,7 @@ unreserved_keyword:
1893918959
| VISIBILITY
1894018960
| VOLATILE
1894118961
| VOTERS
18962+
| WATCHED_TABLES
1894218963
| WITHIN
1894318964
| WITHOUT
1894418965
| WRITE
@@ -19553,6 +19574,7 @@ bare_label_keywords:
1955319574
| VISIBILITY
1955419575
| VOLATILE
1955519576
| VOTERS
19577+
| WATCHED_TABLES
1955619578
| WHEN
1955719579
| WORK
1955819580
| WRITE

pkg/sql/parser/testdata/changefeed

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,4 @@ CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO '*****' -- normal
221221
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO ('*****') -- fully parenthesized
222222
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO '_' -- literals removed
223223
CREATE CHANGEFEED FOR DATABASE _ INCLUDE TABLES _, _ INTO '*****' -- identifiers removed
224-
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO 'foo' -- passwords exposed
224+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO 'foo' -- passwords exposed

pkg/sql/parser/testdata/show

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,22 @@ EXPLAIN SHOW CHANGEFEED JOBS VALUES ((1234)) -- fully parenthesized
721721
EXPLAIN SHOW CHANGEFEED JOBS VALUES (_) -- literals removed
722722
EXPLAIN SHOW CHANGEFEED JOBS VALUES (1234) -- identifiers removed
723723

724+
parse
725+
SHOW CHANGEFEED JOB 1234 WITH WATCHED_TABLES
726+
----
727+
SHOW CHANGEFEED JOBS VALUES (1234) WITH WATCHED_TABLES -- normalized!
728+
SHOW CHANGEFEED JOBS VALUES ((1234)) WITH WATCHED_TABLES -- fully parenthesized
729+
SHOW CHANGEFEED JOBS VALUES (_) WITH WATCHED_TABLES -- literals removed
730+
SHOW CHANGEFEED JOBS VALUES (1234) WITH WATCHED_TABLES -- identifiers removed
731+
732+
parse
733+
EXPLAIN SHOW CHANGEFEED JOB 1234 WITH WATCHED_TABLES
734+
----
735+
EXPLAIN SHOW CHANGEFEED JOBS VALUES (1234) WITH WATCHED_TABLES -- normalized!
736+
EXPLAIN SHOW CHANGEFEED JOBS VALUES ((1234)) WITH WATCHED_TABLES -- fully parenthesized
737+
EXPLAIN SHOW CHANGEFEED JOBS VALUES (_) WITH WATCHED_TABLES -- literals removed
738+
EXPLAIN SHOW CHANGEFEED JOBS VALUES (1234) WITH WATCHED_TABLES -- identifiers removed
739+
724740
parse
725741
SHOW CHANGEFEED JOBS
726742
----
@@ -737,6 +753,22 @@ EXPLAIN SHOW CHANGEFEED JOBS -- fully parenthesized
737753
EXPLAIN SHOW CHANGEFEED JOBS -- literals removed
738754
EXPLAIN SHOW CHANGEFEED JOBS -- identifiers removed
739755

756+
parse
757+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
758+
----
759+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
760+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- fully parenthesized
761+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- literals removed
762+
SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- identifiers removed
763+
764+
parse
765+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
766+
----
767+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES
768+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- fully parenthesized
769+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- literals removed
770+
EXPLAIN SHOW CHANGEFEED JOBS WITH WATCHED_TABLES -- identifiers removed
771+
740772
parse
741773
SHOW CLUSTER STATEMENTS
742774
----

0 commit comments

Comments
 (0)