Skip to content

Commit b2e185d

Browse files
craig[bot]dt
andcommitted
Merge #148464
148464: jobs: use builtin to synthesize pending session job rows in vtable r=dt a=dt This adds a builtin to enumerate jobs pending creation in the current session. It then adds a UNION with a call to this builtin, rather than directly accessing the session in the Go code, to the body of the crdb_internal.jobs vtable, bringing the vtable closer to being just a pure query that can be replaced by a delegate or view. Additionally, deprecated columns in the vtable, which were always null, are removed. Release note: none. Epic: none.a Co-authored-by: David Taylor <[email protected]>
2 parents 988d66f + 02a02ec commit b2e185d

File tree

13 files changed

+157
-117
lines changed

13 files changed

+157
-117
lines changed

pkg/cli/zip_table_registry.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -382,25 +382,21 @@ var zipInternalTablesPerCluster = DebugZipTableRegistry{
382382
},
383383
// `statement` column can contain customer URI params such as
384384
// AWS_ACCESS_KEY_ID.
385-
// `error`, `execution_errors`, and `execution_events` columns contain
386-
// error text that may contain sensitive data.
385+
// `error` column contains error text that may contain sensitive data.
387386
"crdb_internal.jobs": {
388387
nonSensitiveCols: NonSensitiveColumns{
389388
"job_id",
390389
"job_type",
391390
"description",
392391
"user_name",
393-
"descriptor_ids",
394392
"status",
395393
"running_status",
396394
"created",
397-
"started",
398395
"finished",
399396
"modified",
400397
"fraction_completed",
401398
"high_water_timestamp",
402399
"coordinator_id",
403-
"trace_id",
404400
},
405401
},
406402
"crdb_internal.system_jobs": {

pkg/jobs/jobspb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
deps = [
1616
"//pkg/base",
1717
"//pkg/roachpb",
18+
"//pkg/security/username",
1819
"//pkg/sql/catalog/catpb",
1920
"//pkg/sql/catalog/descpb",
2021
"//pkg/sql/protoreflect",

pkg/jobs/jobspb/jobs.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"slices"
1212

1313
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/cockroach/pkg/security/username"
1415
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
1516
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1617
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
@@ -23,6 +24,14 @@ type JobID = catpb.JobID
2324
// InvalidJobID is the zero value for JobID corresponding to no job.
2425
const InvalidJobID = catpb.InvalidJobID
2526

27+
// PendingJob represents a job that is pending creation (e.g. in a session).
28+
type PendingJob struct {
29+
JobID JobID
30+
Type Type
31+
Description string
32+
Username username.SQLUsername
33+
}
34+
2635
// ToText implements the ProtobinExecutionDetailFile interface.
2736
func (t *TraceData) ToText() []byte {
2837
rec := tracingpb.Recording(t.CollectedSpans)

pkg/server/admin.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2271,7 +2271,6 @@ SELECT
22712271
fraction_completed,
22722272
high_water_timestamp,
22732273
error,
2274-
execution_events::string,
22752274
coordinator_id
22762275
FROM crdb_internal.jobs
22772276
WHERE true`) // Simplifies filter construction below.
@@ -2368,7 +2367,6 @@ func scanRowIntoJob(scanner resultScanner, row tree.Datums, job *serverpb.JobRes
23682367
var fractionCompletedOrNil *float32
23692368
var highwaterOrNil *apd.Decimal
23702369
var runningStatusOrNil *string
2371-
var executionFailuresOrNil *string
23722370
var coordinatorOrNil *int64
23732371
if err := scanner.ScanAll(
23742372
row,
@@ -2385,7 +2383,6 @@ func scanRowIntoJob(scanner resultScanner, row tree.Datums, job *serverpb.JobRes
23852383
&fractionCompletedOrNil,
23862384
&highwaterOrNil,
23872385
&job.Error,
2388-
&executionFailuresOrNil,
23892386
&coordinatorOrNil,
23902387
); err != nil {
23912388
return errors.Wrap(err, "scan")
@@ -2438,7 +2435,7 @@ func jobHelper(
24382435
const query = `
24392436
SELECT job_id, job_type, description, statement, user_name, status,
24402437
running_status, created, finished, modified,
2441-
fraction_completed, high_water_timestamp, error, execution_events::string, coordinator_id
2438+
fraction_completed, high_water_timestamp, error, coordinator_id
24422439
FROM crdb_internal.jobs
24432440
WHERE job_id = $1`
24442441
row, cols, err := sqlServer.internalExecutor.QueryRowExWithCols(

pkg/sql/crdb_internal.go

Lines changed: 46 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,20 +1152,15 @@ CREATE TABLE crdb_internal.jobs (
11521152
description STRING,
11531153
statement STRING,
11541154
user_name STRING,
1155-
descriptor_ids INT[],
11561155
status STRING,
11571156
running_status STRING,
11581157
created TIMESTAMPTZ,
1159-
started TIMESTAMPTZ,
11601158
finished TIMESTAMPTZ,
11611159
modified TIMESTAMPTZ,
11621160
fraction_completed FLOAT,
11631161
high_water_timestamp DECIMAL,
11641162
error STRING,
11651163
coordinator_id INT,
1166-
trace_id INT,
1167-
execution_errors STRING[],
1168-
execution_events JSONB,
11691164
INDEX(job_id),
11701165
INDEX(status),
11711166
INDEX(job_type)
@@ -1223,8 +1218,11 @@ j.error_msg,
12231218
j.claim_instance_id
12241219
FROM system.public.jobs AS j
12251220
LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
1226-
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
1227-
` + whereClause
1221+
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
1222+
` + whereClause + `UNION
1223+
(SELECT job_id, job_type, description, user_name, 'pending',
1224+
NULL, now(), NULL, now(), NULL, NULL, NULL, NULL
1225+
FROM crdb_internal.session_pending_jobs())`
12281226

12291227
it, err := p.InternalSQLTxn().QueryIteratorEx(
12301228
ctx, "system-jobs-join", p.txn, sessiondata.NodeUserSessionDataOverride, query, params...)
@@ -1237,108 +1235,60 @@ LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
12371235
}
12381236
}()
12391237

1240-
sessionJobs := make([]*jobs.Record, 0, p.extendedEvalCtx.jobs.numToCreate())
1241-
uniqueJobs := make(map[*jobs.Record]struct{})
1242-
if err := p.extendedEvalCtx.jobs.forEachToCreate(func(job *jobs.Record) error {
1243-
if _, ok := uniqueJobs[job]; ok {
1244-
return nil
1245-
}
1246-
sessionJobs = append(sessionJobs, job)
1247-
uniqueJobs[job] = struct{}{}
1248-
return nil
1249-
}); err != nil {
1250-
return emitted, err
1251-
}
1252-
12531238
// Loop while we need to skip a row.
12541239
for {
12551240
ok, err := it.Next(ctx)
1256-
if err != nil {
1241+
if err != nil || !ok {
12571242
return emitted, err
12581243
}
1259-
// We will read the columns from the query on joined jobs tables into a wide
1260-
// row, and then copy the values from read rows into named variables to then
1261-
// use when emitting our output row. If we need to synthesize rows for jobs
1262-
// pending creation in the session, we'll do so in those same named vars to
1263-
// keep things organized.
1264-
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
1265-
var id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID tree.Datum
1244+
r := it.Cur()
1245+
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID :=
1246+
r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[10], r[11], r[12]
12661247

1267-
if ok {
1268-
r := it.Cur()
1269-
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID =
1270-
r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[10], r[11], r[12]
1248+
owner := username.MakeSQLUsernameFromPreNormalizedString(string(tree.MustBeDString(ownerStr)))
1249+
jobID := jobspb.JobID(tree.MustBeDInt(id))
1250+
typ, err := jobspb.TypeFromString(string(tree.MustBeDString(typStr)))
1251+
if err != nil {
1252+
return emitted, err
1253+
}
12711254

1272-
owner := username.MakeSQLUsernameFromPreNormalizedString(string(tree.MustBeDString(ownerStr)))
1273-
jobID := jobspb.JobID(tree.MustBeDInt(id))
1274-
typ, err := jobspb.TypeFromString(string(tree.MustBeDString(typStr)))
1255+
getLegacyPayloadForAuth := func(ctx context.Context) (*jobspb.Payload, error) {
1256+
if !enablePerJobDetailedAuthLookups.Get(&p.EvalContext().Settings.SV) {
1257+
return nil, errLegacyPerJobAuthDisabledSentinel
1258+
}
1259+
if p.EvalContext().Settings.Version.IsActive(ctx, clusterversion.V25_1) {
1260+
log.Warningf(ctx, "extended job access control based on job-specific details is deprecated and can make SHOW JOBS less performant; consider disabling %s",
1261+
enablePerJobDetailedAuthLookups.Name())
1262+
p.BufferClientNotice(ctx,
1263+
pgnotice.Newf("extended job access control based on job-specific details has been deprecated and can make SHOW JOBS less performant; consider disabling %s",
1264+
enablePerJobDetailedAuthLookups.Name()))
1265+
}
1266+
payload := &jobspb.Payload{}
1267+
infoStorage := jobs.InfoStorageForJob(p.InternalSQLTxn(), jobID)
1268+
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx, "getLegacyPayload-for-custom-auth")
12751269
if err != nil {
1276-
return emitted, err
1270+
return nil, err
12771271
}
1278-
1279-
getLegacyPayloadForAuth := func(ctx context.Context) (*jobspb.Payload, error) {
1280-
if !enablePerJobDetailedAuthLookups.Get(&p.EvalContext().Settings.SV) {
1281-
return nil, errLegacyPerJobAuthDisabledSentinel
1282-
}
1283-
if p.EvalContext().Settings.Version.IsActive(ctx, clusterversion.V25_1) {
1284-
log.Warningf(ctx, "extended job access control based on job-specific details is deprecated and can make SHOW JOBS less performant; consider disabling %s",
1285-
enablePerJobDetailedAuthLookups.Name())
1286-
p.BufferClientNotice(ctx,
1287-
pgnotice.Newf("extended job access control based on job-specific details has been deprecated and can make SHOW JOBS less performant; consider disabling %s",
1288-
enablePerJobDetailedAuthLookups.Name()))
1289-
}
1290-
payload := &jobspb.Payload{}
1291-
infoStorage := jobs.InfoStorageForJob(p.InternalSQLTxn(), jobID)
1292-
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx, "getLegacyPayload-for-custom-auth")
1293-
if err != nil {
1294-
return nil, err
1295-
}
1296-
if !exists {
1297-
return nil, errors.New("job payload not found in system.job_info")
1298-
}
1299-
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
1300-
return nil, err
1301-
}
1302-
return payload, nil
1272+
if !exists {
1273+
return nil, errors.New("job payload not found in system.job_info")
13031274
}
1304-
if errorMsg == tree.DNull {
1305-
errorMsg = emptyString
1275+
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
1276+
return nil, err
13061277
}
1278+
return payload, nil
1279+
}
1280+
if errorMsg == tree.DNull {
1281+
errorMsg = emptyString
1282+
}
13071283

1308-
if err := jobsauth.AuthorizeAllowLegacyAuth(
1309-
ctx, p, jobID, getLegacyPayloadForAuth, owner, typ, jobsauth.ViewAccess, globalPrivileges,
1310-
); err != nil {
1311-
// Filter out jobs which the user is not allowed to see.
1312-
if IsInsufficientPrivilegeError(err) {
1313-
continue
1314-
}
1315-
return emitted, err
1316-
}
1317-
} else if !ok {
1318-
if len(sessionJobs) == 0 {
1319-
return emitted, nil
1284+
if err := jobsauth.AuthorizeAllowLegacyAuth(
1285+
ctx, p, jobID, getLegacyPayloadForAuth, owner, typ, jobsauth.ViewAccess, globalPrivileges,
1286+
); err != nil {
1287+
// Filter out jobs which the user is not allowed to see.
1288+
if IsInsufficientPrivilegeError(err) {
1289+
continue
13201290
}
1321-
job := sessionJobs[len(sessionJobs)-1]
1322-
sessionJobs = sessionJobs[:len(sessionJobs)-1]
1323-
payloadType, err := jobspb.DetailsType(jobspb.WrapPayloadDetails(job.Details))
1324-
if err != nil {
1325-
return emitted, err
1326-
}
1327-
// synthesize the fields we'd read from the jobs table if this job were in it.
1328-
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID =
1329-
tree.NewDInt(tree.DInt(job.JobID)),
1330-
tree.NewDString(payloadType.String()),
1331-
tree.NewDString(job.Description),
1332-
tree.NewDString(job.Username.Normalized()),
1333-
tree.NewDString(string(jobs.StatePending)),
1334-
tree.DNull,
1335-
tree.MustMakeDTimestampTZ(p.txn.ReadTimestamp().GoTime(), time.Microsecond),
1336-
tree.DNull,
1337-
tree.MustMakeDTimestampTZ(p.txn.ReadTimestamp().GoTime(), time.Microsecond),
1338-
tree.NewDFloat(tree.DFloat(0)),
1339-
tree.DZeroDecimal,
1340-
tree.DNull,
1341-
tree.NewDInt(tree.DInt(p.extendedEvalCtx.ExecCfg.JobRegistry.ID()))
1291+
return emitted, err
13421292
}
13431293

13441294
if err = addRow(
@@ -1347,20 +1297,15 @@ LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
13471297
desc,
13481298
desc,
13491299
ownerStr,
1350-
tree.DNull, // deperecated "descriptor_ids"
13511300
state,
13521301
status,
13531302
created,
1354-
created, // deprecated "started" field.
13551303
finished,
13561304
modified,
13571305
fraction,
13581306
resolved,
13591307
errorMsg,
13601308
instanceID,
1361-
tree.DNull, // deprecated "trace_id" field.
1362-
tree.DNull, // deprecated "executionErrors" field.
1363-
tree.DNull, // deprecated "executionEvents" field.
13641309
); err != nil {
13651310
return emitted, err
13661311
}

pkg/sql/delegate/show_changefeed_jobs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ SELECT
3737
status,
3838
running_status,
3939
created,
40-
started,
40+
created as started,
4141
finished,
4242
modified,
4343
high_water_timestamp,

pkg/sql/delegate/show_jobs.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,10 @@ func constructSelectQuery(n *tree.ShowJobs) string {
2828
baseQuery.WriteString(`description, statement, `)
2929
}
3030
baseQuery.WriteString(`user_name, status, running_status, `)
31-
baseQuery.WriteString(`date_trunc('second', created) as created, date_trunc('second', started) as started, `)
31+
baseQuery.WriteString(`date_trunc('second', created) as created, date_trunc('second', created) as started, `)
3232
baseQuery.WriteString(`date_trunc('second', finished) as finished, date_trunc('second', modified) as modified, `)
3333
baseQuery.WriteString(`fraction_completed, error, coordinator_id`)
3434

35-
if n.Jobs != nil {
36-
baseQuery.WriteString(`, trace_id, execution_errors`)
37-
}
38-
3935
// Check if there are any SHOW JOBS options that we need to add columns for.
4036
if n.Options != nil {
4137
if n.Options.ExecutionDetails {

pkg/sql/faketreeeval/evalctx.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,13 @@ func (ep *DummySessionAccessor) HasViewActivityOrViewActivityRedactedRole(
647647
return false, false, errors.WithStack(errEvalSessionVar)
648648
}
649649

650+
func (ep *DummySessionAccessor) ForEachSessionPendingJob(
651+
_ func(job jobspb.PendingJob) error,
652+
) error {
653+
// This is a no-op in the dummy implementation.
654+
return nil
655+
}
656+
650657
// DummyClientNoticeSender implements the eval.ClientNoticeSender interface.
651658
type DummyClientNoticeSender struct{}
652659

pkg/sql/jobs_collection.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,21 @@ func (j *txnJobsCollection) forEachToCreate(fn func(jobRecord *jobs.Record) erro
8282
}
8383
return nil
8484
}
85+
86+
func (p *planner) ForEachSessionPendingJob(fn func(job jobspb.PendingJob) error) error {
87+
if p.extendedEvalCtx.jobs == nil {
88+
return nil
89+
}
90+
return p.extendedEvalCtx.jobs.forEachToCreate(func(r *jobs.Record) error {
91+
payloadType, err := jobspb.DetailsType(jobspb.WrapPayloadDetails(r.Details))
92+
if err != nil {
93+
return err
94+
}
95+
return fn(jobspb.PendingJob{
96+
JobID: r.JobID,
97+
Description: r.Description,
98+
Username: r.Username,
99+
Type: payloadType,
100+
})
101+
})
102+
}

pkg/sql/opt/exec/execbuilder/testdata/explain

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ distribution: local
121121
vectorized: true
122122
·
123123
• sort
124-
│ order: -column26,-started
124+
│ order: -column20,-created
125125
126126
└── • render
127127

0 commit comments

Comments
 (0)