-
Notifications
You must be signed in to change notification settings - Fork 1
better handling for stuck tasks: clear hanging tasks and avoid running tasks that were scheduled for a long time ago #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8327bde
c563ad4
b454f1f
ad196da
0f4ccb6
c79582e
e1dae7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| name: Lint | ||
| on: | ||
| pull_request: | ||
| push: | ||
| jobs: | ||
| lint: | ||
| runs-on: ubuntu-latest | ||
| steps: | ||
| - uses: actions/checkout@v3 | ||
| - name: Setting up the node version | ||
| uses: actions/setup-node@v3 | ||
| with: | ||
| node-version: 20.19.0 | ||
| - name: setup project | ||
| run: npm i | ||
| - name: run lint | ||
| run: | | ||
| npm run lint |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| 'use strict'; | ||
|
|
||
| const config = require('@masteringjs/eslint-config'); | ||
| const { defineConfig } = require('eslint/config'); | ||
|
|
||
| module.exports = defineConfig([ | ||
| { | ||
| files: ['src/*.js'], | ||
| languageOptions: { | ||
| sourceType: 'commonjs', | ||
| globals: { | ||
| fetch: true, | ||
| setTimeout: true, | ||
| process: true, | ||
| console: true, | ||
| clearTimeout: true | ||
| } | ||
| }, | ||
| extends: [config] | ||
| } | ||
| ]); |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -8,13 +8,21 @@ const taskSchema = new mongoose.Schema({ | |||||
| type: String, | ||||||
| required: true | ||||||
| }, | ||||||
| // The time at which the task was scheduled to run. The task will start running at or after this time. | ||||||
| scheduledAt: { | ||||||
| type: Date, | ||||||
| required: true | ||||||
| }, | ||||||
| // If the task has not started running by this time, it will be marked as `scheduling_timed_out` | ||||||
| // and the next scheduled task will be created. | ||||||
| schedulingTimeoutAt: { | ||||||
| type: Date | ||||||
| }, | ||||||
| // The next time this task will be scheduled to run. | ||||||
| nextScheduledAt: { | ||||||
| type: Date | ||||||
| }, | ||||||
| // When this task is done, automatically schedule the next task for scheduledAt + repeatAfterMS | ||||||
| repeatAfterMS: { | ||||||
| type: Number | ||||||
| }, | ||||||
|
|
@@ -30,6 +38,10 @@ const taskSchema = new mongoose.Schema({ | |||||
| finishedRunningAt: { | ||||||
| type: Date | ||||||
| }, | ||||||
| // If this task is still running after this time, it will be marked as `timed_out`. | ||||||
| timeoutAt: { | ||||||
| type: Date | ||||||
| }, | ||||||
| previousTaskId: { | ||||||
| type: mongoose.ObjectId | ||||||
| }, | ||||||
|
|
@@ -42,7 +54,22 @@ const taskSchema = new mongoose.Schema({ | |||||
| status: { | ||||||
| type: String, | ||||||
| default: 'pending', | ||||||
| enum: ['pending', 'in_progress', 'succeeded', 'failed', 'cancelled'] | ||||||
| enum: [ | ||||||
| // Waiting to run | ||||||
| 'pending', | ||||||
| // Currently running | ||||||
| 'in_progress', | ||||||
| // Completed successfully | ||||||
| 'succeeded', | ||||||
| // Error occurred while executing the task | ||||||
| 'failed', | ||||||
| // Cancelled by user | ||||||
| 'cancelled', | ||||||
| // Task execution timed out | ||||||
| 'timed_out', | ||||||
| // Timed out waiting for a worker to pick up the task | ||||||
| 'scheduling_timed_out' | ||||||
| ] | ||||||
| }, | ||||||
| result: 'Mixed', | ||||||
| error: { | ||||||
|
|
@@ -63,8 +90,8 @@ taskSchema.methods.log = function log(message, extra) { | |||||
|
|
||||||
| taskSchema.statics.cancelTask = async function cancelTask(filter) { | ||||||
| if (filter != null) { | ||||||
| filter = { $and: [{ status: 'pending' }, filter] } | ||||||
| }; | ||||||
| filter = { $and: [{ status: 'pending' }, filter] }; | ||||||
| } | ||||||
| const task = await this.findOneAndUpdate(filter, { status: 'cancelled', cancelledAt: new Date() }, { returnDocument: 'after' }); | ||||||
| return task; | ||||||
| }; | ||||||
|
|
@@ -92,7 +119,7 @@ taskSchema.statics.startPolling = function startPolling(options) { | |||||
| doPoll.call(this); | ||||||
| this._cancel = () => { | ||||||
| cancelled = true; | ||||||
| clearTimeout(timeout) | ||||||
| clearTimeout(timeout); | ||||||
| }; | ||||||
| } | ||||||
| return this._cancel; | ||||||
|
|
@@ -101,6 +128,12 @@ taskSchema.statics.startPolling = function startPolling(options) { | |||||
| if (cancelled) { | ||||||
| return; | ||||||
| } | ||||||
|
|
||||||
| const Task = this; | ||||||
|
|
||||||
| // Expire tasks that have timed out (refactored to separate function) | ||||||
| await Task.expireTimedOutTasks(); | ||||||
|
|
||||||
| this._currentPoll = this.poll(pollOptions); | ||||||
| await this._currentPoll.then( | ||||||
| () => { | ||||||
|
|
@@ -114,12 +147,68 @@ taskSchema.statics.startPolling = function startPolling(options) { | |||||
| } | ||||||
| }; | ||||||
|
|
||||||
| // Refactor logic for expiring timed out tasks here | ||||||
| taskSchema.statics.expireTimedOutTasks = async function expireTimedOutTasks() { | ||||||
| const now = time.now(); | ||||||
| const Task = this; | ||||||
| while (true) { | ||||||
| const task = await Task.findOneAndUpdate( | ||||||
| { | ||||||
| status: 'in_progress', | ||||||
| startedRunningAt: { $exists: true }, | ||||||
| timeoutAt: { $exists: true, $lte: now } | ||||||
| }, | ||||||
| { | ||||||
| $set: { | ||||||
| status: 'timed_out', | ||||||
| finishedRunningAt: now | ||||||
| } | ||||||
| }, | ||||||
| { new: true } | ||||||
| ); | ||||||
|
|
||||||
| if (!task) { | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| await _handleRepeatingTask(Task, task); | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| taskSchema.statics.registerHandler = async function registerHandler(name, fn) { | ||||||
| this._handlers = this._handlers || new Map(); | ||||||
| this._handlers.set(name, fn); | ||||||
| return this; | ||||||
| }; | ||||||
|
|
||||||
| async function _handleRepeatingTask(Task, task) { | ||||||
| if (task.nextScheduledAt != null) { | ||||||
| const scheduledAt = new Date(task.nextScheduledAt); | ||||||
| return Task.create({ | ||||||
| name: task.name, | ||||||
| scheduledAt, | ||||||
| repeatAfterMS: task.repeatAfterMS, | ||||||
| params: task.params, | ||||||
| previousTaskId: task._id, | ||||||
| originalTaskId: task.originalTaskId || task._id, | ||||||
| timeoutMS: task.timeoutMS, | ||||||
| schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000 | ||||||
| }); | ||||||
| } else if (task.repeatAfterMS != null) { | ||||||
| const scheduledAt = new Date(task.scheduledAt.valueOf() + task.repeatAfterMS); | ||||||
| return Task.create({ | ||||||
| name: task.name, | ||||||
| scheduledAt, | ||||||
| repeatAfterMS: task.repeatAfterMS, | ||||||
| params: task.params, | ||||||
| previousTaskId: task._id, | ||||||
| originalTaskId: task.originalTaskId || task._id, | ||||||
| timeoutMS: task.timeoutMS, | ||||||
| schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000 | ||||||
|
||||||
| }); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| taskSchema.statics.registerHandlers = async function registerHandlers(obj, prefix) { | ||||||
| this._handlers = this._handlers || new Map(); | ||||||
| for (const key of Object.keys(obj)) { | ||||||
|
|
@@ -145,21 +234,23 @@ taskSchema.statics.poll = async function poll(opts) { | |||||
| const additionalParams = workerName ? { workerName } : {}; | ||||||
|
|
||||||
| while (true) { | ||||||
| let tasksInProgress = []; | ||||||
| const tasksInProgress = []; | ||||||
| for (let i = 0; i < parallel; ++i) { | ||||||
| const now = time.now(); | ||||||
| const task = await this.findOneAndUpdate( | ||||||
| { status: 'pending', scheduledAt: { $lte: now } }, | ||||||
| { status: 'in_progress', startedRunningAt: now, ...additionalParams }, | ||||||
| { | ||||||
| status: 'in_progress', | ||||||
| timeoutAt: new Date(now.valueOf() + 10 * 60 * 1000), // 10 minutes from startedRunningAt | ||||||
|
||||||
| ...additionalParams | ||||||
| }, | ||||||
| { new: false } | ||||||
| ); | ||||||
|
|
||||||
| if (task == null || task.status !== 'pending') { | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| task.status = 'in_progress'; | ||||||
|
|
||||||
| tasksInProgress.push(this.execute(task)); | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -176,6 +267,18 @@ taskSchema.statics.execute = async function(task) { | |||||
| return null; | ||||||
| } | ||||||
|
|
||||||
| task.status = 'in_progress'; | ||||||
| const now = time.now(); | ||||||
| task.startedRunningAt = now; | ||||||
|
|
||||||
| if (task.schedulingTimeoutAt && task.schedulingTimeoutAt < now) { | ||||||
| task.status = 'scheduling_timed_out'; | ||||||
| task.finishedRunningAt = now; | ||||||
| await task.save(); | ||||||
| await _handleRepeatingTask(this, task); | ||||||
| return task; | ||||||
| } | ||||||
|
|
||||||
| try { | ||||||
| let result = null; | ||||||
| if (typeof task.timeoutMS === 'number') { | ||||||
|
|
@@ -204,27 +307,7 @@ taskSchema.statics.execute = async function(task) { | |||||
| await task.save(); | ||||||
| } | ||||||
|
|
||||||
| if (task.nextScheduledAt != null) { | ||||||
| await this.create({ | ||||||
| name: task.name, | ||||||
| scheduledAt: new Date(task.nextScheduledAt), | ||||||
| repeatAfterMS: task.repeatAfterMS, | ||||||
| params: task.params, | ||||||
| previousTaskId: task._id, | ||||||
| originalTaskId: task.originalTaskId || task._id, | ||||||
| timeoutMS: task.timeoutMS | ||||||
| }); | ||||||
| } else if (task.repeatAfterMS != null) { | ||||||
| await this.create({ | ||||||
| name: task.name, | ||||||
| scheduledAt: new Date(task.scheduledAt.valueOf() + task.repeatAfterMS), | ||||||
| repeatAfterMS: task.repeatAfterMS, | ||||||
| params: task.params, | ||||||
| previousTaskId: task._id, | ||||||
| originalTaskId: task.originalTaskId || task._id, | ||||||
| timeoutMS: task.timeoutMS | ||||||
| }); | ||||||
| } | ||||||
| await _handleRepeatingTask(this, task); | ||||||
|
|
||||||
| return task; | ||||||
| }; | ||||||
|
|
@@ -241,6 +324,7 @@ taskSchema.statics.schedule = async function schedule(name, scheduledAt, params, | |||||
| scheduledAt, | ||||||
| params, | ||||||
| repeatAfterMS, | ||||||
| schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000, | ||||||
|
||||||
| schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000, | |
| schedulingTimeoutAt: scheduledAt.valueOf() + time.TEN_MINUTES_MS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Magic number 10 * 60 * 1000 (10 minutes) is repeated multiple times. Consider extracting this into a named constant like
DEFAULT_SCHEDULING_TIMEOUT_MSto improve maintainability.