
Commit cb093a6

craig[bot], dt, and stevendanna committed
149476: crdb_internal: make system_jobs a view r=dt a=dt

Release note: none.
Epic: CRDB-48791.

149585: roachtest: use leader leases by default in perturbation tests r=tbg a=stevendanna

Leader leases are now the default, so they should also be the default in the perturbation tests.

Epic: none
Release note: None

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
3 parents (045a652 + 8cef7ea + ead6970), commit cb093a6
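For orientation: PR 149476 turns crdb_internal.system_jobs from a Go-populated virtual table into a plain SQL view over system.jobs and system.job_info, with per-row visibility enforced by crdb_internal.can_view_job(owner) in the view body shown in the pkg/sql/crdb_internal.go diff below. A minimal usage sketch; the column names come from that view definition, and the filter value is illustrative only, not taken from this commit:

    -- Illustrative query against the reworked view; 'running' is an example status.
    SELECT id, status, job_type, created
    FROM crdb_internal.system_jobs
    WHERE status = 'running';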

File tree

6 files changed: +43 and −229 lines


pkg/bench/rttanalysis/testdata/benchmark_expectations

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@ exp,benchmark
 3,Jobs/jobs_page_latest_50
 3,Jobs/jobs_page_type_filtered
 1-3,Jobs/jobs_page_type_filtered_no_matches
-4,Jobs/non_admin_crdb_internal.system_jobs
+2,Jobs/non_admin_crdb_internal.system_jobs
 2,Jobs/non_admin_show_jobs
 12-15,Jobs/pause_job
 12-15,Jobs/resume_job

pkg/cmd/roachtest/tests/perturbation/framework.go

Lines changed: 1 addition & 1 deletion

@@ -243,7 +243,7 @@ func (v variations) randomize(rng *rand.Rand) variations {
 func setup(p perturbation, acceptableChange float64) variations {
   v := variations{}
   v.workload = kvWorkload{}
-  v.leaseType = registry.EpochLeases
+  v.leaseType = registry.LeaderLeases
   v.blockSize = 4096
   v.splits = 10000
   v.numNodes = 12

pkg/sql/crdb_internal.go

Lines changed: 36 additions & 172 deletions

@@ -24,7 +24,6 @@ import (
   "github.com/cockroachdb/cockroach/pkg/config/zonepb"
   "github.com/cockroachdb/cockroach/pkg/gossip"
   "github.com/cockroachdb/cockroach/pkg/jobs"
-  "github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
   "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
   "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
   "github.com/cockroachdb/cockroach/pkg/keys"
@@ -957,183 +956,48 @@ CREATE TABLE crdb_internal.leases (
   },
 }

-const (
-  // systemJobsAndJobInfoBaseQuery consults both the `system.jobs` and
-  // `system.job_info` tables to return relevant information about a job.
-  //
-  // NB: Every job on creation writes a row each for its payload and progress to
-  // the `system.job_info` table. For a given job there will always be at most
-  // one row each for its payload and progress. This is because of the
-  // `system.job_info` write semantics described `InfoStorage.Write`.
-  // Theoretically, a job could have no rows corresponding to its progress and
-  // so we perform a LEFT JOIN to get a NULL value when no progress row is
-  // found.
-  systemJobsAndJobInfoBaseQuery = `
-SELECT
-  DISTINCT(id), status, created, payload.value AS payload, progress.value AS progress,
-  created_by_type, created_by_id, claim_session_id, claim_instance_id, num_runs, last_run, job_type
-FROM
-  system.jobs AS j
-LEFT JOIN system.job_info AS progress ON j.id = progress.job_id AND progress.info_key = 'legacy_progress'
-INNER JOIN system.job_info AS payload ON j.id = payload.job_id AND payload.info_key = 'legacy_payload'
-`
-  systemJobsIDPredicate     = ` WHERE id = $1`
-  systemJobsTypePredicate   = ` WHERE job_type = $1`
-  systemJobsStatusPredicate = ` WHERE status = $1`
-)
-
-type systemJobsPredicate int
-
-const (
-  noPredicate systemJobsPredicate = iota
-  jobID
-  jobType
-  jobStatus
-)
-
-func getInternalSystemJobsQuery(predicate systemJobsPredicate) string {
-  switch predicate {
-  case noPredicate:
-    return systemJobsAndJobInfoBaseQuery
-  case jobID:
-    return systemJobsAndJobInfoBaseQuery + systemJobsIDPredicate
-  case jobType:
-    return systemJobsAndJobInfoBaseQuery + systemJobsTypePredicate
-  case jobStatus:
-    return systemJobsAndJobInfoBaseQuery + systemJobsStatusPredicate
-  }
-
-  return ""
-}
-
 // TODO(tbg): prefix with kv_.
-var crdbInternalSystemJobsTable = virtualSchemaTable{
+var crdbInternalSystemJobsTable = virtualSchemaView{
   schema: `
-CREATE TABLE crdb_internal.system_jobs (
-  id                INT8 NOT NULL,
-  status            STRING NOT NULL,
-  created           TIMESTAMP NOT NULL,
-  payload           BYTES NOT NULL,
-  progress          BYTES,
-  created_by_type   STRING,
-  created_by_id     INT,
-  claim_session_id  BYTES,
-  claim_instance_id INT8,
-  num_runs          INT8,
-  last_run          TIMESTAMP,
-  job_type          STRING,
-  INDEX (id),
-  INDEX (job_type),
-  INDEX (status)
-)`,
+CREATE VIEW crdb_internal.system_jobs (
+  id,
+  status,
+  created,
+  payload,
+  progress,
+  created_by_type,
+  created_by_id,
+  claim_session_id,
+  claim_instance_id,
+  num_runs,
+  last_run,
+  job_type
+) AS (SELECT j.id, j.status, j.created, payload.value, progress.value,
+  j.created_by_type, j.created_by_id, j.claim_session_id, j.claim_instance_id,
+  j.num_runs, j.last_run, j.job_type
+  FROM system.jobs AS j
+  LEFT JOIN system.job_info AS progress ON j.id = progress.job_id AND progress.info_key = 'legacy_progress'
+  INNER JOIN system.job_info AS payload ON j.id = payload.job_id AND payload.info_key = 'legacy_payload'
+  WHERE crdb_internal.can_view_job(j.owner)
+)
+`,
   comment: `wrapper over system.jobs with row access control (KV scan)`,
-  indexes: []virtualIndex{
-    {
-      populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
-        q := getInternalSystemJobsQuery(jobID)
-        targetType := tree.MustBeDInt(unwrappedConstraint)
-        return populateSystemJobsTableRows(ctx, p, addRow, q, targetType)
-      },
-    },
-    {
-      populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
-        q := getInternalSystemJobsQuery(jobType)
-        targetType := tree.MustBeDString(unwrappedConstraint)
-        return populateSystemJobsTableRows(ctx, p, addRow, q, targetType)
-      },
-    },
-    {
-      populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
-        q := getInternalSystemJobsQuery(jobStatus)
-        targetType := tree.MustBeDString(unwrappedConstraint)
-        return populateSystemJobsTableRows(ctx, p, addRow, q, targetType)
-      },
-    },
-  },
-  populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
-    _, err := populateSystemJobsTableRows(ctx, p, addRow, getInternalSystemJobsQuery(noPredicate))
-    return err
+  resultColumns: colinfo.ResultColumns{
+    {Name: "id", Typ: types.Int},
+    {Name: "status", Typ: types.String},
+    {Name: "created", Typ: types.TimestampTZ},
+    {Name: "payload", Typ: types.Bytes},
+    {Name: "progress", Typ: types.Bytes},
+    {Name: "created_by_type", Typ: types.String},
+    {Name: "created_by_id", Typ: types.Int},
+    {Name: "claim_session_id", Typ: types.Int},
+    {Name: "claim_instance_id", Typ: types.Int},
+    {Name: "num_runs", Typ: types.Int},
+    {Name: "last_run", Typ: types.TimestampTZ},
+    {Name: "job_type", Typ: types.String},
   },
 }

-// populateSystemJobsTableRows calls addRow for all rows of the system.jobs table
-// except for rows that the user does not have access to. It returns true
-// if at least one row was generated.
-func populateSystemJobsTableRows(
-  ctx context.Context,
-  p *planner,
-  addRow func(...tree.Datum) error,
-  query string,
-  params ...interface{},
-) (result bool, retErr error) {
-  const jobIdIdx = 0
-  const jobPayloadIdx = 3
-
-  matched := false
-
-  // Note: we query system.jobs as root, so we must be careful about which rows we return.
-  it, err := p.InternalSQLTxn().QueryIteratorEx(ctx,
-    "system-jobs-scan",
-    p.Txn(),
-    sessiondata.NodeUserSessionDataOverride,
-    query,
-    params...,
-  )
-  if err != nil {
-    return matched, err
-  }
-
-  cleanup := func(ctx context.Context) {
-    if err := it.Close(); err != nil {
-      retErr = errors.CombineErrors(retErr, err)
-    }
-  }
-  defer cleanup(ctx)
-
-  globalPrivileges, err := jobsauth.GetGlobalJobPrivileges(ctx, p)
-  if err != nil {
-    return matched, err
-  }
-
-  for {
-    hasNext, err := it.Next(ctx)
-    if !hasNext || err != nil {
-      return matched, err
-    }
-
-    currentRow := it.Cur()
-    jobID, err := strconv.Atoi(currentRow[jobIdIdx].String())
-    if err != nil {
-      return matched, err
-    }
-    payloadBytes := currentRow[jobPayloadIdx]
-    payload, err := jobs.UnmarshalPayload(payloadBytes)
-    if err != nil {
-      return matched, wrapPayloadUnMarshalError(err, currentRow[jobIdIdx])
-    }
-    err = jobsauth.Authorize(
-      ctx, p, jobspb.JobID(jobID), payload.UsernameProto.Decode(), jobsauth.ViewAccess, globalPrivileges,
-    )
-    if err != nil {
-      // Filter out jobs which the user is not allowed to see.
-      if IsInsufficientPrivilegeError(err) {
-        continue
-      }
-      return matched, err
-    }
-
-    if err := addRow(currentRow...); err != nil {
-      return matched, err
-    }
-    matched = true
-  }
-}
-
-func wrapPayloadUnMarshalError(err error, jobID tree.Datum) error {
-  return errors.WithHintf(err, "could not decode the payload for job %s."+
-    " consider deleting this job from system.jobs", jobID)
-}
-
 var crdbInternalJobsView = virtualSchemaView{
   // TODO(dt): the left-outer joins here in theory mean that if there are more
   // than one row per job in status or progress the job row would need to be
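In short, the per-row authorization that populateSystemJobsTableRows performed in Go via jobsauth.Authorize is now expressed declaratively by the view's WHERE crdb_internal.can_view_job(j.owner) clause, so predicates on id, job_type, or status become ordinary filters over the view instead of hand-rolled virtual-index lookups. A sketch of an ad hoc query equivalent in spirit to the view body, assuming the builtin and the system.jobs.owner column behave as used in the diff above; the status value is illustrative:

    -- Sketch only: mirrors the view's join and row-access predicate.
    SELECT j.id, j.status, j.job_type
    FROM system.jobs AS j
    INNER JOIN system.job_info AS payload
      ON j.id = payload.job_id AND payload.info_key = 'legacy_payload'
    WHERE crdb_internal.can_view_job(j.owner)
      AND j.status = 'paused';  -- example predicate; now a plain filter, not a virtual index scan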

pkg/sql/crdb_internal_test.go

Lines changed: 4 additions & 4 deletions

@@ -1327,7 +1327,7 @@ func TestInternalSystemJobsTableMirrorsSystemJobsTable(t *testing.T) {
   assert.NoError(t, err)

   tdb.Exec(t,
-    "INSERT INTO system.jobs (id, status, created) values ($1, $2, $3)",
+    "INSERT INTO system.jobs (id, status, created, owner) values ($1, $2, $3, 'root')",
     1, jobs.StateRunning, timeutil.Now(),
   )
   tdb.Exec(t,
@@ -1336,9 +1336,9 @@
   )

   tdb.Exec(t,
-    `INSERT INTO system.jobs (id, status, created, created_by_type, created_by_id,
-    claim_session_id, claim_instance_id, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
-    2, jobs.StateRunning, timeutil.Now(), "created by", 2, []byte("claim session id"),
+    `INSERT INTO system.jobs (id, status, created, owner, created_by_type, created_by_id,
+    claim_session_id, claim_instance_id, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`,
+    2, jobs.StateRunning, timeutil.Now(), "root", "created by", 2, []byte("claim session id"),
     2, 2, timeutil.Now(), jobspb.TypeImport.String(),
   )
   tdb.Exec(t,
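A likely reason for these test edits (an inference, not stated in the commit): the view now filters on crdb_internal.can_view_job(j.owner), so rows inserted directly into system.jobs need an owner for the view to surface them, and the test uses 'root'. In plain SQL, the first insert corresponds roughly to the following, where 'running' stands in for jobs.StateRunning and now() for timeutil.Now():

    -- Hypothetical manual equivalent of the first tdb.Exec above.
    INSERT INTO system.jobs (id, status, created, owner)
    VALUES (1, 'running', now(), 'root');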

pkg/sql/opt/exec/execbuilder/testdata/explain_analyze

Lines changed: 1 addition & 1 deletion

@@ -176,7 +176,7 @@ quality of service: regular
 │ │ └── • virtual table
 │ │       sql nodes: <hidden>
 │ │       regions: <hidden>
-│ │       actual row count: 363
+│ │       actual row count: 360
 │ │       execution time: 0µs
 │ │       table: pg_class@primary
 │ │

pkg/sql/opt/exec/execbuilder/testdata/virtual

Lines changed: 0 additions & 50 deletions

@@ -482,45 +482,6 @@ vectorized: true
 └── • virtual table
       table: pg_type@primary

-# Validate that the virtual index on 'status' works.
-# Vectorized execution may be on or off during tests, so exclude it from the output.
-query T
-SELECT info FROM [EXPLAIN SELECT count(*) FROM crdb_internal.system_jobs WHERE status = 'paused'] WHERE info NOT LIKE 'vectorized%'
-----
-distribution: local
-·
-• group (scalar)
-
-└── • virtual table
-      table: system_jobs@system_jobs_status_idx
-      spans: [/'paused' - /'paused']
-
-# Validate that the virtual index on 'status' works.
-# Vectorized execution may be on or off during tests, so exclude it from the output.
-query T
-SELECT info FROM [EXPLAIN SELECT count(*) FROM crdb_internal.system_jobs WHERE job_type = 'changefeed'] WHERE info NOT LIKE 'vectorized%'
-----
-distribution: local
-·
-• group (scalar)
-
-└── • virtual table
-      table: system_jobs@system_jobs_job_type_idx
-      spans: [/'changefeed' - /'changefeed']
-
-# Validate that the virtual index on 'status' works.
-# Vectorized execution may be on or off during tests, so exclude it from the output.
-query T
-SELECT info FROM [EXPLAIN SELECT count(*) FROM crdb_internal.system_jobs WHERE id = 1] WHERE info NOT LIKE 'vectorized%'
-----
-distribution: local
-·
-• group (scalar)
-
-└── • virtual table
-      table: system_jobs@system_jobs_id_idx
-      spans: [/1 - /1]
-
 # Regression test for PruneCols fix (#94890)
 statement ok
 SET testing_optimizer_disable_rule_probability = 1.0;
@@ -579,14 +540,3 @@
 statement ok
 RESET disallow_full_table_scans;

-# Verify that we're not fooling ourselves by showing that we can use a virtual
-# index when we actually can't (#108317).
-query T
-EXPLAIN SELECT * FROM crdb_internal.system_jobs WHERE id > 100;
-----
-distribution: local
-vectorized: true
-·
-• virtual table
-  table: system_jobs@primary
-  virtual table filter
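These deletions drop the EXPLAIN expectations that exercised the virtual indexes on id, job_type, and status; since the view has no virtual indexes, those plans no longer exist. The underlying queries still run against the view, for example the following (plan output omitted, as the new plans are not part of this commit):

    SELECT count(*) FROM crdb_internal.system_jobs WHERE status = 'paused';
    SELECT count(*) FROM crdb_internal.system_jobs WHERE job_type = 'changefeed';
    SELECT count(*) FROM crdb_internal.system_jobs WHERE id = 1;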
