Skip to content

Commit 27417f3

Browse files
craig[bot]fqaziyuzefovichdtjlinder
committed
148615: sql: add validation for CTAS / materialized views r=fqazi a=fqazi Previously, no validation existed when backfilling queries for CTAS statements or creating materialized view. This could lead to scenarios with unnoticed data loss if anything went wrong. To address this, this patch adds validation operations that backfill queries into tables to confirm that row counts match the source. Fixes: #144957 Release note: None 148871: cli: fix printing of the store spec r=yuzefovich a=yuzefovich This was recently broken in 4744a80. Epic: None Release note: None 148906: crdb_internal: replace crdb_internal.jobs vtable with view r=dt a=dt Release note: none. Epic: https://cockroachlabs.atlassian.net/browse/CRDB-48791 148964: nightly-stress: move engflow stress jobs to run on mesolite r=rickystewart a=jlinder Part of: DEVINF-1489 Release note: None Co-authored-by: Faizan Qazi <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: David Taylor <[email protected]> Co-authored-by: James H. Linder <[email protected]>
5 parents ba6e42f + 82cac4d + a6898dd + 8b2d340 + 846890f commit 27417f3

File tree

18 files changed

+382
-173
lines changed

18 files changed

+382
-173
lines changed

build/teamcity/cockroach/nightlies/stress_engflow_impl.sh

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ mkdir -p artifacts
1717

1818
# NB: The certs are set up and cleaned up in unconditional build steps before
1919
# and after this build step.
20-
ENGFLOW_FLAGS="--config engflow --config crosslinux \
21-
--jobs 400 --tls_client_certificate=/home/agent/engflow/engflow.crt \
22-
--tls_client_key=/home/agent/engflow/engflow.key"
20+
ENGFLOW_FLAGS="--config engflowpublic --config crosslinux \
21+
--jobs 400 --tls_client_certificate=/home/agent/engflow/engflowpublic.crt \
22+
--tls_client_key=/home/agent/engflow/engflowpublic.key \
23+
--remote_execution_priority=-2"
2324

2425
BES_KEYWORDS_ARGS=
2526
if [ ! -z "${EXTRA_ISSUE_PARAMS:+$EXTRA_ISSUE_PARAMS}" ]
@@ -41,7 +42,7 @@ bazel test //pkg:all_tests $ENGFLOW_FLAGS --remote_download_minimal \
4142
bazel build //pkg/cmd/bazci/process-bep-file $ENGFLOW_FLAGS --bes_keywords helper-binary
4243
_bazel/bin/pkg/cmd/bazci/process-bep-file/process-bep-file_/process-bep-file \
4344
-eventsfile artifacts/eventstream \
44-
-cert /home/agent/engflow/engflow.crt -key /home/agent/engflow/engflow.key \
45+
-cert /home/agent/engflow/engflowpublic.crt -key /home/agent/engflow/engflowpublic.key \
4546
-extra "${EXTRA_ISSUE_PARAMS:+$EXTRA_ISSUE_PARAMS}" \
4647
-jsonoutfile test-results.json
4748

pkg/bench/rttanalysis/testdata/benchmark_expectations

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ exp,benchmark
6868
3,Jobs/jobs_page_type_filtered
6969
1-3,Jobs/jobs_page_type_filtered_no_matches
7070
4,Jobs/non_admin_crdb_internal.system_jobs
71-
4,Jobs/non_admin_show_jobs
71+
2,Jobs/non_admin_show_jobs
7272
12-15,Jobs/pause_job
7373
12-15,Jobs/resume_job
7474
3,Jobs/show_job

pkg/cli/start.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1240,7 +1240,7 @@ func reportServerInfo(
12401240
buf.Printf("external I/O path: \t<disabled>\n")
12411241
}
12421242
for i, spec := range serverCfg.Stores.Specs {
1243-
buf.Printf("store[%d]:\t%s\n", i, log.SafeManaged(spec))
1243+
buf.Printf("store[%d]:\t%s\n", i, log.SafeManaged(base.StoreSpecCmdLineString(spec)))
12441244
}
12451245

12461246
// Print the common server identifiers.

