Skip to content

Commit b9a92b1

Browse files
committed
jobs: avoid crdb_internal.system_jobs in gc-jobs
crdb_internal.system_jobs is a virtual table that joins information from the jobs table and the job_info table. For the previous query, SELECT id, payload, status FROM "".crdb_internal.system_jobs WHERE (created < $1) AND (id > $2) ORDER BY id LIMIT $3 this is a little suboptimal because: - We don't make use of the progress column so any read of that is useless. - While the crdb_internal.system_jobs virtual table has a virtual index on job id, and EXPLAIN will even claim that it will be used: • limit │ count: 100 │ └── • filter │ filter: created < '2023-07-20 07:29:01.17001' │ └── • virtual table table: system_jobs@system_jobs_id_idx spans: [/101 - ] This is actually a lie. A virtual index can only handle single-key spans. As a result the unconstrained query is used: WITH latestpayload AS (SELECT job_id, value FROM system.job_info AS payload WHERE info_key = 'legacy_payload' ORDER BY written DESC), latestprogress AS (SELECT job_id, value FROM system.job_info AS progress WHERE info_key = 'legacy_progress' ORDER BY written DESC) SELECT DISTINCT(id), status, created, payload.value AS payload, progress.value AS progress, created_by_type, created_by_id, claim_session_id, claim_instance_id, num_runs, last_run, job_type FROM system.jobs AS j INNER JOIN latestpayload AS payload ON j.id = payload.job_id LEFT JOIN latestprogress AS progress ON j.id = progress.job_id which has a full scan of the jobs table and 2 full scans of the info table: • distinct │ distinct on: id, value, value │ └── • merge join │ equality: (job_id) = (id) │ ├── • render │ │ │ └── • filter │ │ estimated row count: 7,318 │ │ filter: info_key = 'legacy_payload' │ │ │ └── • scan │ estimated row count: 14,648 (100% of the table; stats collected 39 minutes ago; using stats forecast for 2 hours in the future) │ table: job_info@primary │ spans: FULL SCAN │ └── • merge join (right outer) │ equality: (job_id) = (id) │ right cols are key │ ├── • render │ │ │ └── • filter │ │ estimated row count: 7,317 │ │ filter: 
info_key = 'legacy_progress' │ │ │ └── • scan │ estimated row count: 14,648 (100% of the table; stats collected 39 minutes ago; using stats forecast for 2 hours in the future) │ table: job_info@primary │ spans: FULL SCAN │ └── • scan missing stats table: jobs@primary spans: FULL SCAN Because of the limit, I don't think this ends up being as bad as it looks. But it still isn't great. In this PR, we replace crdb_internal.system_jobs with a query that removes the join on the unused progress field and also constrains the query of the job_info table. • distinct │ distinct on: id, value │ └── • merge join │ equality: (job_id) = (id) │ right cols are key │ ├── • render │ │ │ └── • filter │ │ estimated row count: 7,318 │ │ filter: info_key = 'legacy_payload' │ │ │ └── • scan │ estimated row count: 14,646 (100% of the table; stats collected 45 minutes ago; using stats forecast for 2 hours in the future) │ table: job_info@primary │ spans: [/101/'legacy_payload' - ] │ └── • render │ └── • limit │ count: 100 │ └── • filter │ filter: created < '2023-07-20 07:29:01.17001' │ └── • scan missing stats table: jobs@primary spans: [/101 - ] In a local example, this does seem faster: > SELECT id, payload, status, created > FROM "".crdb_internal.system_jobs > WHERE (created < '2023-07-20 07:29:01.17001') AND (id > 100) ORDER BY id LIMIT 100; id | payload | status | created -----+---------+--------+---------- (0 rows) Time: 183ms total (execution 183ms / network 0ms) > WITH > latestpayload AS ( > SELECT job_id, value > FROM system.job_info AS payload > WHERE job_id > 100 AND info_key = 'legacy_payload' > ORDER BY written desc > ), > jobpage AS ( > SELECT id, status, created > FROM system.jobs > WHERE (created < '2023-07-20 07:29:01.17001') and (id > 100) > ORDER BY id > LIMIT 100 > ) > SELECT distinct (id), latestpayload.value AS payload, status > FROM jobpage AS j > INNER JOIN latestpayload ON j.id = latestpayload.job_id; id | payload | status -----+---------+--------- (0 rows) Time: 43ms 
total (execution 42ms / network 0ms) Release note: None Epic: none
1 parent e57e974 commit b9a92b1

