Skip to content

Commit 11df0f5

Browse files
committed
HARMONY-2016: Fix race condition in work reaper.
1 parent bba019b commit 11df0f5

File tree

4 files changed

+33
-30
lines changed

4 files changed

+33
-30
lines changed

services/cron-service/app/cronjobs/work-reaper.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { subMinutes } from 'date-fns';
12
import { JobStatus, terminalStates } from '../../../harmony/app/models/job';
23
import {
34
deleteWorkItemsById, getWorkItemIdsByJobUpdateAgeAndStatus,
@@ -10,12 +11,12 @@ import env from '../util/env';
1011
import { CronJob } from './cronjob';
1112

1213
/**
13-
* Find work items that are older than notUpdatedForMinutes and delete them.
14-
* @param notUpdatedForMinutes - upper limit on the duration since the last update
14+
* Find work items that are older than updatedAtCutoff and delete them.
15+
* @param updatedAtCutoff - updatedAt cutoff time since the last update
1516
* @param jobStatus - a list of terminal job statuses
1617
* @returns Resolves when the request is complete
1718
*/
18-
async function deleteTerminalWorkItems(ctx: Context, notUpdatedForMinutes: number, jobStatus: JobStatus[]): Promise < void> {
19+
async function deleteTerminalWorkItems(ctx: Context, updatedAtCutoff: Date, jobStatus: JobStatus[]): Promise < void> {
1920
let done = false;
2021
let startingId = 0;
2122
let totalDeleted = 0;
@@ -26,7 +27,7 @@ async function deleteTerminalWorkItems(ctx: Context, notUpdatedForMinutes: numbe
2627
while (!done) {
2728
try {
2829
const workItemIds = await getWorkItemIdsByJobUpdateAgeAndStatus(
29-
db, notUpdatedForMinutes, jobStatus, startingId, batchSize,
30+
db, updatedAtCutoff, jobStatus, startingId, batchSize,
3031
);
3132
if (workItemIds.length > 0) {
3233
const numItemsDeleted = await deleteWorkItemsById(db, workItemIds);
@@ -51,12 +52,12 @@ async function deleteTerminalWorkItems(ctx: Context, notUpdatedForMinutes: numbe
5152

5253

5354
/**
54-
* Find workflow steps that are older than notUpdatedForMinutes and delete them.
55-
* @param notUpdatedForMinutes - upper limit on the duration since the last update
55+
* Find workflow steps that are older than updatedAtCutoff and delete them.
56+
* @param updatedAtCutoff - updatedAt cutoff time since the last update
5657
* @param jobStatus - a list of terminal job statuses
5758
* @returns Resolves when the request is complete
5859
*/
59-
async function deleteTerminalWorkflowSteps(ctx: Context, notUpdatedForMinutes: number, jobStatus: JobStatus[]): Promise < void> {
60+
async function deleteTerminalWorkflowSteps(ctx: Context, updatedAtCutoff: Date, jobStatus: JobStatus[]): Promise < void> {
6061
let done = false;
6162
let startingId = 0;
6263
let totalDeleted = 0;
@@ -67,7 +68,7 @@ async function deleteTerminalWorkflowSteps(ctx: Context, notUpdatedForMinutes: n
6768
while (!done) {
6869
try {
6970
const workflowSteps = await getWorkflowStepIdsByJobUpdateAgeAndStatus(
70-
db, notUpdatedForMinutes, jobStatus, startingId, batchSize,
71+
db, updatedAtCutoff, jobStatus, startingId, batchSize,
7172
);
7273
if (workflowSteps.length > 0) {
7374
const numItemsDeleted = await deleteWorkflowStepsById(db, workflowSteps);
@@ -99,14 +100,15 @@ export class WorkReaper extends CronJob {
99100
const { logger } = ctx;
100101
logger.debug('Running');
101102
try {
103+
const updatedAtCutoff = subMinutes(new Date(), env.reapableWorkAgeMinutes);
102104
await deleteTerminalWorkItems(
103105
ctx,
104-
env.reapableWorkAgeMinutes,
106+
updatedAtCutoff,
105107
terminalStates,
106108
);
107109
await deleteTerminalWorkflowSteps(
108110
ctx,
109-
env.reapableWorkAgeMinutes,
111+
updatedAtCutoff,
110112
terminalStates,
111113
);
112114
} catch (e) {

services/cron-service/test/work-reaper.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, it } from 'mocha';
22
import { expect } from 'chai';
3+
import { subMinutes } from 'date-fns';
34
import MockDate from 'mockdate';
45
import { buildJob } from './helpers/jobs';
56
import { buildWorkflowStep } from './helpers/workflow-steps';
@@ -53,18 +54,19 @@ describe('WorkReaper-related functions', function () {
5354
describe('.getWorkItemIdsByJobUpdateAgeAndStatus', function () {
5455
it('returns the work items and steps of jobs that have not been updated for n minutes', async function () {
5556
MockDate.set(newDate);
57+
const updatedAtCutoff = subMinutes(new Date(), 60);
5658

5759
const itemIds = await getWorkItemIdsByJobUpdateAgeAndStatus(
5860
this.trx,
59-
60,
61+
updatedAtCutoff,
6062
[JobStatus.CANCELED, JobStatus.SUCCESSFUL, JobStatus.FAILED],
6163
);
6264
expect(itemIds.length).to.eql(3);
6365
expect(itemIds).to.have.same.members([1, 2, 3]);
6466

6567
const stepIds = await getWorkflowStepIdsByJobUpdateAgeAndStatus(
6668
this.trx,
67-
60,
69+
updatedAtCutoff,
6870
[JobStatus.CANCELED, JobStatus.SUCCESSFUL, JobStatus.FAILED],
6971
);
7072
expect(stepIds.length).to.eql(3);
@@ -75,17 +77,18 @@ describe('WorkReaper-related functions', function () {
7577

7678
it('returns no work items / steps when they were created recently', async function () {
7779
MockDate.set(oldDate);
80+
const updatedAtCutoff = subMinutes(new Date(), 60);
7881

7982
const itemIds = await getWorkItemIdsByJobUpdateAgeAndStatus(
8083
this.trx,
81-
60,
84+
updatedAtCutoff,
8285
[JobStatus.CANCELED, JobStatus.SUCCESSFUL],
8386
);
8487
expect(itemIds.length).to.eql(0);
8588

8689
const stepIds = await getWorkflowStepIdsByJobUpdateAgeAndStatus(
8790
this.trx,
88-
60,
91+
updatedAtCutoff,
8992
[JobStatus.CANCELED, JobStatus.SUCCESSFUL],
9093
);
9194
expect(stepIds.length).to.eql(0);
@@ -96,11 +99,12 @@ describe('WorkReaper-related functions', function () {
9699
describe('.deleteWorkItemsById', function () {
97100
it('deletes work items and steps by id', async function () {
98101
MockDate.set(newDate);
102+
const updatedAtCutoff = subMinutes(new Date(), 60);
99103

100104
// get the old work items
101105
const beforeDeletionItemIds = await getWorkItemIdsByJobUpdateAgeAndStatus(
102106
this.trx,
103-
60,
107+
updatedAtCutoff,
104108
[JobStatus.CANCELED, JobStatus.SUCCESSFUL, JobStatus.FAILED],
105109
);
106110
expect(beforeDeletionItemIds.length).to.eql(3);
@@ -110,15 +114,15 @@ describe('WorkReaper-related functions', function () {
110114
await deleteWorkItemsById(this.trx, beforeDeletionItemIds);
111115
const afterDeletionItemIds = await getWorkItemIdsByJobUpdateAgeAndStatus(
112116
this.trx,
113-
60,
117+
updatedAtCutoff,
114118
[JobStatus.CANCELED, JobStatus.SUCCESSFUL, JobStatus.FAILED],
115119
);
116120
expect(afterDeletionItemIds.length).to.eql(0);
117121

118122
// get the old steps
119123
const beforeDeletionStepIds = await getWorkflowStepIdsByJobUpdateAgeAndStatus(
120124
this.trx,
121-
60,
125+
updatedAtCutoff,
122126
[JobStatus.CANCELED, JobStatus.SUCCESSFUL, JobStatus.FAILED],
123127
);
124128
expect(beforeDeletionStepIds.length).to.eql(3);
@@ -128,7 +132,7 @@ describe('WorkReaper-related functions', function () {
128132
await deleteWorkflowStepsById(this.trx, beforeDeletionItemIds);
129133
const afterDeletionStepIds = await getWorkflowStepIdsByJobUpdateAgeAndStatus(
130134
this.trx,
131-
60,
135+
updatedAtCutoff,
132136
[JobStatus.CANCELED, JobStatus.SUCCESSFUL, JobStatus.FAILED],
133137
);
134138
expect(afterDeletionStepIds.length).to.eql(0);

services/harmony/app/models/work-item.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -525,11 +525,11 @@ export async function getWorkItemsByJobIdAndStepIndex(
525525
}
526526

527527
/**
528-
* Get all work item ids associated with jobs that haven't been updated for a
529-
* certain amount of minutes and that have a particular JobStatus
528+
* Get all work item ids associated with jobs that haven't been updated
529+
* since updatedAtCutoff and that have a particular JobStatus
530530
* @param tx - the transaction to use for querying
531-
* @param notUpdatedForMinutes - jobs with updateAt older than notUpdatedForMinutes ago
532-
* will be joined with the returned work items
531+
* @param updatedAtCutoff - jobs with updatedAt older than updatedAtCutoff will be
532+
* joined with the returned work items
533533
* @param jobStatus - only jobs with this status will be joined
534534
* @param startingId - the work item id to begin the query with, i.e. query work items with id greater than startingId
535535
* @param batchSize - the batch size
@@ -538,16 +538,15 @@ export async function getWorkItemsByJobIdAndStepIndex(
538538
*/
539539
export async function getWorkItemIdsByJobUpdateAgeAndStatus(
540540
tx: Transaction,
541-
notUpdatedForMinutes: number,
541+
updatedAtCutoff: Date,
542542
jobStatus: JobStatus[],
543543
startingId = 0,
544544
batchSize = 2000,
545545
): Promise<number[]> {
546-
const pastDate = subMinutes(new Date(), notUpdatedForMinutes);
547546
const workItemIds = (await tx(`${WorkItem.table} as w`)
548547
.innerJoin(Job.table, 'w.jobID', '=', `${Job.table}.jobID`)
549548
.select(['w.id'])
550-
.where(`${Job.table}.updatedAt`, '<', pastDate)
549+
.where(`${Job.table}.updatedAt`, '<', updatedAtCutoff)
551550
.whereIn(`${Job.table}.status`, jobStatus)
552551
.where('w.id', '>', startingId)
553552
.orderBy('w.id', 'asc')

services/harmony/app/models/workflow-steps.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/* eslint-disable @typescript-eslint/dot-notation */
22
import env from '../util/env';
3-
import { subMinutes } from 'date-fns';
43
import _ from 'lodash';
54
import { Transaction } from '../util/db';
65
import { Job, JobStatus } from './job';
@@ -234,7 +233,7 @@ export async function getWorkflowStepByJobIdServiceId(
234233
* Get all workflow step ids associated with jobs that haven't been updated for a
235234
* certain amount of minutes and that have a particular JobStatus
236235
* @param tx - the transaction to use for querying
237-
* @param notUpdatedForMinutes - jobs with updateAt older than notUpdatedForMinutes ago will be
236+
* @param updatedAtCutoff - jobs with updatedAt older than updatedAtCutoff will be
238237
* joined with the returned workflow steps
239238
* @param jobStatus - only jobs with this status will be joined
240239
* @param startingId - the workflow step id to begin the query with, i.e. query workflow steps
@@ -245,16 +244,15 @@ export async function getWorkflowStepByJobIdServiceId(
245244
*/
246245
export async function getWorkflowStepIdsByJobUpdateAgeAndStatus(
247246
tx: Transaction,
248-
notUpdatedForMinutes: number,
247+
updatedAtCutoff: Date,
249248
jobStatus: JobStatus[],
250249
startingId = 0,
251250
batchSize = 2000,
252251
): Promise<number[]> {
253-
const pastDate = subMinutes(new Date(), notUpdatedForMinutes);
254252
const workflowStepIds = (await tx(WorkflowStep.table)
255253
.innerJoin(Job.table, `${WorkflowStep.table}.jobID`, '=', `${Job.table}.jobID`)
256254
.select([`${WorkflowStep.table}.id`])
257-
.where(`${Job.table}.updatedAt`, '<', pastDate)
255+
.where(`${Job.table}.updatedAt`, '<', updatedAtCutoff)
258256
.whereIn(`${Job.table}.status`, jobStatus)
259257
.where(`${WorkflowStep.table}.id`, '>', startingId)
260258
.orderBy(`${WorkflowStep.table}.id`, 'asc')

0 commit comments

Comments
 (0)