@@ -80,7 +80,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
-	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal"
@@ -958,14 +957,6 @@ CREATE TABLE crdb_internal.leases (
 	},
 }
 
-func tsOrNull(micros int64) (tree.Datum, error) {
-	if micros == 0 {
-		return tree.DNull, nil
-	}
-	ts := timeutil.Unix(0, micros*time.Microsecond.Nanoseconds())
-	return tree.MakeDTimestampTZ(ts, time.Microsecond)
-}
-
 const (
 	// systemJobsAndJobInfoBaseQuery consults both the `system.jobs` and
 	// `system.job_info` tables to return relevant information about a job.
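Note (not part of the patch): the removed tsOrNull helper mapped a zero microsecond offset to SQL NULL and any other value to a TIMESTAMPTZ at microsecond precision. A minimal standalone sketch of that conversion using only the standard library, where the (time.Time, bool) pair stands in for the original (tree.Datum, error):

package main

import (
	"fmt"
	"time"
)

// microsToTime mirrors the removed helper's behavior: 0 means "no timestamp"
// (NULL in the original), anything else becomes a UTC time built from the
// microsecond offset.
func microsToTime(micros int64) (time.Time, bool) {
	if micros == 0 {
		return time.Time{}, false
	}
	return time.Unix(0, micros*time.Microsecond.Nanoseconds()).UTC(), true
}

func main() {
	if ts, ok := microsToTime(1_700_000_000_000_000); ok {
		fmt.Println(ts.Format(time.RFC3339Nano)) // 2023-11-14T22:13:20Z
	}
}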
@@ -1147,11 +1138,6 @@ func wrapPayloadUnMarshalError(err error, jobID tree.Datum) error {
 }
 
 const (
-	jobsQuery = `SELECT id, status, created::timestamptz, payload, progress, claim_session_id, claim_instance_id FROM crdb_internal.system_jobs j`
-	// Note that we are querying crdb_internal.system_jobs instead of system.jobs directly.
-	// The former has access control built in and will filter out jobs that the
-	// user is not allowed to see.
-	jobsQFrom = ` `
 	jobIDFilter = ` WHERE j.id = $1`
 	jobsStatusFilter = ` WHERE j.status = $1`
 	jobsTypeFilter = ` WHERE j.job_type = $1`
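Note (not part of the patch): the legacy path built its statement by appending one of these filter suffixes to the removed jobsQuery constant and supplying the filter value as a positional parameter. A rough standalone sketch of that pattern with illustrative names, not the CockroachDB internals:

package main

import "fmt"

const (
	baseJobsQuery   = `SELECT id, status, created::timestamptz FROM crdb_internal.system_jobs j`
	filterByIDSQL   = ` WHERE j.id = $1`
	filterByTypeSQL = ` WHERE j.job_type = $1`
)

// buildJobsQuery appends an optional filter suffix to the base statement;
// the caller passes the value for the $1 placeholder separately, so no
// string interpolation of user input is needed.
func buildJobsQuery(filterSuffix string) string {
	return baseJobsQuery + filterSuffix
}

func main() {
	fmt.Println(buildJobsQuery(filterByIDSQL)) // ... WHERE j.id = $1
}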
@@ -1207,223 +1193,6 @@ CREATE TABLE crdb_internal.jobs (
 	},
 }
 
-var useOldJobsVTable = settings.RegisterBoolSetting(
-	settings.ApplicationLevel,
-	"sql.jobs.legacy_vtable.enabled",
-	"cause the crdb_internal.jobs vtable to be produced from the legacy payload info records",
-	false, // TODO(dt): flip this once we add permissive auth checks.
-)
-
-// makeJobsTableRows calls addRow for each job. It returns true if addRow was called
-// successfully at least once.
-func makeJobsTableRows(
-	ctx context.Context,
-	p *planner,
-	addRow func(...tree.Datum) error,
-	queryFilterSuffix string,
-	params ...interface{},
-) (matched bool, err error) {
-
-	v, err := p.InternalSQLTxn().GetSystemSchemaVersion(ctx)
-	if err != nil {
-		return false, err
-	}
-	if !v.AtLeast(clusterversion.V25_1.Version()) || useOldJobsVTable.Get(&p.EvalContext().Settings.SV) {
-		query := jobsQuery + queryFilterSuffix
-		return makeLegacyJobsTableRows(ctx, p, addRow, query, params...)
-	}
-	return makeJobBasedJobsTableRows(ctx, p, addRow, queryFilterSuffix, params...)
-}
-
-func makeLegacyJobsTableRows(
-	ctx context.Context,
-	p *planner,
-	addRow func(...tree.Datum) error,
-	query string,
-	params ...interface{},
-) (matched bool, err error) {
-	// We use QueryIteratorEx here and specify the current user
-	// instead of using InternalExecutor.QueryIterator because
-	// the latter is being deprecated for sometimes executing
-	// the query as the root user.
-	it, err := p.InternalSQLTxn().QueryIteratorEx(
-		ctx, "crdb-internal-jobs-table", p.txn,
-		sessiondata.InternalExecutorOverride{User: p.User()},
-		query, params...)
-	if err != nil {
-		return matched, err
-	}
-
-	cleanup := func(ctx context.Context) {
-		if err := it.Close(); err != nil {
-			// TODO(yuzefovich): this error should be propagated further up
-			// and not simply being logged. Fix it (#61123).
-			//
-			// Doing that as a return parameter would require changes to
-			// `planNode.Close` signature which is a bit annoying. One other
-			// possible solution is to panic here and catch the error
-			// somewhere.
-			log.Warningf(ctx, "error closing an iterator: %v", err)
-		}
-	}
-	defer cleanup(ctx)
-
-	sessionJobs := make([]*jobs.Record, 0, p.extendedEvalCtx.jobs.numToCreate())
-	uniqueJobs := make(map[*jobs.Record]struct{})
-	if err := p.extendedEvalCtx.jobs.forEachToCreate(func(job *jobs.Record) error {
-		if _, ok := uniqueJobs[job]; ok {
-			return nil
-		}
-		sessionJobs = append(sessionJobs, job)
-		uniqueJobs[job] = struct{}{}
-		return nil
-	}); err != nil {
-		return matched, err
-	}
-
-	// Loop while we need to skip a row.
-	for {
-		ok, err := it.Next(ctx)
-		if err != nil {
-			return matched, err
-		}
-		var id, status, created, payloadBytes, progressBytes, sessionIDBytes,
-			instanceID tree.Datum
-		if ok {
-			r := it.Cur()
-			id, status, created, payloadBytes, progressBytes, sessionIDBytes, instanceID =
-				r[0], r[1], r[2], r[3], r[4], r[5], r[6]
-		} else if !ok {
-			if len(sessionJobs) == 0 {
-				return matched, nil
-			}
-			job := sessionJobs[len(sessionJobs)-1]
-			sessionJobs = sessionJobs[:len(sessionJobs)-1]
-			// Convert the job into datums, where protobufs will be intentionally,
-			// marshalled.
-			id = tree.NewDInt(tree.DInt(job.JobID))
-			status = tree.NewDString(string(jobs.StatePending))
-			created = tree.MustMakeDTimestampTZ(timeutil.Unix(0, p.txn.ReadTimestamp().WallTime), time.Microsecond)
-			progressBytes, payloadBytes, err = getPayloadAndProgressFromJobsRecord(p, job)
-			if err != nil {
-				return matched, err
-			}
-			sessionIDBytes = tree.NewDBytes(tree.DBytes(p.extendedEvalCtx.SessionID.GetBytes()))
-			instanceID = tree.NewDInt(tree.DInt(p.extendedEvalCtx.ExecCfg.JobRegistry.ID()))
-		}
-
-		var jobType, description, statement, user, descriptorIDs, started, runningStatus,
-			finished, modified, fractionCompleted, highWaterTimestamp, errorStr, coordinatorID,
-			traceID, executionErrors, executionEvents = tree.DNull, tree.DNull, tree.DNull,
-			tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull,
-			tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull
-
-		// Extract data from the payload.
-		payload, err := jobs.UnmarshalPayload(payloadBytes)
-		if err != nil {
-			return matched, wrapPayloadUnMarshalError(err, id)
-		}
-
-		// We filter out masked rows before we allocate all the
-		// datums. Needless allocate when not necessary.
-		sqlUsername := payload.UsernameProto.Decode()
-		if sessionID, ok := sessionIDBytes.(*tree.DBytes); ok {
-			if isAlive, err := p.EvalContext().SQLLivenessReader.IsAlive(
-				ctx, sqlliveness.SessionID(*sessionID),
-			); err != nil {
-				// Silently swallow the error for checking for liveness.
-			} else if instanceID, ok := instanceID.(*tree.DInt); ok && isAlive {
-				coordinatorID = instanceID
-			}
-		}
-
-		// TODO(jayant): we can select the job_type as a column
-		// rather than decoding the payload. This would allow us
-		// to create a virtual index on it.
-		jobType = tree.NewDString(payload.Type().String())
-		description = tree.NewDString(payload.Description)
-		statement = tree.NewDString(strings.Join(payload.Statement, "; "))
-		user = tree.NewDString(sqlUsername.Normalized())
-		descriptorIDsArr := tree.NewDArray(types.Int)
-		for _, descID := range payload.DescriptorIDs {
-			if err := descriptorIDsArr.Append(tree.NewDInt(tree.DInt(int(descID)))); err != nil {
-				return matched, err
-			}
-		}
-		descriptorIDs = descriptorIDsArr
-		started, err = tsOrNull(payload.StartedMicros)
-		if err != nil {
-			return matched, err
-		}
-		finished, err = tsOrNull(payload.FinishedMicros)
-		if err != nil {
-			return matched, err
-		}
-		errorStr = tree.NewDString(payload.Error)
-
-		// Extract data from the progress field.
-		if progressBytes != tree.DNull {
-			progress, err := jobs.UnmarshalProgress(progressBytes)
-			if err != nil {
-				baseErr := ""
-				if s, ok := errorStr.(*tree.DString); ok {
-					baseErr = string(*s)
-					if baseErr != "" {
-						baseErr += "\n"
-					}
-				}
-				errorStr = tree.NewDString(fmt.Sprintf("%serror decoding progress: %v", baseErr, err))
-			} else {
-				// Progress contains either fractionCompleted for traditional jobs,
-				// or the highWaterTimestamp for change feeds.
-				if highwater := progress.GetHighWater(); highwater != nil {
-					highWaterTimestamp = eval.TimestampToDecimalDatum(*highwater)
-				} else {
-					fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted()))
-				}
-				modified, err = tsOrNull(progress.ModifiedMicros)
-				if err != nil {
-					return matched, err
-				}
-
-				if s, ok := status.(*tree.DString); ok {
-					if jobs.State(*s) == jobs.StateRunning && len(progress.StatusMessage) > 0 {
-						runningStatus = tree.NewDString(progress.StatusMessage)
-					} else if jobs.State(*s) == jobs.StatePaused && payload != nil && payload.PauseReason != "" {
-						errorStr = tree.NewDString(fmt.Sprintf("%s: %s", jobs.PauseRequestExplained, payload.PauseReason))
-					}
-				}
-				traceID = tree.NewDInt(tree.DInt(progress.TraceID))
-			}
-		}
-
-		if err = addRow(
-			id,
-			jobType,
-			description,
-			statement,
-			user,
-			descriptorIDs,
-			status,
-			runningStatus,
-			created,
-			started,
-			finished,
-			modified,
-			fractionCompleted,
-			highWaterTimestamp,
-			errorStr,
-			coordinatorID,
-			traceID,
-			executionErrors,
-			executionEvents,
-		); err != nil {
-			return matched, err
-		}
-		matched = true
-	}
-}
-
 var enablePerJobDetailedAuthLookups = settings.RegisterBoolSetting(
 	settings.ApplicationLevel,
 	"sql.jobs.legacy_per_job_access_via_details.enabled",
@@ -1433,7 +1202,7 @@ var enablePerJobDetailedAuthLookups = settings.RegisterBoolSetting(
 
 var errLegacyPerJobAuthDisabledSentinel = pgerror.Newf(pgcode.InsufficientPrivilege, "legacy job access based on details is disabled")
 
-func makeJobBasedJobsTableRows(
+func makeJobsTableRows(
 	ctx context.Context,
 	p *planner,
 	addRow func(...tree.Datum) error,