Skip to content
This repository was archived by the owner on Aug 6, 2025. It is now read-only.

Commit d32b7c3

Browse files
authored
DOP-2565: Create cron job to fail stuck jobs (#766)
1 parent 28ef394 commit d32b7c3

File tree

9 files changed

+213
-10
lines changed

9 files changed

+213
-10
lines changed

api/controllers/v1/jobs.ts

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,22 @@ export const HandleJobs = async (event: any = {}): Promise<any> => {
6262
// start the task , don't start the process before processing the notification
6363
const ecsServices = new ECSContainer(c, consoleLogger);
6464
const res = await ecsServices.execute(jobId);
65+
if (res) {
66+
await saveTaskId(jobId, res, consoleLogger);
67+
}
6568
consoleLogger.info(jobId, JSON.stringify(res));
6669
break;
6770
case JobStatus[JobStatus.inProgress]:
6871
queueUrl = c.get('jobUpdatesQueueUrl');
6972
await NotifyBuildProgress(jobId);
7073
break;
74+
case JobStatus[JobStatus.timedOut]:
75+
await NotifyBuildSummary(jobId);
76+
const taskId = body['taskId'];
77+
if (taskId) {
78+
await stopECSTask(taskId, consoleLogger);
79+
}
80+
break;
7181
case JobStatus[JobStatus.failed]:
7282
case JobStatus[JobStatus.completed]:
7383
queueUrl = c.get('jobUpdatesQueueUrl');
@@ -86,6 +96,44 @@ export const HandleJobs = async (event: any = {}): Promise<any> => {
8696
);
8797
};
8898

99+
export const FailStuckJobs = async () => {
100+
const client = new mongodb.MongoClient(c.get('dbUrl'));
101+
await client.connect();
102+
const db = client.db(c.get('dbName'));
103+
const consoleLogger = new ConsoleLogger();
104+
const jobRepository = new JobRepository(db, c, consoleLogger);
105+
106+
try {
107+
const hours = 8;
108+
await jobRepository.failStuckJobs(hours);
109+
} catch (err) {
110+
consoleLogger.error('FailStuckJobs', err);
111+
}
112+
};
113+
114+
async function saveTaskId(jobId: string, taskExecutionRes: any, consoleLogger: ConsoleLogger): Promise<void> {
115+
const taskArn = taskExecutionRes?.tasks[0]?.taskArn;
116+
if (!taskArn) return;
117+
118+
const client = new mongodb.MongoClient(c.get('dbUrl'));
119+
await client.connect();
120+
const db = client.db(c.get('dbName'));
121+
const jobRepository = new JobRepository(db, c, consoleLogger);
122+
123+
try {
124+
// Only interested in the actual task ID since the whole ARN might have sensitive information
125+
const taskId = taskArn.split('/').pop();
126+
await jobRepository.addTaskIdToJob(jobId, taskId);
127+
} catch (err) {
128+
consoleLogger.error('saveTaskId', err);
129+
}
130+
}
131+
132+
async function stopECSTask(taskId: string, consoleLogger: ConsoleLogger) {
133+
const ecs = new ECSContainer(c, consoleLogger);
134+
await ecs.stopZombieECSTask(taskId);
135+
}
136+
89137
async function retry(message: JobQueueMessage, consoleLogger: ConsoleLogger, url: string): Promise<any> {
90138
try {
91139
const tries = message.tries;
@@ -113,10 +161,6 @@ async function NotifyBuildSummary(jobId: string): Promise<any> {
113161
const jobRepository = new JobRepository(db, c, consoleLogger);
114162
// TODO: Make fullDocument be of type Job, validate existence
115163
const fullDocument = await jobRepository.getJobById(jobId);
116-
// TODO: Remove unused vars, and validate existing vars
117-
const branchesRepo = new BranchRepository(db, c, consoleLogger);
118-
const slackMsgs = fullDocument.comMessage;
119-
const jobTitle = fullDocument.title;
120164
const repoName = fullDocument.payload.repoName;
121165
const username = fullDocument.user;
122166
const slackConnector = new SlackConnector(consoleLogger, c);
@@ -126,7 +170,7 @@ async function NotifyBuildSummary(jobId: string): Promise<any> {
126170
consoleLogger.error(username, 'Entitlement failed');
127171
return;
128172
}
129-
const resp = await slackConnector.sendMessage(
173+
await slackConnector.sendMessage(
130174
await prepSummaryMessage(
131175
env,
132176
fullDocument,

serverless.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,11 @@ functions:
214214
- Effect: Allow
215215
Action:
216216
- "ecs:RunTask"
217+
- "ecs:StopTask"
218+
- "ecs:DescribeTasks"
217219
Resource:
218220
- arn:aws:ecs:${aws:region}:${aws:accountId}:task-definition/docs-worker-pool-${self:provider.stage}:*
221+
- arn:aws:ecs:${aws:region}:${aws:accountId}:task/docs-worker-pool-${self:provider.stage}/*
219222
- Effect: Allow
220223
Action:
221224
- ecs:DescribeTaskDefinition
@@ -238,6 +241,13 @@ functions:
238241
environment:
239242
<<: *webhook-env-core
240243

244+
v1FailStuckJobs:
245+
handler: api/controllers/v1/jobs.FailStuckJobs
246+
events:
247+
- schedule: rate(8 hours)
248+
environment:
249+
<<: *webhook-env-core
250+
241251
Outputs:
242252
JobsQueueURL:
243253
Description: Jobs Queue Url

src/entities/job.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export enum JobStatus {
66
inProgress = 'inProgress',
77
completed = 'completed',
88
failed = 'failed',
9+
timedOut = 'timedOut',
910
}
1011

1112
// TODO: Formalize JobTypes

src/entities/queueMessage.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
import { timingSafeEqual } from 'crypto';
21
import { JobStatus } from './job';
32

43
export class JobQueueMessage {
54
jobId: string;
65
jobStatus: JobStatus;
76
tries: number;
8-
constructor(jobId: string, status: JobStatus, tries = 0) {
7+
taskId?: string;
8+
9+
constructor(jobId: string, status: JobStatus, tries = 0, taskId?: string) {
910
this.jobId = jobId;
1011
this.jobStatus = status;
1112
this.tries = tries;
13+
14+
if (taskId) this.taskId = taskId;
1215
}
1316
}

src/repositories/baseRepository.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,40 @@ export abstract class BaseRepository {
7979
throw error;
8080
}
8181
}
82+
83+
protected async find(query: any, errorMsg: string, options?: mongodb.FindOptions): Promise<mongodb.FindCursor> {
84+
try {
85+
return this.promiseTimeoutS(this._config.get('MONGO_TIMEOUT_S'), this._collection.find(query, options), errorMsg);
86+
} catch (error) {
87+
this._logger.error(`${this._repoName}:find`, `Failed to find (${JSON.stringify(query)}) error: ${error}`);
88+
throw error;
89+
}
90+
}
91+
92+
protected async updateMany(query: any, update: any, errorMsg: string): Promise<boolean> {
93+
try {
94+
const updateResult = await this.promiseTimeoutS(
95+
this._config.get('MONGO_TIMEOUT_S'),
96+
this._collection.updateMany(query, update),
97+
errorMsg
98+
);
99+
// If no documents were found, this is not necessarily an error.
100+
if ((updateResult?.matchedCount ?? 0) < 1) {
101+
return false;
102+
}
103+
if ((updateResult?.modifiedCount ?? 0) < 1) {
104+
throw new DBError(`Failed to modify jobs with query ${JSON.stringify(query)}`);
105+
}
106+
} catch (error) {
107+
this._logger.error(
108+
`${this._repoName}:updateMany`,
109+
`Failed to update many with (${JSON.stringify(query)}) error: ${error}`
110+
);
111+
throw error;
112+
}
113+
return true;
114+
}
115+
82116
protected async updateOne(query: any, update: any, errorMsg: string): Promise<boolean> {
83117
try {
84118
const updateResult = await this.update(query, update, errorMsg);

src/repositories/jobRepository.ts

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ export class JobRepository extends BaseRepository {
8989
return await this.findOneAndUpdateJob(query);
9090
}
9191

92-
async notify(jobId: string, url: string, status: JobStatus, delay: number) {
93-
await this._queueConnector.sendMessage(new JobQueueMessage(jobId, status), url, delay);
92+
async notify(jobId: string, url: string, status: JobStatus, delay: number, taskId?: string) {
93+
await this._queueConnector.sendMessage(new JobQueueMessage(jobId, status, 0, taskId), url, delay);
9494
}
9595

9696
async findOneAndUpdateJob(query): Promise<Job | null> {
@@ -205,4 +205,63 @@ export class JobRepository extends BaseRepository {
205205
}
206206
return bRet;
207207
}
208+
209+
async failStuckJobs(hours: number) {
210+
const hourInMS = 1000 * 60 * 60;
211+
const currentTime = new Date();
212+
const failReason = `Job timeout error: Job has been running for at least ${hours} hours`;
213+
214+
const query = {
215+
status: JobStatus.inProgress,
216+
startTime: {
217+
$lte: new Date(currentTime.getTime() - hourInMS * hours),
218+
},
219+
};
220+
const findOptions = {
221+
projection: {
222+
_id: 1,
223+
taskId: 1,
224+
},
225+
};
226+
const update = {
227+
$set: {
228+
status: JobStatus.timedOut,
229+
endTime: currentTime,
230+
error: { time: currentTime.toString(), reason: failReason },
231+
},
232+
};
233+
234+
// Mongo's updateMany does not return the IDs of documents changed, so we find them first
235+
const stuckJobsCursor = await this.find(query, `Mongo Timeout Error: Timed out finding stuck jobs.`, findOptions);
236+
const stuckJobs = await stuckJobsCursor.toArray();
237+
this._logger.info('failStuckJobs', `Found ${stuckJobs.length} jobs.`);
238+
// No stuck jobs found
239+
if (!stuckJobs.length) return;
240+
241+
// Since the same query is split into 2 operations, there's a very small chance
242+
// that there might be a mismatch between jobs updated in the db vs. in the queue.
243+
const bRet = await this.updateMany(query, update, `Mongo Timeout Error: Timed out updating stuck jobs.`);
244+
if (!bRet) {
245+
throw new DBError('failStuckJobs: Unable to update stuck jobs.');
246+
}
247+
248+
const jobUpdatesQueueUrl: string = this._config.get('jobUpdatesQueueUrl');
249+
await Promise.all(
250+
stuckJobs.map((stuckJob: any) => {
251+
const id: string = stuckJob._id.toString();
252+
return this.notify(id, jobUpdatesQueueUrl, JobStatus.timedOut, 0, stuckJob.taskId);
253+
})
254+
);
255+
}
256+
257+
async addTaskIdToJob(id: string, taskId: string): Promise<void> {
258+
const query = { _id: new objectId(id) };
259+
const update = {
260+
$set: {
261+
taskId: taskId,
262+
},
263+
};
264+
265+
await this.updateOne(query, update, `Mongo Timeout Error: Timed out while updating taskId for jobId: ${id}`);
266+
}
208267
}

src/services/containerServices.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,24 @@ export class ECSContainer implements IContainerServices {
8787
}
8888
return '';
8989
}
90+
91+
async stopZombieECSTask(taskId: string) {
92+
const clusterName = this._config.get<string>('taskDefinitionFamily');
93+
try {
94+
const res = await this._client.describeTasks({
95+
cluster: clusterName,
96+
tasks: [taskId],
97+
});
98+
99+
// Only stop ECS task if it's still running
100+
if (res.tasks?.[0].lastStatus === 'RUNNING') {
101+
await this._client.stopTask({
102+
cluster: clusterName,
103+
task: taskId,
104+
});
105+
}
106+
} catch (error) {
107+
throw error;
108+
}
109+
}
90110
}

tests/unit/repositories/jobRepository.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { JobRepository } from '../../../src/repositories/jobRepository';
22
import { getBuildJobPlain } from '../../data/jobDef';
33
import { DBRepositoryHelper } from '../../utils/repositoryHelper';
44
import { TestDataProvider } from '../../data/data';
5+
import { ObjectId } from 'mongodb';
56

67
describe('Job Repository Tests', () => {
78
let jobRepo: JobRepository;
@@ -179,6 +180,14 @@ describe('Job Repository Tests', () => {
179180
});
180181
});
181182

183+
describe('failStuckJobs Tests', () => {
184+
test('failStuckJobs succeeds', async () => {
185+
setupForFindSuccess();
186+
setupForUpdateManySuccess();
187+
await expect(jobRepo.failStuckJobs(8)).resolves.toEqual(undefined);
188+
});
189+
});
190+
182191
function validateSuccessfulUpdate(testData: any) {
183192
expect(dbRepoHelper.collection.updateOne).toBeCalledTimes(1);
184193
expect(dbRepoHelper.collection.updateOne).toBeCalledWith(testData.query, testData.update);
@@ -190,4 +199,21 @@ describe('Job Repository Tests', () => {
190199
jest.spyOn(jobRepo, 'notify').mockResolvedValueOnce(true);
191200
dbRepoHelper.config.get.calledWith('MONGO_TIMEOUT_S').mockReturnValueOnce(1);
192201
}
202+
203+
function setupForFindSuccess() {
204+
dbRepoHelper.collection.find.mockReturnValueOnce({
205+
toArray: () => [
206+
{ _id: new ObjectId(), status: 'inProgress' },
207+
{ _id: new ObjectId(), status: 'inQueue' },
208+
{ _id: new ObjectId(), status: 'inProgress' },
209+
],
210+
});
211+
dbRepoHelper.config.get.calledWith('MONGO_TIMEOUT_S').mockReturnValueOnce(1);
212+
}
213+
214+
function setupForUpdateManySuccess() {
215+
dbRepoHelper.collection.updateMany.mockReturnValueOnce({ matchedCount: 2, modifiedCount: 2 });
216+
jest.spyOn(jobRepo, 'notify').mockResolvedValue(true);
217+
dbRepoHelper.config.get.calledWith('MONGO_TIMEOUT_S').mockReturnValueOnce(1);
218+
}
193219
});

tests/utils/repositoryHelper.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { IConfig } from 'config';
22
import { mockDeep } from 'jest-mock-extended';
3-
import { Db } from 'mongodb';
3+
import { Db, FindCursor, FindOptions } from 'mongodb';
44
import { JobRepository } from '../../src/repositories/jobRepository';
55
import { RepoEntitlementsRepository } from '../../src/repositories/repoEntitlementsRepository';
66
import { RepoBranchesRepository } from '../../src/repositories/repoBranchesRepository';
@@ -11,6 +11,8 @@ export class DBRepositoryHelper {
1111
logger: ILogger;
1212
db: Db;
1313
updateOne: (query: any, update: any, errorMsg: string) => Promise<boolean>;
14+
updateMany: (query: any, update: any, errorMsg: string) => Promise<boolean>;
15+
find: (query: any, errorMsg: string, options?: FindOptions) => Promise<FindCursor>;
1416
findOne: (query: any, errorMsg: string) => Promise<any>;
1517
findOneAndUpdate: (query: any, update: any, options: any, errorMsg: string) => Promise<any>;
1618
collection: any;
@@ -29,10 +31,14 @@ export class DBRepositoryHelper {
2931
this.logger = mockDeep<ILogger>();
3032
this.db = mockDeep<Db>();
3133
this.updateOne = jest.fn();
34+
this.updateMany = jest.fn();
35+
this.find = jest.fn();
3236
this.findOne = jest.fn();
3337
this.findOneAndUpdate = jest.fn();
3438
this.collection = {
3539
updateOne: this.updateOne,
40+
updateMany: this.updateMany,
41+
find: this.find,
3642
findOne: this.findOne,
3743
findOneAndUpdate: this.findOneAndUpdate,
3844
};

0 commit comments

Comments
 (0)