Skip to content

Commit b29e5c5

Browse files
committed
crdb_internal: replace crdb_internal.jobs vtable with view
Release note: none. Epic: https://cockroachlabs.atlassian.net/browse/CRDB-48791.
1 parent cff945f commit b29e5c5

File tree

8 files changed

+183
-162
lines changed

8 files changed

+183
-162
lines changed

pkg/bench/rttanalysis/testdata/benchmark_expectations

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ exp,benchmark
6868
3,Jobs/jobs_page_type_filtered
6969
1-3,Jobs/jobs_page_type_filtered_no_matches
7070
4,Jobs/non_admin_crdb_internal.system_jobs
71-
4,Jobs/non_admin_show_jobs
71+
2,Jobs/non_admin_show_jobs
7272
12-15,Jobs/pause_job
7373
12-15,Jobs/resume_job
7474
3,Jobs/show_job

pkg/server/application_api/jobs_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,8 @@ func TestAdminAPIJobsDetails(t *testing.T) {
355355
t.Fatal(err)
356356
}
357357
sqlDB.Exec(t,
358-
`INSERT INTO system.jobs (id, status, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5)`,
359-
job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(),
358+
`INSERT INTO system.jobs (id, status, num_runs, last_run, job_type, owner) VALUES ($1, $2, $3, $4, $5, $6)`,
359+
job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(), job.username.Normalized(),
360360
)
361361
sqlDB.Exec(t,
362362
`INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`,

pkg/sql/crdb_internal.go

Lines changed: 75 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ var crdbInternal = virtualSchema{
162162
catconstants.CrdbInternalIndexSpansTableID: crdbInternalIndexSpansTable,
163163
catconstants.CrdbInternalIndexUsageStatisticsTableID: crdbInternalIndexUsageStatistics,
164164
catconstants.CrdbInternalInflightTraceSpanTableID: crdbInternalInflightTraceSpanTable,
165-
catconstants.CrdbInternalJobsTableID: crdbInternalJobsTable,
165+
catconstants.CrdbInternalJobsTableID: crdbInternalJobsView,
166166
catconstants.CrdbInternalSystemJobsTableID: crdbInternalSystemJobsTable,
167167
catconstants.CrdbInternalKVNodeStatusTableID: crdbInternalKVNodeStatusTable,
168168
catconstants.CrdbInternalKVStoreStatusTableID: crdbInternalKVStoreStatusTable,
@@ -1134,142 +1134,82 @@ func wrapPayloadUnMarshalError(err error, jobID tree.Datum) error {
11341134
" consider deleting this job from system.jobs", jobID)
11351135
}
11361136

1137-
const (
1138-
jobIDFilter = ` WHERE j.id = $1`
1139-
jobsStatusFilter = ` WHERE j.status = $1`
1140-
jobsTypeFilter = ` WHERE j.job_type = $1`
1141-
)
1142-
1143-
// TODO(tbg): prefix with kv_.
1144-
var crdbInternalJobsTable = virtualSchemaTable{
1145-
schema: `
1146-
CREATE TABLE crdb_internal.jobs (
1147-
job_id INT,
1148-
job_type STRING,
1149-
description STRING,
1150-
statement STRING,
1151-
user_name STRING,
1152-
status STRING,
1153-
running_status STRING,
1154-
created TIMESTAMPTZ,
1155-
finished TIMESTAMPTZ,
1156-
modified TIMESTAMPTZ,
1157-
fraction_completed FLOAT,
1158-
high_water_timestamp DECIMAL,
1159-
error STRING,
1160-
coordinator_id INT,
1161-
INDEX(job_id),
1162-
INDEX(status),
1163-
INDEX(job_type)
1137+
var crdbInternalJobsView = virtualSchemaView{
1138+
// TODO(dt): the left-outer joins here in theory mean that if there are more
1139+
// than one row per job in status or progress the job row would need to be
1140+
// repeated for each. While this is never the case in practice, it does mean
1141+
// the optimizer cannot elide the joins even if the progress or status is not
1142+
// actually being read from a query on the view. We could change the joins to
1143+
// be `LATERAL (... WHERE job_id = id LiMIT 1) ON true` which would promise to
1144+
// the optimizer that the join cannot change the number of rows, allowing it
1145+
// to elide it entirely if the joined columns are not used, however SHOW JOBS
1146+
// and other callers typically _are_ looking for progress and status so we
1147+
// likely will need the joins anyway, queries experts expressed some concern
1148+
// that it could be possible "lateral join prevents some other optimizations".
1149+
// Given we typically expect to need the joined columns anyway, these are left
1150+
// as left joins for now. NB: the `union`, needs to be `union all` to avoid
1151+
// materializing all of the columns even when they're not used, since a
1152+
// non-`all` union has to be prepared to compare them all to eliminate
1153+
// duplicates, even when in the vast majority of calls we expect the session
1154+
// jobs generator to be empty.
1155+
schema: `CREATE VIEW crdb_internal.jobs (
1156+
job_id,
1157+
job_type,
1158+
description,
1159+
statement,
1160+
user_name,
1161+
status,
1162+
running_status,
1163+
created,
1164+
finished,
1165+
modified,
1166+
fraction_completed,
1167+
high_water_timestamp,
1168+
error,
1169+
coordinator_id
1170+
) AS (
1171+
SELECT
1172+
j.id,
1173+
j.job_type,
1174+
coalesce(j.description, '') as description,
1175+
coalesce(j.description, '') as statement,
1176+
coalesce(j.owner, '') as user_name,
1177+
j.status as status,
1178+
s.status as running_status,
1179+
j.created::timestamptz,
1180+
j.finished,
1181+
greatest(j.created, j.finished, p.written, s.written)::timestamptz AS modified,
1182+
p.fraction as fraction_completed,
1183+
p.resolved as high_water_timestamp,
1184+
coalesce(j.error_msg, '') as error,
1185+
j.claim_instance_id as coordinator_id
1186+
FROM system.public.jobs AS j
1187+
LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
1188+
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
1189+
WHERE crdb_internal.can_view_job(j.owner)
1190+
UNION ALL
1191+
(SELECT job_id, job_type, description, description, user_name, 'pending',
1192+
NULL, now(), NULL, now(), NULL, NULL, NULL, NULL
1193+
FROM crdb_internal.session_pending_jobs()
1194+
)
11641195
)`,
1165-
comment: `decoded job metadata from crdb_internal.system_jobs (KV scan)`,
1166-
indexes: []virtualIndex{{
1167-
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
1168-
targetID := tree.MustBeDInt(unwrappedConstraint)
1169-
return makeJobsTableRows(ctx, p, addRow, jobIDFilter, targetID)
1170-
},
1171-
}, {
1172-
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
1173-
targetStatus := tree.MustBeDString(unwrappedConstraint)
1174-
return makeJobsTableRows(ctx, p, addRow, jobsStatusFilter, targetStatus)
1175-
},
1176-
}, {
1177-
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
1178-
targetType := tree.MustBeDString(unwrappedConstraint)
1179-
return makeJobsTableRows(ctx, p, addRow, jobsTypeFilter, targetType)
1180-
},
1181-
}},
1182-
populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
1183-
_, err := makeJobsTableRows(ctx, p, addRow, "")
1184-
return err
1196+
resultColumns: colinfo.ResultColumns{
1197+
{Name: "job_id", Typ: types.Int},
1198+
{Name: "job_type", Typ: types.String},
1199+
{Name: "description", Typ: types.String},
1200+
{Name: "statement", Typ: types.String},
1201+
{Name: "user_name", Typ: types.String},
1202+
{Name: "status", Typ: types.String},
1203+
{Name: "running_status", Typ: types.String},
1204+
{Name: "created", Typ: types.TimestampTZ},
1205+
{Name: "finished", Typ: types.TimestampTZ},
1206+
{Name: "modified", Typ: types.TimestampTZ},
1207+
{Name: "fraction_completed", Typ: types.Float},
1208+
{Name: "high_water_timestamp", Typ: types.Decimal},
1209+
{Name: "error", Typ: types.String},
1210+
{Name: "coordinator_id", Typ: types.Int},
11851211
},
1186-
}
1187-
1188-
func makeJobsTableRows(
1189-
ctx context.Context,
1190-
p *planner,
1191-
addRow func(...tree.Datum) error,
1192-
whereClause string,
1193-
params ...interface{},
1194-
) (emitted bool, retErr error) {
1195-
globalPrivileges, err := jobsauth.GetGlobalJobPrivileges(ctx, p)
1196-
if err != nil {
1197-
return false, err
1198-
}
1199-
1200-
query := `SELECT
1201-
j.id, j.job_type, coalesce(j.description, ''), coalesce(j.owner, ''), j.status as state,
1202-
s.status, j.created::timestamptz, j.finished, greatest(j.created, j.finished, p.written, s.written)::timestamptz AS last_modified,
1203-
p.fraction,
1204-
p.resolved,
1205-
j.error_msg,
1206-
j.claim_instance_id
1207-
FROM system.public.jobs AS j
1208-
LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
1209-
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
1210-
` + whereClause + `UNION
1211-
(SELECT job_id, job_type, description, user_name, 'pending',
1212-
NULL, now(), NULL, now(), NULL, NULL, NULL, NULL
1213-
FROM crdb_internal.session_pending_jobs())`
1214-
1215-
it, err := p.InternalSQLTxn().QueryIteratorEx(
1216-
ctx, "system-jobs-join", p.txn, sessiondata.NodeUserSessionDataOverride, query, params...)
1217-
if err != nil {
1218-
return emitted, err
1219-
}
1220-
defer func() {
1221-
if err := it.Close(); err != nil {
1222-
retErr = errors.CombineErrors(retErr, err)
1223-
}
1224-
}()
1225-
1226-
// Loop while we need to skip a row.
1227-
for {
1228-
ok, err := it.Next(ctx)
1229-
if err != nil || !ok {
1230-
return emitted, err
1231-
}
1232-
r := it.Cur()
1233-
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID :=
1234-
r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[10], r[11], r[12]
1235-
1236-
owner := username.MakeSQLUsernameFromPreNormalizedString(string(tree.MustBeDString(ownerStr)))
1237-
jobID := jobspb.JobID(tree.MustBeDInt(id))
1238-
1239-
if errorMsg == tree.DNull {
1240-
errorMsg = emptyString
1241-
}
1242-
1243-
if err := jobsauth.Authorize(
1244-
ctx, p, jobID, owner, jobsauth.ViewAccess, globalPrivileges,
1245-
); err != nil {
1246-
// Filter out jobs which the user is not allowed to see.
1247-
if IsInsufficientPrivilegeError(err) {
1248-
continue
1249-
}
1250-
return emitted, err
1251-
}
1252-
1253-
if err = addRow(
1254-
id,
1255-
typStr,
1256-
desc,
1257-
desc,
1258-
ownerStr,
1259-
state,
1260-
status,
1261-
created,
1262-
finished,
1263-
modified,
1264-
fraction,
1265-
resolved,
1266-
errorMsg,
1267-
instanceID,
1268-
); err != nil {
1269-
return emitted, err
1270-
}
1271-
emitted = true
1272-
}
1212+
comment: `decoded job metadata from various jobs tables`,
12731213
}
12741214

12751215
const crdbInternalKVProtectedTSTableQuery = `

pkg/sql/crdb_internal_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,13 +1453,13 @@ func TestInternalSystemJobsAccess(t *testing.T) {
14531453
// worker goroutine returns "query canceled error".
14541454
//
14551455
// In particular, the following setup is used:
1456-
// - issue SHOW JOBS query which internally issues a query against
1457-
// crdb_internal.system_jobs virtual table
1458-
// - that virtual table is generated by issuing "system-jobs-scan" internal
1456+
// - issue SHOW ZONE CONFIGURATIONS query which internally issues a query against
1457+
// crdb_internal.zones virtual table
1458+
// - that virtual table is generated by issuing "crdb-internal-zones-table" internal
14591459
// query
1460-
// - during that "system-jobs-scan" query we're injecting the query canceled
1460+
// - during that "crdb-internal-zones-table" query we're injecting the query canceled
14611461
// error (in other words, the error is injected during the generation of
1462-
// crdb_internal.system_jobs virtual table).
1462+
// crdb_internal.zones virtual table).
14631463
//
14641464
// The injection is achieved by adding a callback to DistSQLReceiver.Push which
14651465
// replaces the first piece of metadata it sees with the error.
@@ -1481,7 +1481,7 @@ func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
14811481
return nil
14821482
}
14831483
opName, ok := sql.GetInternalOpName(ctx)
1484-
if !ok || !(opName == "system-jobs-scan" || opName == "system-jobs-join") {
1484+
if !ok || !(opName == "crdb-internal-zones-table") {
14851485
return nil
14861486
}
14871487
numCallbacksAdded.Add(1)
@@ -1503,7 +1503,7 @@ func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
15031503
sqlDB := sqlutils.MakeSQLRunner(db)
15041504

15051505
addCallback.Store(true)
1506-
sqlDB.ExpectErr(t, err.Error(), "SHOW JOBS")
1506+
sqlDB.ExpectErr(t, err.Error(), "SHOW ZONE CONFIGURATIONS")
15071507
addCallback.Store(false)
15081508

15091509
// Sanity check that the callback was added at least once.

pkg/sql/logictest/testdata/logic_test/select

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ statement ok
831831
SELECT * FROM pg_class
832832

833833
statement ok
834-
SELECT * FROM crdb_internal.jobs
834+
SELECT * FROM crdb_internal.node_build_info
835835

836836
# Tests for 'large_full_scan_rows'.
837837
statement ok

pkg/sql/opt/exec/execbuilder/testdata/explain

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,45 @@ distribution: local
121121
vectorized: true
122122
·
123123
• sort
124-
│ order: -column20,-created
124+
│ order: -column70,-created
125125
126126
└── • render
127127
128+
└── • union all
128129
129-
└── • virtual table
130-
table: jobs@primary
130+
├── • render
131+
│ │
132+
│ └── • merge join (left outer)
133+
│ │ equality: (id) = (job_id)
134+
│ │
135+
│ ├── • merge join (right outer)
136+
│ │ │ equality: (job_id) = (id)
137+
│ │ │ right cols are key
138+
│ │ │
139+
│ │ ├── • scan
140+
│ │ │ missing stats
141+
│ │ │ table: job_progress@primary
142+
│ │ │ spans: FULL SCAN
143+
│ │ │
144+
│ │ │
145+
│ │ └── • scan
146+
│ │ missing stats
147+
│ │ table: jobs@primary
148+
│ │ spans: FULL SCAN
149+
│ │
150+
│ └── • scan
151+
│ missing stats
152+
│ table: job_status@primary
153+
│ spans: FULL SCAN
154+
155+
└── • render
156+
157+
│ estimated row count: 3
158+
159+
└── • project set
160+
│ estimated row count: 10
161+
162+
└── • emptyrow
131163

132164
statement ok
133165
CREATE INDEX a ON foo(x)

pkg/sql/opt/exec/execbuilder/testdata/explain_analyze

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ quality of service: regular
176176
│ │ └── • virtual table
177177
│ │ sql nodes: <hidden>
178178
│ │ regions: <hidden>
179-
│ │ actual row count: 366
179+
│ │ actual row count: 363
180180
│ │ execution time: 0µs
181181
│ │ table: pg_class@primary
182182
│ │

0 commit comments

Comments
 (0)