pkg/crosscluster/physical/alter_replication_job_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,10 @@ func TestTenantReplicationStatus(t *testing.T) {
533533
_, status, err = getReplicationStatsAndStatus(ctx, registry, nil, jobspb.JobID(producerJobID))
534534
require.ErrorContains(t, err, "is not a stream ingestion job")
535535
require.Equal(t, "replication error", status)
536+
c.DestSysSQL.CheckQueryResults(t, "SELECT count(*) > 0 FROM crdb_internal.cluster_replication_spans", [][]string{{"true"}})
537+
c.DestSysSQL.Exec(t, fmt.Sprintf("CREATE USER %s", username.TestUser))
538+
noPrivs := sqlutils.MakeSQLRunner(c.DestSysServer.SQLConn(t, serverutils.User(username.TestUser)))
539+
noPrivs.CheckQueryResults(t, "SELECT count(*) > 0 FROM crdb_internal.cluster_replication_spans", [][]string{{"false"}})
536540
}
537541

538542
// TestAlterTenantHandleFutureProtectedTimestamp verifies that cutting over "TO

pkg/server/application_api/jobs_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,8 @@ func TestAdminAPIJobsDetails(t *testing.T) {
355355
t.Fatal(err)
356356
}
357357
sqlDB.Exec(t,
358-
`INSERT INTO system.jobs (id, status, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5)`,
359-
job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(),
358+
`INSERT INTO system.jobs (id, status, num_runs, last_run, job_type, owner) VALUES ($1, $2, $3, $4, $5, $6)`,
359+
job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(), job.username.Normalized(),
360360
)
361361
sqlDB.Exec(t,
362362
`INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`,

pkg/sql/crdb_internal.go

Lines changed: 77 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ var crdbInternal = virtualSchema{
162162
catconstants.CrdbInternalIndexSpansTableID: crdbInternalIndexSpansTable,
163163
catconstants.CrdbInternalIndexUsageStatisticsTableID: crdbInternalIndexUsageStatistics,
164164
catconstants.CrdbInternalInflightTraceSpanTableID: crdbInternalInflightTraceSpanTable,
165-
catconstants.CrdbInternalJobsTableID: crdbInternalJobsTable,
165+
catconstants.CrdbInternalJobsTableID: crdbInternalJobsView,
166166
catconstants.CrdbInternalSystemJobsTableID: crdbInternalSystemJobsTable,
167167
catconstants.CrdbInternalKVNodeStatusTableID: crdbInternalKVNodeStatusTable,
168168
catconstants.CrdbInternalKVStoreStatusTableID: crdbInternalKVStoreStatusTable,
@@ -1134,142 +1134,82 @@ func wrapPayloadUnMarshalError(err error, jobID tree.Datum) error {
11341134
" consider deleting this job from system.jobs", jobID)
11351135
}
11361136

1137-
const (
1138-
jobIDFilter = ` WHERE j.id = $1`
1139-
jobsStatusFilter = ` WHERE j.status = $1`
1140-
jobsTypeFilter = ` WHERE j.job_type = $1`
1141-
)
1142-
1143-
// TODO(tbg): prefix with kv_.
1144-
var crdbInternalJobsTable = virtualSchemaTable{
1145-
schema: `
1146-
CREATE TABLE crdb_internal.jobs (
1147-
job_id INT,
1148-
job_type STRING,
1149-
description STRING,
1150-
statement STRING,
1151-
user_name STRING,
1152-
status STRING,
1153-
running_status STRING,
1154-
created TIMESTAMPTZ,
1155-
finished TIMESTAMPTZ,
1156-
modified TIMESTAMPTZ,
1157-
fraction_completed FLOAT,
1158-
high_water_timestamp DECIMAL,
1159-
error STRING,
1160-
coordinator_id INT,
1161-
INDEX(job_id),
1162-
INDEX(status),
1163-
INDEX(job_type)
1137+
var crdbInternalJobsView = virtualSchemaView{
1138+
// TODO(dt): the left-outer joins here in theory mean that if there is more
1139+
// than one row per job in status or progress the job row would need to be
1140+
// repeated for each. While this is never the case in practice, it does mean
1141+
// the optimizer cannot elide the joins even if the progress or status is not
1142+
// actually being read from a query on the view. We could change the joins to
1143+
// be `LATERAL (... WHERE job_id = id LIMIT 1) ON true` which would promise to
1144+
// the optimizer that the join cannot change the number of rows, allowing it
1145+
// to elide it entirely if the joined columns are not used, however SHOW JOBS
1146+
// and other callers typically _are_ looking for progress and status so we
1147+
// likely will need the joins anyway, and queries experts expressed some concern
1148+
// that it could be possible "lateral join prevents some other optimizations".
1149+
// Given we typically expect to need the joined columns anyway, these are left
1150+
// as left joins for now. NB: the `union` needs to be `union all` to avoid
1151+
// materializing all of the columns even when they're not used, since a
1152+
// non-`all` union has to be prepared to compare them all to eliminate
1153+
// duplicates, even when in the vast majority of calls we expect the session
1154+
// jobs generator to be empty.
1155+
schema: `CREATE VIEW crdb_internal.jobs (
1156+
job_id,
1157+
job_type,
1158+
description,
1159+
statement,
1160+
user_name,
1161+
status,
1162+
running_status,
1163+
created,
1164+
finished,
1165+
modified,
1166+
fraction_completed,
1167+
high_water_timestamp,
1168+
error,
1169+
coordinator_id
1170+
) AS (
1171+
SELECT
1172+
j.id,
1173+
j.job_type,
1174+
coalesce(j.description, '') as description,
1175+
coalesce(j.description, '') as statement,
1176+
coalesce(j.owner, '') as user_name,
1177+
j.status as status,
1178+
s.status as running_status,
1179+
j.created::timestamptz,
1180+
j.finished,
1181+
greatest(j.created, j.finished, p.written, s.written)::timestamptz AS modified,
1182+
p.fraction as fraction_completed,
1183+
p.resolved as high_water_timestamp,
1184+
coalesce(j.error_msg, '') as error,
1185+
j.claim_instance_id as coordinator_id
1186+
FROM system.public.jobs AS j
1187+
LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
1188+
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
1189+
WHERE crdb_internal.can_view_job(j.owner)
1190+
UNION ALL
1191+
(SELECT job_id, job_type, description, description, user_name, 'pending',
1192+
NULL, now(), NULL, now(), NULL, NULL, NULL, NULL
1193+
FROM crdb_internal.session_pending_jobs()
1194+
)
11641195
)`,
1165-
comment: `decoded job metadata from crdb_internal.system_jobs (KV scan)`,
1166-
indexes: []virtualIndex{{
1167-
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
1168-
targetID := tree.MustBeDInt(unwrappedConstraint)
1169-
return makeJobsTableRows(ctx, p, addRow, jobIDFilter, targetID)
1170-
},
1171-
}, {
1172-
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
1173-
targetStatus := tree.MustBeDString(unwrappedConstraint)
1174-
return makeJobsTableRows(ctx, p, addRow, jobsStatusFilter, targetStatus)
1175-
},
1176-
}, {
1177-
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
1178-
targetType := tree.MustBeDString(unwrappedConstraint)
1179-
return makeJobsTableRows(ctx, p, addRow, jobsTypeFilter, targetType)
1180-
},
1181-
}},
1182-
populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
1183-
_, err := makeJobsTableRows(ctx, p, addRow, "")
1184-
return err
1196+
resultColumns: colinfo.ResultColumns{
1197+
{Name: "job_id", Typ: types.Int},
1198+
{Name: "job_type", Typ: types.String},
1199+
{Name: "description", Typ: types.String},
1200+
{Name: "statement", Typ: types.String},
1201+
{Name: "user_name", Typ: types.String},
1202+
{Name: "status", Typ: types.String},
1203+
{Name: "running_status", Typ: types.String},
1204+
{Name: "created", Typ: types.TimestampTZ},
1205+
{Name: "finished", Typ: types.TimestampTZ},
1206+
{Name: "modified", Typ: types.TimestampTZ},
1207+
{Name: "fraction_completed", Typ: types.Float},
1208+
{Name: "high_water_timestamp", Typ: types.Decimal},
1209+
{Name: "error", Typ: types.String},
1210+
{Name: "coordinator_id", Typ: types.Int},
11851211
},
1186-
}
1187-
1188-
func makeJobsTableRows(
1189-
ctx context.Context,
1190-
p *planner,
1191-
addRow func(...tree.Datum) error,
1192-
whereClause string,
1193-
params ...interface{},
1194-
) (emitted bool, retErr error) {
1195-
globalPrivileges, err := jobsauth.GetGlobalJobPrivileges(ctx, p)
1196-
if err != nil {
1197-
return false, err
1198-
}
1199-
1200-
query := `SELECT
1201-
j.id, j.job_type, coalesce(j.description, ''), coalesce(j.owner, ''), j.status as state,
1202-
s.status, j.created::timestamptz, j.finished, greatest(j.created, j.finished, p.written, s.written)::timestamptz AS last_modified,
1203-
p.fraction,
1204-
p.resolved,
1205-
j.error_msg,
1206-
j.claim_instance_id
1207-
FROM system.public.jobs AS j
1208-
LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
1209-
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
1210-
` + whereClause + `UNION
1211-
(SELECT job_id, job_type, description, user_name, 'pending',
1212-
NULL, now(), NULL, now(), NULL, NULL, NULL, NULL
1213-
FROM crdb_internal.session_pending_jobs())`
1214-
1215-
it, err := p.InternalSQLTxn().QueryIteratorEx(
1216-
ctx, "system-jobs-join", p.txn, sessiondata.NodeUserSessionDataOverride, query, params...)
1217-
if err != nil {
1218-
return emitted, err
1219-
}
1220-
defer func() {
1221-
if err := it.Close(); err != nil {
1222-
retErr = errors.CombineErrors(retErr, err)
1223-
}
1224-
}()
1225-
1226-
// Loop while we need to skip a row.
1227-
for {
1228-
ok, err := it.Next(ctx)
1229-
if err != nil || !ok {
1230-
return emitted, err
1231-
}
1232-
r := it.Cur()
1233-
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID :=
1234-
r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[10], r[11], r[12]
1235-
1236-
owner := username.MakeSQLUsernameFromPreNormalizedString(string(tree.MustBeDString(ownerStr)))
1237-
jobID := jobspb.JobID(tree.MustBeDInt(id))
1238-
1239-
if errorMsg == tree.DNull {
1240-
errorMsg = emptyString
1241-
}
1242-
1243-
if err := jobsauth.Authorize(
1244-
ctx, p, jobID, owner, jobsauth.ViewAccess, globalPrivileges,
1245-
); err != nil {
1246-
// Filter out jobs which the user is not allowed to see.
1247-
if IsInsufficientPrivilegeError(err) {
1248-
continue
1249-
}
1250-
return emitted, err
1251-
}
1252-
1253-
if err = addRow(
1254-
id,
1255-
typStr,
1256-
desc,
1257-
desc,
1258-
ownerStr,
1259-
state,
1260-
status,
1261-
created,
1262-
finished,
1263-
modified,
1264-
fraction,
1265-
resolved,
1266-
errorMsg,
1267-
instanceID,
1268-
); err != nil {
1269-
return emitted, err
1270-
}
1271-
emitted = true
1272-
}
1212+
comment: `decoded job metadata from various jobs tables`,
12731213
}
12741214

12751215
const crdbInternalKVProtectedTSTableQuery = `
@@ -9355,7 +9295,7 @@ var crdbInternalClusterReplicationResolvedView = virtualSchemaView{
93559295
SELECT
93569296
j.id AS job_id, jsonb_array_elements(crdb_internal.pb_to_json('progress', i.value)->'streamIngest'->'checkpoint'->'resolvedSpans') AS s
93579297
FROM system.jobs j LEFT JOIN system.job_info i ON j.id = i.job_id AND i.info_key = 'legacy_progress'
9358-
WHERE j.job_type = 'REPLICATION STREAM INGESTION'
9298+
WHERE j.job_type = 'REPLICATION STREAM INGESTION' AND pg_has_role(current_user, 'admin', 'member')
93599299
) SELECT
93609300
job_id,
93619301
crdb_internal.pretty_key(decode(s->'span'->>'key', 'base64'), 0) AS start_key,
@@ -9378,7 +9318,7 @@ var crdbInternalLogicalReplicationResolvedView = virtualSchemaView{
93789318
SELECT
93799319
j.id AS job_id, jsonb_array_elements(crdb_internal.pb_to_json('progress', i.value)->'LogicalReplication'->'checkpoint'->'resolvedSpans') AS s
93809320
FROM system.jobs j LEFT JOIN system.job_info i ON j.id = i.job_id AND i.info_key = 'legacy_progress'
9381-
WHERE j.job_type = 'LOGICAL REPLICATION'
9321+
WHERE j.job_type = 'LOGICAL REPLICATION' AND pg_has_role(current_user, 'admin', 'member')
93829322
) SELECT
93839323
job_id,
93849324
crdb_internal.pretty_key(decode(s->'span'->>'key', 'base64'), 0) AS start_key,

pkg/sql/crdb_internal_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,13 +1454,13 @@ func TestInternalSystemJobsAccess(t *testing.T) {
14541454
// worker goroutine returns "query canceled error".
14551455
//
14561456
// In particular, the following setup is used:
1457-
// - issue SHOW JOBS query which internally issues a query against
1458-
// crdb_internal.system_jobs virtual table
1459-
// - that virtual table is generated by issuing "system-jobs-scan" internal
1457+
// - issue SHOW ZONE CONFIGURATIONS query which internally issues a query against
1458+
// crdb_internal.zones virtual table
1459+
// - that virtual table is generated by issuing "crdb-internal-zones-table" internal
14601460
// query
1461-
// - during that "system-jobs-scan" query we're injecting the query canceled
1461+
// - during that "crdb-internal-zones-table" query we're injecting the query canceled
14621462
// error (in other words, the error is injected during the generation of
1463-
// crdb_internal.system_jobs virtual table).
1463+
// crdb_internal.zones virtual table).
14641464
//
14651465
// The injection is achieved by adding a callback to DistSQLReceiver.Push which
14661466
// replaces the first piece of metadata it sees with the error.
@@ -1482,7 +1482,7 @@ func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
14821482
return nil
14831483
}
14841484
opName, ok := sql.GetInternalOpName(ctx)
1485-
if !ok || !(opName == "system-jobs-scan" || opName == "system-jobs-join") {
1485+
if !ok || !(opName == "crdb-internal-zones-table") {
14861486
return nil
14871487
}
14881488
numCallbacksAdded.Add(1)
@@ -1504,7 +1504,7 @@ func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
15041504
sqlDB := sqlutils.MakeSQLRunner(db)
15051505

15061506
addCallback.Store(true)
1507-
sqlDB.ExpectErr(t, err.Error(), "SHOW JOBS")
1507+
sqlDB.ExpectErr(t, err.Error(), "SHOW ZONE CONFIGURATIONS")
15081508
addCallback.Store(false)
15091509

15101510
// Sanity check that the callback was added at least once.

pkg/sql/faketreeeval/evalctx.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,14 @@ func (ep *DummySessionAccessor) HasViewActivityOrViewActivityRedactedRole(
647647
return false, false, errors.WithStack(errEvalSessionVar)
648648
}
649649

650+
// HasViewAccessToJob implements SessionAccessor.
651+
func (ep *DummySessionAccessor) HasViewAccessToJob(
652+
ctx context.Context, owner username.SQLUsername,
653+
) bool {
654+
// The dummy implementation performs no privilege check and always reports no access.
655+
return false
656+
}
657+
650658
func (ep *DummySessionAccessor) ForEachSessionPendingJob(
651659
_ func(job jobspb.PendingJob) error,
652660
) error {

pkg/sql/jobs_collection.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66
package sql
77

88
import (
9+
"context"
10+
911
"github.com/cockroachdb/cockroach/pkg/jobs"
12+
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
1013
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
14+
"github.com/cockroachdb/cockroach/pkg/security/username"
1115
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1216
)
1317

@@ -100,3 +104,11 @@ func (p *planner) ForEachSessionPendingJob(fn func(job jobspb.PendingJob) error)
100104
})
101105
})
102106
}
107+
108+
func (p *planner) HasViewAccessToJob(ctx context.Context, owner username.SQLUsername) bool {
109+
privs, err := jobsauth.GetGlobalJobPrivileges(ctx, p)
110+
if err != nil {
111+
return false
112+
}
113+
return jobsauth.Authorize(ctx, p, jobspb.InvalidJobID, owner, jobsauth.ViewAccess, privs) == nil
114+
}

pkg/sql/logictest/testdata/logic_test/select

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ statement ok
831831
SELECT * FROM pg_class
832832

833833
statement ok
834-
SELECT * FROM crdb_internal.jobs
834+
SELECT * FROM crdb_internal.node_build_info
835835

836836
# Tests for 'large_full_scan_rows'.
837837
statement ok

0 commit comments

Comments
 (0)