Skip to content

Commit 1868efe

Browse files
committed
jobs: make GC of old jobs txn'l
A single logical job is now made up of many rows stored in many different tables, meaning that deletion of a job from the system should imply complete deletion of all of its rows in all of these tables. Previously we were not using a single txn to delete from all of the tables, meaning a job could have its row deleted from e.g. system.jobs but not from job_status, orphaning the status row. These orphan rows were not particularly problematic, but they are cruft that could build up over time, as nothing would remove them once the main record in system.jobs was removed. Instead, we now use one txn to identify all expired jobs in system.jobs that we will delete, then delete each job in a txn that deletes all rows for that job in all tables. Release note: none. Epic: CRDB-51121. Fixes: #147552.
1 parent 71bcba2 commit 1868efe

File tree

3 files changed

+24
-92
lines changed

3 files changed

+24
-92
lines changed

pkg/jobs/adopt.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ const (
3737
`'` + string(StatePauseRequested) + `', ` +
3838
`'` + string(StateReverting) + `'`
3939

40+
terminalStateList = `'` + string(StateFailed) + `', ` +
41+
`'` + string(StateCanceled) + `', ` +
42+
`'` + string(StateSucceeded) + `'`
43+
4044
claimableStateTupleString = `(` + claimableStateList + `)`
4145

4246
nonTerminalStateList = claimableStateList + `, ` +

pkg/jobs/registry.go

Lines changed: 12 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3030
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
3131
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
32-
"github.com/cockroachdb/cockroach/pkg/sql/types"
3332
"github.com/cockroachdb/cockroach/pkg/util/cidr"
3433
"github.com/cockroachdb/cockroach/pkg/util/envutil"
3534
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -1175,26 +1174,6 @@ FROM system.job_info
11751174
WHERE written < $1 AND job_id NOT IN (SELECT id FROM system.jobs)
11761175
LIMIT $2`
11771176

1178-
// The ordering is important as we keep track of the maximum ID we've seen.
1179-
const expiredJobsQueryWithJobInfoTable = `
1180-
WITH
1181-
latestpayload AS (
1182-
SELECT job_id, value
1183-
FROM system.job_info AS payload
1184-
WHERE job_id > $2 AND info_key = 'legacy_payload'
1185-
ORDER BY written desc
1186-
),
1187-
jobpage AS (
1188-
SELECT id, status
1189-
FROM system.jobs
1190-
WHERE (created < $1) and (id > $2)
1191-
ORDER BY id
1192-
LIMIT $3
1193-
)
1194-
SELECT distinct (id), latestpayload.value AS payload, status
1195-
FROM jobpage AS j
1196-
INNER JOIN latestpayload ON j.id = latestpayload.job_id`
1197-
11981177
// jobMetadataTables are all of the tables that have rows storing additional
11991178
// attributes or data about jobs beyond the core job record in system.jobs. All
12001179
// of these tables identify the job which owns rows in them using a "job_id"
@@ -1209,86 +1188,31 @@ var jobMetadataTables = []string{"job_info", "job_progress", "job_progress_histo
12091188
func (r *Registry) cleanupOldJobsPage(
12101189
ctx context.Context, olderThan time.Time, minID jobspb.JobID, pageSize int,
12111190
) (done bool, maxID jobspb.JobID, retErr error) {
1212-
query := expiredJobsQueryWithJobInfoTable
1191+
query := "SELECT id, status, finished, $1 FROM system.jobs WHERE status in (" + terminalStateList + ") AND (finished < $1) and (id >= $2) ORDER BY id LIMIT $3"
12131192

12141193
it, err := r.db.Executor().QueryIterator(ctx, "gc-jobs", nil, /* txn */
12151194
query, olderThan, minID, pageSize)
12161195
if err != nil {
12171196
return false, 0, err
12181197
}
1219-
// We have to make sure to close the iterator since we might return from the
1220-
// for loop early (before Next() returns false).
1221-
defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()
1222-
toDelete := tree.NewDArray(types.Int)
1223-
oldMicros := timeutil.ToUnixMicros(olderThan)
1224-
1198+
var candidates []jobspb.JobID
12251199
var ok bool
1226-
var numRows int
12271200
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
1228-
numRows++
1229-
row := it.Cur()
1230-
payload, err := UnmarshalPayload(row[1])
1231-
if err != nil {
1232-
return false, 0, err
1233-
}
1234-
remove := false
1235-
switch State(*row[2].(*tree.DString)) {
1236-
case StateSucceeded, StateCanceled, StateFailed:
1237-
remove = payload.FinishedMicros < oldMicros
1238-
}
1239-
if remove {
1240-
toDelete.Array = append(toDelete.Array, row[0])
1241-
}
1242-
}
1243-
if err != nil {
1244-
return false, 0, err
1201+
candidates = append(candidates, jobspb.JobID(tree.MustBeDInt(it.Cur()[0])))
12451202
}
1246-
if numRows == 0 {
1247-
return true, 0, nil
1203+
err = errors.CombineErrors(err, it.Close())
1204+
1205+
if err != nil || len(candidates) == 0 {
1206+
return len(candidates) == 0, 0, err
12481207
}
12491208

1250-
log.VEventf(ctx, 2, "read potentially expired jobs: %d", numRows)
1251-
if len(toDelete.Array) > 0 {
1252-
log.VEventf(ctx, 2, "attempting to clean up %d expired job records", len(toDelete.Array))
1253-
const stmt = `DELETE FROM system.jobs WHERE id = ANY($1)`
1254-
nDeleted, err := r.db.Executor().Exec(
1255-
ctx, "gc-jobs", nil /* txn */, stmt, toDelete,
1256-
)
1257-
if err != nil {
1258-
log.Warningf(ctx, "error cleaning up %d jobs: %v", len(toDelete.Array), err)
1259-
return false, 0, errors.Wrap(err, "deleting old jobs")
1260-
}
1261-
1262-
counts := make(map[string]int)
1263-
for _, tbl := range jobMetadataTables {
1264-
var deleted int
1265-
if err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
1266-
// Tables other than job_info -- the 0th -- are only present if the txn is
1267-
// running at a version that includes them.
1268-
deleted, err = txn.Exec(ctx, redact.RedactableString("gc-job-"+tbl), txn.KV(),
1269-
"DELETE FROM system."+tbl+" WHERE job_id = ANY($1)", toDelete,
1270-
)
1271-
if err != nil {
1272-
return err
1273-
}
1274-
return nil
1275-
}); err != nil {
1276-
return false, 0, errors.Wrapf(err, "deleting old job metadata from %s", tbl)
1277-
}
1278-
counts[tbl] = deleted
1279-
}
1280-
if nDeleted > 0 {
1281-
log.Infof(ctx, "cleaned up %d expired job records (%d infos, %d progresses, %d progress_hists, %d statuses, %d messages)",
1282-
nDeleted, counts["job_info"], counts["job_progress"], counts["job_progress_history"], counts["job_status"], counts["job_message"])
1209+
log.VEventf(ctx, 2, "found %d expired jobs", len(candidates))
1210+
for _, id := range candidates {
1211+
if err := r.DeleteTerminalJobByID(ctx, id); err != nil {
1212+
return false, 0, err
12831213
}
12841214
}
1285-
// If we got as many rows as we asked for, there might be more.
1286-
morePages := numRows == pageSize
1287-
// Track the highest ID we encounter, so it can serve as the bottom of the
1288-
// next page.
1289-
lastRow := it.Cur()
1290-
maxID = jobspb.JobID(*(lastRow[0].(*tree.DInt)))
1291-
return !morePages, maxID, nil
1215+
return len(candidates) < pageSize, candidates[len(candidates)-1], nil
12921216
}
12931217

12941218
// DeleteTerminalJobByID deletes the given job ID if it is in a

pkg/jobs/registry_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,14 @@ INSERT INTO t."%s" VALUES('a', 'foo');
196196
t.Fatal(err)
197197
}
198198

199+
finishedOpt := &finished
200+
if finished == (time.Time{}) {
201+
finishedOpt = nil
202+
}
203+
199204
var id jobspb.JobID
200205
db.QueryRow(t,
201-
`INSERT INTO system.jobs (status, created, job_type) VALUES ($1, $2, 'SCHEMA CHANGE') RETURNING id`, state, created).Scan(&id)
206+
`INSERT INTO system.jobs (status, created, job_type, finished) VALUES ($1, $2, 'SCHEMA CHANGE', $3) RETURNING id`, state, created, finishedOpt).Scan(&id)
202207
db.Exec(t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, id, GetLegacyPayloadKey(), payload)
203208
db.Exec(t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, id, GetLegacyProgressKey(), progress)
204209
return strconv.Itoa(int(id))
@@ -251,8 +256,7 @@ INSERT INTO t."%s" VALUES('a', 'foo');
251256
if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
252257
t.Fatal(err)
253258
}
254-
db.CheckQueryResults(t, selectJobsQuery, [][]string{
255-
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
259+
db.CheckQueryResults(t, selectJobsQuery, [][]string{{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
256260

257261
// Delete the revert failed, and running jobs for the next run of the
258262
// test.
@@ -300,7 +304,7 @@ func TestRegistryGCPagination(t *testing.T) {
300304
require.NoError(t, err)
301305
var jobID jobspb.JobID
302306
db.QueryRow(t,
303-
`INSERT INTO system.jobs (status, created) VALUES ($1, $2) RETURNING id`,
307+
`INSERT INTO system.jobs (status, created, finished) VALUES ($1, $2, $2::timestamptz) RETURNING id`,
304308
StateCanceled, timeutil.Now().Add(-time.Hour)).Scan(&jobID)
305309
db.Exec(t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`,
306310
jobID, GetLegacyPayloadKey(), payload)

0 commit comments

Comments
 (0)