File tree

3 files changed

+66
-31
lines changed

3 files changed

+66
-31
lines changed

pkg/jobs/helpers_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ func (j *Job) TestingCurrentStatus(ctx context.Context) (Status, error) {
124124
const (
125125
AdoptQuery = claimQuery
126126
CancelQuery = pauseAndCancelUpdate
127-
GcQuery = expiredJobsQuery
128127
RemoveClaimsQuery = removeClaimsForDeadSessionsQuery
129128
ProcessJobsQuery = processQueryWithBackoff
130129
IntervalBaseSettingKey = intervalBaseSettingKey

pkg/jobs/registry.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,20 +1256,45 @@ func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) erro
12561256

12571257
// The ordering is important as we keep track of the maximum ID we've seen.
12581258
const expiredJobsQuery = `
1259-
SELECT id, payload, status, created FROM "".crdb_internal.system_jobs
1259+
SELECT id, payload, status FROM "".crdb_internal.system_jobs
12601260
WHERE (created < $1) AND (id > $2)
12611261
ORDER BY id
1262-
LIMIT $3
1263-
`
1262+
LIMIT $3`
1263+
1264+
const expiredJobsQueryWithJobInfoTable = `
1265+
WITH
1266+
latestpayload AS (
1267+
SELECT job_id, value
1268+
FROM system.job_info AS payload
1269+
WHERE job_id > $2 AND info_key = 'legacy_payload'
1270+
ORDER BY written desc
1271+
),
1272+
jobpage AS (
1273+
SELECT id, status
1274+
FROM system.jobs
1275+
WHERE (created < $1) and (id > $2)
1276+
ORDER BY id
1277+
LIMIT $3
1278+
)
1279+
SELECT distinct (id), latestpayload.value AS payload, status
1280+
FROM jobpage AS j
1281+
INNER JOIN latestpayload ON j.id = latestpayload.job_id`
12641282

12651283
// cleanupOldJobsPage deletes up to cleanupPageSize job rows with ID > minID.
12661284
// minID is supposed to be the maximum ID returned by the previous page (0 if no
12671285
// previous page).
12681286
func (r *Registry) cleanupOldJobsPage(
12691287
ctx context.Context, olderThan time.Time, minID jobspb.JobID, pageSize int,
12701288
) (done bool, maxID jobspb.JobID, retErr error) {
1289+
var query string
1290+
if r.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) {
1291+
query = expiredJobsQueryWithJobInfoTable
1292+
} else {
1293+
query = expiredJobsQuery
1294+
}
1295+
12711296
it, err := r.db.Executor().QueryIterator(ctx, "gc-jobs", nil, /* txn */
1272-
expiredJobsQuery, olderThan, minID, pageSize)
1297+
query, olderThan, minID, pageSize)
12731298
if err != nil {
12741299
return false, 0, err
12751300
}

pkg/jobs/registry_external_test.go

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,13 @@ func TestRegistrySettingUpdate(t *testing.T) {
198198
}
199199

200200
for _, test := range [...]struct {
201-
name string // Test case ID.
202-
setting string // Cluster setting key.
203-
value interface{} // Duration when expecting a large number of job runs.
204-
matchStmt string // SQL statement to match to identify the target job.
205-
initCount int // Initial number of jobs to ignore at the beginning of the test.
206-
toOverride *settings.DurationSetting
201+
name string // Test case ID.
202+
setting string // Cluster setting key.
203+
value interface{} // Duration when expecting a large number of job runs.
204+
matchStmt string // SQL statement to match to identify the target job.
205+
matchAppName string
206+
initCount int // Initial number of jobs to ignore at the beginning of the test.
207+
toOverride *settings.DurationSetting
207208
}{
208209
{
209210
name: "adopt setting",
@@ -238,33 +239,44 @@ func TestRegistrySettingUpdate(t *testing.T) {
238239
toOverride: jobs.CancelIntervalSetting,
239240
},
240241
{
241-
name: "gc setting",
242-
setting: jobs.GcIntervalSettingKey,
243-
value: shortDuration,
244-
matchStmt: jobs.GcQuery,
245-
initCount: 0,
246-
toOverride: jobs.GcIntervalSetting,
242+
name: "gc setting",
243+
setting: jobs.GcIntervalSettingKey,
244+
value: shortDuration,
245+
matchAppName: "$ internal-gc-jobs",
246+
initCount: 0,
247+
toOverride: jobs.GcIntervalSetting,
247248
},
248249
{
249-
name: "gc setting with base",
250-
setting: jobs.IntervalBaseSettingKey,
251-
value: shortDurationBase,
252-
matchStmt: jobs.GcQuery,
253-
initCount: 0,
254-
toOverride: jobs.GcIntervalSetting,
250+
name: "gc setting with base",
251+
setting: jobs.IntervalBaseSettingKey,
252+
value: shortDurationBase,
253+
matchAppName: "$ internal-gc-jobs",
254+
initCount: 0,
255+
toOverride: jobs.GcIntervalSetting,
255256
},
256257
} {
257258
t.Run(test.name, func(t *testing.T) {
258259
ctx := context.Background()
259-
// Replace multiple white spaces with a single space, remove the last ';', and
260-
// trim leading and trailing spaces.
261-
matchStmt := strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(test.matchStmt, " "))
260+
var stmtMatcher func(*sessiondata.SessionData, string) bool
261+
if test.matchAppName != "" {
262+
stmtMatcher = func(sd *sessiondata.SessionData, _ string) bool {
263+
return sd.ApplicationName == test.matchAppName
264+
}
265+
} else {
266+
// Replace multiple white spaces with a single space, remove the last ';', and
267+
// trim leading and trailing spaces.
268+
matchStmt := strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(test.matchStmt, " "))
269+
stmtMatcher = func(_ *sessiondata.SessionData, stmt string) bool {
270+
return stmt == matchStmt
271+
}
272+
}
273+
262274
var seen = int32(0)
263-
stmtFilter := func(ctxt context.Context, _ *sessiondata.SessionData, stmt string, err error) {
275+
stmtFilter := func(_ context.Context, sd *sessiondata.SessionData, stmt string, err error) {
264276
if err != nil {
265277
return
266278
}
267-
if stmt == matchStmt {
279+
if stmtMatcher(sd, stmt) {
268280
atomic.AddInt32(&seen, 1)
269281
}
270282
}
@@ -327,13 +339,12 @@ func TestGCDurationControl(t *testing.T) {
327339
//
328340
// Replace multiple white spaces with a single space, remove the last ';', and
329341
// trim leading and trailing spaces.
330-
gcStmt := strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(jobs.GcQuery, " "))
331342
var seen = int32(0)
332-
stmtFilter := func(ctxt context.Context, _ *sessiondata.SessionData, stmt string, err error) {
343+
stmtFilter := func(_ context.Context, sd *sessiondata.SessionData, _ string, err error) {
333344
if err != nil {
334345
return
335346
}
336-
if stmt == gcStmt {
347+
if sd.ApplicationName == "$ internal-gc-jobs" {
337348
atomic.AddInt32(&seen, 1)
338349
}
339350
}

0 commit comments

Comments
 (0)