Skip to content

Commit 02a02ec

Browse files
committed
sql: use session_pending_jobs builtin in jobs vtable
This brings the crdb_internal.jobs vtable closer to being a pure query that could be replaced by a delegate or view, with only the auth check logic now still requiring a Go impl. Release note: none. Epic: none.
1 parent ace866f commit 02a02ec

File tree

1 file changed

+46
-91
lines changed

1 file changed

+46
-91
lines changed

pkg/sql/crdb_internal.go

Lines changed: 46 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,8 +1218,11 @@ j.error_msg,
12181218
j.claim_instance_id
12191219
FROM system.public.jobs AS j
12201220
LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
1221-
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
1222-
` + whereClause
1221+
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
1222+
` + whereClause + `UNION
1223+
(SELECT job_id, job_type, description, user_name, 'pending',
1224+
NULL, now(), NULL, now(), NULL, NULL, NULL, NULL
1225+
FROM crdb_internal.session_pending_jobs())`
12231226

12241227
it, err := p.InternalSQLTxn().QueryIteratorEx(
12251228
ctx, "system-jobs-join", p.txn, sessiondata.NodeUserSessionDataOverride, query, params...)
@@ -1232,108 +1235,60 @@ LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
12321235
}
12331236
}()
12341237

1235-
sessionJobs := make([]*jobs.Record, 0, p.extendedEvalCtx.jobs.numToCreate())
1236-
uniqueJobs := make(map[*jobs.Record]struct{})
1237-
if err := p.extendedEvalCtx.jobs.forEachToCreate(func(job *jobs.Record) error {
1238-
if _, ok := uniqueJobs[job]; ok {
1239-
return nil
1240-
}
1241-
sessionJobs = append(sessionJobs, job)
1242-
uniqueJobs[job] = struct{}{}
1243-
return nil
1244-
}); err != nil {
1245-
return emitted, err
1246-
}
1247-
12481238
// Loop while we need to skip a row.
12491239
for {
12501240
ok, err := it.Next(ctx)
1251-
if err != nil {
1241+
if err != nil || !ok {
12521242
return emitted, err
12531243
}
1254-
// We will read the columns from the query on joined jobs tables into a wide
1255-
// row, and then copy the values from read rows into named variables to then
1256-
// use when emitting our output row. If we need to synthesize rows for jobs
1257-
// pending creation in the session, we'll do so in those same named vars to
1258-
// keep things organized.
1259-
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
1260-
var id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID tree.Datum
1244+
r := it.Cur()
1245+
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID :=
1246+
r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[10], r[11], r[12]
12611247

1262-
if ok {
1263-
r := it.Cur()
1264-
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID =
1265-
r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[10], r[11], r[12]
1248+
owner := username.MakeSQLUsernameFromPreNormalizedString(string(tree.MustBeDString(ownerStr)))
1249+
jobID := jobspb.JobID(tree.MustBeDInt(id))
1250+
typ, err := jobspb.TypeFromString(string(tree.MustBeDString(typStr)))
1251+
if err != nil {
1252+
return emitted, err
1253+
}
12661254

1267-
owner := username.MakeSQLUsernameFromPreNormalizedString(string(tree.MustBeDString(ownerStr)))
1268-
jobID := jobspb.JobID(tree.MustBeDInt(id))
1269-
typ, err := jobspb.TypeFromString(string(tree.MustBeDString(typStr)))
1255+
getLegacyPayloadForAuth := func(ctx context.Context) (*jobspb.Payload, error) {
1256+
if !enablePerJobDetailedAuthLookups.Get(&p.EvalContext().Settings.SV) {
1257+
return nil, errLegacyPerJobAuthDisabledSentinel
1258+
}
1259+
if p.EvalContext().Settings.Version.IsActive(ctx, clusterversion.V25_1) {
1260+
log.Warningf(ctx, "extended job access control based on job-specific details is deprecated and can make SHOW JOBS less performant; consider disabling %s",
1261+
enablePerJobDetailedAuthLookups.Name())
1262+
p.BufferClientNotice(ctx,
1263+
pgnotice.Newf("extended job access control based on job-specific details has been deprecated and can make SHOW JOBS less performant; consider disabling %s",
1264+
enablePerJobDetailedAuthLookups.Name()))
1265+
}
1266+
payload := &jobspb.Payload{}
1267+
infoStorage := jobs.InfoStorageForJob(p.InternalSQLTxn(), jobID)
1268+
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx, "getLegacyPayload-for-custom-auth")
12701269
if err != nil {
1271-
return emitted, err
1270+
return nil, err
12721271
}
1273-
1274-
getLegacyPayloadForAuth := func(ctx context.Context) (*jobspb.Payload, error) {
1275-
if !enablePerJobDetailedAuthLookups.Get(&p.EvalContext().Settings.SV) {
1276-
return nil, errLegacyPerJobAuthDisabledSentinel
1277-
}
1278-
if p.EvalContext().Settings.Version.IsActive(ctx, clusterversion.V25_1) {
1279-
log.Warningf(ctx, "extended job access control based on job-specific details is deprecated and can make SHOW JOBS less performant; consider disabling %s",
1280-
enablePerJobDetailedAuthLookups.Name())
1281-
p.BufferClientNotice(ctx,
1282-
pgnotice.Newf("extended job access control based on job-specific details has been deprecated and can make SHOW JOBS less performant; consider disabling %s",
1283-
enablePerJobDetailedAuthLookups.Name()))
1284-
}
1285-
payload := &jobspb.Payload{}
1286-
infoStorage := jobs.InfoStorageForJob(p.InternalSQLTxn(), jobID)
1287-
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx, "getLegacyPayload-for-custom-auth")
1288-
if err != nil {
1289-
return nil, err
1290-
}
1291-
if !exists {
1292-
return nil, errors.New("job payload not found in system.job_info")
1293-
}
1294-
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
1295-
return nil, err
1296-
}
1297-
return payload, nil
1272+
if !exists {
1273+
return nil, errors.New("job payload not found in system.job_info")
12981274
}
1299-
if errorMsg == tree.DNull {
1300-
errorMsg = emptyString
1275+
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
1276+
return nil, err
13011277
}
1278+
return payload, nil
1279+
}
1280+
if errorMsg == tree.DNull {
1281+
errorMsg = emptyString
1282+
}
13021283

1303-
if err := jobsauth.AuthorizeAllowLegacyAuth(
1304-
ctx, p, jobID, getLegacyPayloadForAuth, owner, typ, jobsauth.ViewAccess, globalPrivileges,
1305-
); err != nil {
1306-
// Filter out jobs which the user is not allowed to see.
1307-
if IsInsufficientPrivilegeError(err) {
1308-
continue
1309-
}
1310-
return emitted, err
1311-
}
1312-
} else if !ok {
1313-
if len(sessionJobs) == 0 {
1314-
return emitted, nil
1284+
if err := jobsauth.AuthorizeAllowLegacyAuth(
1285+
ctx, p, jobID, getLegacyPayloadForAuth, owner, typ, jobsauth.ViewAccess, globalPrivileges,
1286+
); err != nil {
1287+
// Filter out jobs which the user is not allowed to see.
1288+
if IsInsufficientPrivilegeError(err) {
1289+
continue
13151290
}
1316-
job := sessionJobs[len(sessionJobs)-1]
1317-
sessionJobs = sessionJobs[:len(sessionJobs)-1]
1318-
payloadType, err := jobspb.DetailsType(jobspb.WrapPayloadDetails(job.Details))
1319-
if err != nil {
1320-
return emitted, err
1321-
}
1322-
// synthesize the fields we'd read from the jobs table if this job were in it.
1323-
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID =
1324-
tree.NewDInt(tree.DInt(job.JobID)),
1325-
tree.NewDString(payloadType.String()),
1326-
tree.NewDString(job.Description),
1327-
tree.NewDString(job.Username.Normalized()),
1328-
tree.NewDString(string(jobs.StatePending)),
1329-
tree.DNull,
1330-
tree.MustMakeDTimestampTZ(p.txn.ReadTimestamp().GoTime(), time.Microsecond),
1331-
tree.DNull,
1332-
tree.MustMakeDTimestampTZ(p.txn.ReadTimestamp().GoTime(), time.Microsecond),
1333-
tree.NewDFloat(tree.DFloat(0)),
1334-
tree.DZeroDecimal,
1335-
tree.DNull,
1336-
tree.NewDInt(tree.DInt(p.extendedEvalCtx.ExecCfg.JobRegistry.ID()))
1291+
return emitted, err
13371292
}
13381293

13391294
if err = addRow(

0 commit comments

Comments
 (0)