@@ -24,7 +24,6 @@ import (
24
24
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
25
25
"github.com/cockroachdb/cockroach/pkg/gossip"
26
26
"github.com/cockroachdb/cockroach/pkg/jobs"
27
- "github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
28
27
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
29
28
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
30
29
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -957,183 +956,48 @@ CREATE TABLE crdb_internal.leases (
957
956
},
958
957
}
959
958
960
- const (
961
- // systemJobsAndJobInfoBaseQuery consults both the `system.jobs` and
962
- // `system.job_info` tables to return relevant information about a job.
963
- //
964
- // NB: Every job on creation writes a row each for its payload and progress to
965
- // the `system.job_info` table. For a given job there will always be at most
966
- // one row each for its payload and progress. This is because of the
967
- // `system.job_info` write semantics described `InfoStorage.Write`.
968
- // Theoretically, a job could have no rows corresponding to its progress and
969
- // so we perform a LEFT JOIN to get a NULL value when no progress row is
970
- // found.
971
- systemJobsAndJobInfoBaseQuery = `
972
- SELECT
973
- DISTINCT(id), status, created, payload.value AS payload, progress.value AS progress,
974
- created_by_type, created_by_id, claim_session_id, claim_instance_id, num_runs, last_run, job_type
975
- FROM
976
- system.jobs AS j
977
- LEFT JOIN system.job_info AS progress ON j.id = progress.job_id AND progress.info_key = 'legacy_progress'
978
- INNER JOIN system.job_info AS payload ON j.id = payload.job_id AND payload.info_key = 'legacy_payload'
979
- `
980
- systemJobsIDPredicate = ` WHERE id = $1`
981
- systemJobsTypePredicate = ` WHERE job_type = $1`
982
- systemJobsStatusPredicate = ` WHERE status = $1`
983
- )
984
-
985
- type systemJobsPredicate int
986
-
987
- const (
988
- noPredicate systemJobsPredicate = iota
989
- jobID
990
- jobType
991
- jobStatus
992
- )
993
-
994
- func getInternalSystemJobsQuery (predicate systemJobsPredicate ) string {
995
- switch predicate {
996
- case noPredicate :
997
- return systemJobsAndJobInfoBaseQuery
998
- case jobID :
999
- return systemJobsAndJobInfoBaseQuery + systemJobsIDPredicate
1000
- case jobType :
1001
- return systemJobsAndJobInfoBaseQuery + systemJobsTypePredicate
1002
- case jobStatus :
1003
- return systemJobsAndJobInfoBaseQuery + systemJobsStatusPredicate
1004
- }
1005
-
1006
- return ""
1007
- }
1008
-
1009
959
// TODO(tbg): prefix with kv_.
1010
- var crdbInternalSystemJobsTable = virtualSchemaTable {
960
+ var crdbInternalSystemJobsTable = virtualSchemaView {
1011
961
schema : `
1012
- CREATE TABLE crdb_internal.system_jobs (
1013
- id INT8 NOT NULL,
1014
- status STRING NOT NULL,
1015
- created TIMESTAMP NOT NULL,
1016
- payload BYTES NOT NULL,
1017
- progress BYTES,
1018
- created_by_type STRING,
1019
- created_by_id INT,
1020
- claim_session_id BYTES,
1021
- claim_instance_id INT8,
1022
- num_runs INT8,
1023
- last_run TIMESTAMP,
1024
- job_type STRING,
1025
- INDEX (id),
1026
- INDEX (job_type),
1027
- INDEX (status)
1028
- )` ,
962
+ CREATE VIEW crdb_internal.system_jobs (
963
+ id,
964
+ status,
965
+ created,
966
+ payload,
967
+ progress,
968
+ created_by_type,
969
+ created_by_id,
970
+ claim_session_id,
971
+ claim_instance_id,
972
+ num_runs,
973
+ last_run,
974
+ job_type
975
+ ) AS (SELECT j.id, j.status, j.created, payload.value, progress.value,
976
+ j.created_by_type, j.created_by_id, j.claim_session_id, j.claim_instance_id,
977
+ j.num_runs, j.last_run, j.job_type
978
+ FROM system.jobs AS j
979
+ LEFT JOIN system.job_info AS progress ON j.id = progress.job_id AND progress.info_key = 'legacy_progress'
980
+ INNER JOIN system.job_info AS payload ON j.id = payload.job_id AND payload.info_key = 'legacy_payload'
981
+ WHERE crdb_internal.can_view_job(j.owner)
982
+ )
983
+ ` ,
1029
984
comment : `wrapper over system.jobs with row access control (KV scan)` ,
1030
- indexes : []virtualIndex {
1031
- {
1032
- populate : func (ctx context.Context , unwrappedConstraint tree.Datum , p * planner , _ catalog.DatabaseDescriptor , addRow func (... tree.Datum ) error ) (matched bool , err error ) {
1033
- q := getInternalSystemJobsQuery (jobID )
1034
- targetType := tree .MustBeDInt (unwrappedConstraint )
1035
- return populateSystemJobsTableRows (ctx , p , addRow , q , targetType )
1036
- },
1037
- },
1038
- {
1039
- populate : func (ctx context.Context , unwrappedConstraint tree.Datum , p * planner , _ catalog.DatabaseDescriptor , addRow func (... tree.Datum ) error ) (matched bool , err error ) {
1040
- q := getInternalSystemJobsQuery (jobType )
1041
- targetType := tree .MustBeDString (unwrappedConstraint )
1042
- return populateSystemJobsTableRows (ctx , p , addRow , q , targetType )
1043
- },
1044
- },
1045
- {
1046
- populate : func (ctx context.Context , unwrappedConstraint tree.Datum , p * planner , _ catalog.DatabaseDescriptor , addRow func (... tree.Datum ) error ) (matched bool , err error ) {
1047
- q := getInternalSystemJobsQuery (jobStatus )
1048
- targetType := tree .MustBeDString (unwrappedConstraint )
1049
- return populateSystemJobsTableRows (ctx , p , addRow , q , targetType )
1050
- },
1051
- },
1052
- },
1053
- populate : func (ctx context.Context , p * planner , db catalog.DatabaseDescriptor , addRow func (... tree.Datum ) error ) error {
1054
- _ , err := populateSystemJobsTableRows (ctx , p , addRow , getInternalSystemJobsQuery (noPredicate ))
1055
- return err
985
+ resultColumns : colinfo.ResultColumns {
986
+ {Name : "id" , Typ : types .Int },
987
+ {Name : "status" , Typ : types .String },
988
+ {Name : "created" , Typ : types .TimestampTZ },
989
+ {Name : "payload" , Typ : types .Bytes },
990
+ {Name : "progress" , Typ : types .Bytes },
991
+ {Name : "created_by_type" , Typ : types .String },
992
+ {Name : "created_by_id" , Typ : types .Int },
993
+ {Name : "claim_session_id" , Typ : types .Int },
994
+ {Name : "claim_instance_id" , Typ : types .Int },
995
+ {Name : "num_runs" , Typ : types .Int },
996
+ {Name : "last_run" , Typ : types .TimestampTZ },
997
+ {Name : "job_type" , Typ : types .String },
1056
998
},
1057
999
}
1058
1000
1059
- // populateSystemJobsTableRows calls addRow for all rows of the system.jobs table
1060
- // except for rows that the user does not have access to. It returns true
1061
- // if at least one row was generated.
1062
- func populateSystemJobsTableRows (
1063
- ctx context.Context ,
1064
- p * planner ,
1065
- addRow func (... tree.Datum ) error ,
1066
- query string ,
1067
- params ... interface {},
1068
- ) (result bool , retErr error ) {
1069
- const jobIdIdx = 0
1070
- const jobPayloadIdx = 3
1071
-
1072
- matched := false
1073
-
1074
- // Note: we query system.jobs as root, so we must be careful about which rows we return.
1075
- it , err := p .InternalSQLTxn ().QueryIteratorEx (ctx ,
1076
- "system-jobs-scan" ,
1077
- p .Txn (),
1078
- sessiondata .NodeUserSessionDataOverride ,
1079
- query ,
1080
- params ... ,
1081
- )
1082
- if err != nil {
1083
- return matched , err
1084
- }
1085
-
1086
- cleanup := func (ctx context.Context ) {
1087
- if err := it .Close (); err != nil {
1088
- retErr = errors .CombineErrors (retErr , err )
1089
- }
1090
- }
1091
- defer cleanup (ctx )
1092
-
1093
- globalPrivileges , err := jobsauth .GetGlobalJobPrivileges (ctx , p )
1094
- if err != nil {
1095
- return matched , err
1096
- }
1097
-
1098
- for {
1099
- hasNext , err := it .Next (ctx )
1100
- if ! hasNext || err != nil {
1101
- return matched , err
1102
- }
1103
-
1104
- currentRow := it .Cur ()
1105
- jobID , err := strconv .Atoi (currentRow [jobIdIdx ].String ())
1106
- if err != nil {
1107
- return matched , err
1108
- }
1109
- payloadBytes := currentRow [jobPayloadIdx ]
1110
- payload , err := jobs .UnmarshalPayload (payloadBytes )
1111
- if err != nil {
1112
- return matched , wrapPayloadUnMarshalError (err , currentRow [jobIdIdx ])
1113
- }
1114
- err = jobsauth .Authorize (
1115
- ctx , p , jobspb .JobID (jobID ), payload .UsernameProto .Decode (), jobsauth .ViewAccess , globalPrivileges ,
1116
- )
1117
- if err != nil {
1118
- // Filter out jobs which the user is not allowed to see.
1119
- if IsInsufficientPrivilegeError (err ) {
1120
- continue
1121
- }
1122
- return matched , err
1123
- }
1124
-
1125
- if err := addRow (currentRow ... ); err != nil {
1126
- return matched , err
1127
- }
1128
- matched = true
1129
- }
1130
- }
1131
-
1132
- func wrapPayloadUnMarshalError (err error , jobID tree.Datum ) error {
1133
- return errors .WithHintf (err , "could not decode the payload for job %s." +
1134
- " consider deleting this job from system.jobs" , jobID )
1135
- }
1136
-
1137
1001
var crdbInternalJobsView = virtualSchemaView {
1138
1002
// TODO(dt): the left-outer joins here in theory mean that if there are more
1139
1003
// than one row per job in status or progress the job row would need to be
0 commit comments