diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..f550a4e --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,18 @@ +name: Lint +on: + pull_request: + push: +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Setting up the node version + uses: actions/setup-node@v3 + with: + node-version: 20.19.0 + - name: setup project + run: npm i + - name: run lint + run: | + npm run lint diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b2ed7db..951ca8d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,9 +11,9 @@ jobs: strategy: fail-fast: false matrix: - node: [18, 20] - os: [ubuntu-20.04] - mongo: [5.0.8] + node: [22] + os: [ubuntu-22.04] + mongo: [8.2.0] name: Node ${{ matrix.node }} MongoDB ${{ matrix.mongo }} steps: - uses: actions/checkout@a12a3943b4bdde767164f792f33f40b04645d846 # v3 @@ -27,11 +27,10 @@ jobs: - name: Setup run: | - wget -q https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-ubuntu2004-${{ matrix.mongo }}.tgz - tar xf mongodb-linux-x86_64-ubuntu2004-${{ matrix.mongo }}.tgz + wget -q https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-ubuntu2204-${{ matrix.mongo }}.tgz + tar xf mongodb-linux-x86_64-ubuntu2204-${{ matrix.mongo }}.tgz mkdir -p ./data/db/27017 ./data/db/27000 - ./mongodb-linux-x86_64-ubuntu2004-${{ matrix.mongo }}/bin/mongod --setParameter ttlMonitorSleepSecs=1 --fork --dbpath ./data/db/27017 --syslog --port 27017 + ./mongodb-linux-x86_64-ubuntu2204-${{ matrix.mongo }}/bin/mongod --setParameter ttlMonitorSleepSecs=1 --fork --dbpath ./data/db/27017 --syslog --port 27017 sleep 2 - mongod --version - echo `pwd`/mongodb-linux-x86_64-ubuntu2004-${{ matrix.mongo }}/bin >> $GITHUB_PATH + echo `pwd`/mongodb-linux-x86_64-ubuntu2204-${{ matrix.mongo }}/bin >> $GITHUB_PATH - run: npm test diff --git a/eslint.config.js b/eslint.config.js new file mode 100644 index 0000000..bdfef10 --- /dev/null +++ b/eslint.config.js @@ -0,0 +1,21 @@ +'use strict'; + +const config = require('@masteringjs/eslint-config'); +const { defineConfig } = require('eslint/config'); + +module.exports = defineConfig([ + { + files: ['src/*.js'], + languageOptions: { + sourceType: 'commonjs', + globals: { + fetch: true, + setTimeout: true, + process: true, + console: true, + clearTimeout: true + } + }, + extends: [config] + } +]); diff --git a/package.json b/package.json index 076df89..e378089 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "version": "0.3.0", "private": false, "scripts": { + "lint": "eslint .", "test": "mocha test/*.test.js" }, "repository": { @@ -16,6 +17,8 @@ "mongoose": "^6.7.0 || 7.x || 8.x" }, "devDependencies": { + "@masteringjs/eslint-config": "0.1.1", + "eslint": "9.30.0", "mocha": "10.1.0", "mongoose": "8.x", "sinon": "15.2.0" diff --git a/src/taskSchema.js b/src/taskSchema.js index f791d2f..b15e20b 100644 --- a/src/taskSchema.js +++ b/src/taskSchema.js @@ -8,13 +8,21 @@ const taskSchema = new mongoose.Schema({ type: String, required: true }, + // The time at which the task was scheduled to run. The task will start running at or after this time. scheduledAt: { type: Date, required: true }, + // If the task has not started running by this time, it will be marked as `scheduling_timed_out` + // and the next scheduled task will be created. + schedulingTimeoutAt: { + type: Date + }, + // The next time this task will be scheduled to run. nextScheduledAt: { type: Date }, + // When this task is done, automatically schedule the next task for scheduledAt + repeatAfterMS repeatAfterMS: { type: Number }, @@ -30,6 +38,10 @@ const taskSchema = new mongoose.Schema({ finishedRunningAt: { type: Date }, + // If this task is still running after this time, it will be marked as `timed_out`. + timeoutAt: { + type: Date + }, previousTaskId: { type: mongoose.ObjectId }, @@ -42,7 +54,22 @@ const taskSchema = new mongoose.Schema({ status: { type: String, default: 'pending', - enum: ['pending', 'in_progress', 'succeeded', 'failed', 'cancelled'] + enum: [ + // Waiting to run + 'pending', + // Currently running + 'in_progress', + // Completed successfully + 'succeeded', + // Error occurred while executing the task + 'failed', + // Cancelled by user + 'cancelled', + // Task execution timed out + 'timed_out', + // Timed out waiting for a worker to pick up the task + 'scheduling_timed_out' + ] }, result: 'Mixed', error: { @@ -63,8 +90,8 @@ taskSchema.methods.log = function log(message, extra) { taskSchema.statics.cancelTask = async function cancelTask(filter) { if (filter != null) { - filter = { $and: [{ status: 'pending' }, filter] } - }; + filter = { $and: [{ status: 'pending' }, filter] }; + } const task = await this.findOneAndUpdate(filter, { status: 'cancelled', cancelledAt: new Date() }, { returnDocument: 'after' }); return task; }; @@ -92,7 +119,7 @@ taskSchema.statics.startPolling = function startPolling(options) { doPoll.call(this); this._cancel = () => { cancelled = true; - clearTimeout(timeout) + clearTimeout(timeout); }; } return this._cancel; @@ -101,6 +128,12 @@ taskSchema.statics.startPolling = function startPolling(options) { if (cancelled) { return; } + + const Task = this; + + // Expire tasks that have timed out (refactored to separate function) + await Task.expireTimedOutTasks(); + this._currentPoll = this.poll(pollOptions); await this._currentPoll.then( () => { @@ -114,12 +147,68 @@ taskSchema.statics.startPolling = function startPolling(options) { } }; +// Refactor logic for expiring timed out tasks here +taskSchema.statics.expireTimedOutTasks = async function expireTimedOutTasks() { + const now = time.now(); + const Task = this; + while (true) { + const task = await Task.findOneAndUpdate( + { + status: 'in_progress', + startedRunningAt: { $exists: true }, + timeoutAt: { $exists: true, $lte: now } + }, + { + $set: { + status: 'timed_out', + finishedRunningAt: now + } + }, + { new: true } + ); + + if (!task) { + break; + } + + await _handleRepeatingTask(Task, task); + } +}; + taskSchema.statics.registerHandler = async function registerHandler(name, fn) { this._handlers = this._handlers || new Map(); this._handlers.set(name, fn); return this; }; +async function _handleRepeatingTask(Task, task) { + if (task.nextScheduledAt != null) { + const scheduledAt = new Date(task.nextScheduledAt); + return Task.create({ + name: task.name, + scheduledAt, + repeatAfterMS: task.repeatAfterMS, + params: task.params, + previousTaskId: task._id, + originalTaskId: task.originalTaskId || task._id, + timeoutMS: task.timeoutMS, + schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000 + }); + } else if (task.repeatAfterMS != null) { + const scheduledAt = new Date(task.scheduledAt.valueOf() + task.repeatAfterMS); + return Task.create({ + name: task.name, + scheduledAt, + repeatAfterMS: task.repeatAfterMS, + params: task.params, + previousTaskId: task._id, + originalTaskId: task.originalTaskId || task._id, + timeoutMS: task.timeoutMS, + schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000 + }); + } +} + taskSchema.statics.registerHandlers = async function registerHandlers(obj, prefix) { this._handlers = this._handlers || new Map(); for (const key of Object.keys(obj)) { @@ -145,12 +234,16 @@ taskSchema.statics.poll = async function poll(opts) { const additionalParams = workerName ? { workerName } : {}; while (true) { - let tasksInProgress = []; + const tasksInProgress = []; for (let i = 0; i < parallel; ++i) { const now = time.now(); const task = await this.findOneAndUpdate( { status: 'pending', scheduledAt: { $lte: now } }, - { status: 'in_progress', startedRunningAt: now, ...additionalParams }, + { + status: 'in_progress', + timeoutAt: new Date(now.valueOf() + 10 * 60 * 1000), // 10 minutes from startedRunningAt + ...additionalParams + }, { new: false } ); @@ -158,8 +251,6 @@ taskSchema.statics.poll = async function poll(opts) { break; } - task.status = 'in_progress'; - tasksInProgress.push(this.execute(task)); } @@ -176,6 +267,18 @@ taskSchema.statics.execute = async function(task) { return null; } + task.status = 'in_progress'; + const now = time.now(); + task.startedRunningAt = now; + + if (task.schedulingTimeoutAt && task.schedulingTimeoutAt < now) { + task.status = 'scheduling_timed_out'; + task.finishedRunningAt = now; + await task.save(); + await _handleRepeatingTask(this, task); + return task; + } + try { let result = null; if (typeof task.timeoutMS === 'number') { @@ -204,27 +307,7 @@ taskSchema.statics.execute = async function(task) { await task.save(); } - if (task.nextScheduledAt != null) { - await this.create({ - name: task.name, - scheduledAt: new Date(task.nextScheduledAt), - repeatAfterMS: task.repeatAfterMS, - params: task.params, - previousTaskId: task._id, - originalTaskId: task.originalTaskId || task._id, - timeoutMS: task.timeoutMS - }); - } else if (task.repeatAfterMS != null) { - await this.create({ - name: task.name, - scheduledAt: new Date(task.scheduledAt.valueOf() + task.repeatAfterMS), - repeatAfterMS: task.repeatAfterMS, - params: task.params, - previousTaskId: task._id, - originalTaskId: task.originalTaskId || task._id, - timeoutMS: task.timeoutMS - }); - } + await _handleRepeatingTask(this, task); return task; }; @@ -241,6 +324,7 @@ taskSchema.statics.schedule = async function schedule(name, scheduledAt, params, scheduledAt, params, repeatAfterMS, + schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000, ...options }); }; diff --git a/test/task.test.js b/test/task.test.js index 17848d3..525d341 100644 --- a/test/task.test.js +++ b/test/task.test.js @@ -195,6 +195,8 @@ describe('Task', function() { assert.equal(task.status, 'succeeded'); assert.equal(task.workerName, 'taco'); assert.strictEqual(task.result, 42); + + cancel(); }); it('catches errors in task', async function() { @@ -241,4 +243,155 @@ describe('Task', function() { assert.equal(task.error.message, 'Task timed out after 50 ms'); assert.equal(task.finishedRunningAt.valueOf(), now.valueOf()); }); + + it('expires timed out tasks and handles repeats', async function() { + Task.registerHandler('timedOutJob', async () => { + // handler intentionally does nothing (we'll simulate a timeout) + }); + + // Simulate a task that was started but "timed out" previously + const scheduledAt = time.now(); + const startedRunningAt = new Date(scheduledAt.valueOf() - 20000); + const timeoutMS = 10000; // 10s timeout + const timeoutAt = new Date(startedRunningAt.valueOf() + timeoutMS); + + let timedOutTask = await Task.create({ + name: 'timedOutJob', + scheduledAt, + startedRunningAt, + timeoutAt, + status: 'in_progress', + timeoutMS, + params: { foo: 'bar' } + }); + + // Now simulate time after timeoutAt + sinon.restore(); + sinon.stub(time, 'now').callsFake(() => + // now after the timeoutAt + new Date(timeoutAt.valueOf() + 1000) + ); + + // Directly call expireTimedOutTasks instead of polling + await Task.expireTimedOutTasks(); + + // Reload the task and check its status + timedOutTask = await Task.findById(timedOutTask._id); + assert.ok(timedOutTask, 'Still found timed out task'); + assert.equal(timedOutTask.status, 'timed_out'); + assert.ok(timedOutTask.finishedRunningAt.valueOf() >= timeoutAt.valueOf()); + + // If repeating, should have queued repeat (not in this test) + const repeatTask = await Task.findOne({ previousTaskId: timedOutTask._id }); + assert.ok(!repeatTask, 'No repeat should exist for non-repeating task'); + + // Now try with repeatAfterMS to verify repeat scheduled + const repeatTaskObj = await Task.create({ + name: 'timedOutJob', + scheduledAt, + startedRunningAt, + timeoutAt, + status: 'in_progress', + repeatAfterMS: 60000, + timeoutMS, + params: { foo: 'baz' } + }); + + // We should advance the fake clock further to catch this one too + sinon.restore(); + sinon.stub(time, 'now').callsFake(() => + new Date(timeoutAt.valueOf() + 2000) + ); + + await Task.expireTimedOutTasks(); + + const afterRepeat = await Task.findById(repeatTaskObj._id); + assert.equal(afterRepeat.status, 'timed_out'); + + // The repeat should exist and be pending + const repeated = await Task.findOne({ previousTaskId: repeatTaskObj._id, status: 'pending' }); + assert.ok(repeated, 'A repeat should be created for timed out repeating task'); + assert.equal(repeated.name, 'timedOutJob'); + assert.deepEqual(repeated.params, { foo: 'baz' }); + assert.ok(repeated.scheduledAt.valueOf() === repeatTaskObj.scheduledAt.valueOf() + 60000); + }); + + it('handles scheduling_timed_out tasks and schedules next repeat if needed', async function() { + Task.registerHandler('delayedJob', async () => { + // Will not be executed due to scheduling_timed_out logic + return 'should not be run'; + }); + + // Arrange: schedule a task whose schedulingTimeoutAt is in the past + const scheduledAt = time.now(); + const schedulingTimeoutAt = new Date(scheduledAt.valueOf() - 1000); // already "expired" + let task = await Task.create({ + name: 'delayedJob', + scheduledAt, + schedulingTimeoutAt, + status: 'pending', + params: { foo: 'qux' } + }); + + // Should move to scheduling_timed_out when execute is called + task = await Task.execute(task); + + assert.ok(task); + assert.equal(task.status, 'scheduling_timed_out'); + assert.ok(task.finishedRunningAt.valueOf() >= schedulingTimeoutAt.valueOf()); + + // Should NOT have side effected and not produced result + assert.strictEqual(task.result, undefined); + + // No repeat should exist for non-repeating task + const repeated = await Task.findOne({ previousTaskId: task._id }); + assert.ok(!repeated, 'No repeat created for non-repeating scheduling_timed_out'); + + // Now try with repeatAfterMS to verify repeat scheduled + const scheduledAt2 = time.now(); + const schedulingTimeoutAt2 = new Date(scheduledAt2.valueOf() - 2000); + const repeatAfterMS = 60000; + + let repeatTaskInput = await Task.create({ + name: 'delayedJob', + scheduledAt: scheduledAt2, + schedulingTimeoutAt: schedulingTimeoutAt2, + status: 'pending', + params: { bar: 'baz' }, + repeatAfterMS + }); + + let repeatTask = await Task.execute(repeatTaskInput); + + assert.ok(repeatTask); + assert.equal(repeatTask.status, 'scheduling_timed_out'); + + // The repeat should exist and be pending + const scheduledRepeat = await Task.findOne({ previousTaskId: repeatTask._id, status: 'pending' }); + assert.ok(scheduledRepeat, 'A repeat should be created for scheduling_timed_out repeating task'); + assert.equal(scheduledRepeat.name, 'delayedJob'); + assert.deepEqual(scheduledRepeat.params, { bar: 'baz' }); + assert.ok(scheduledRepeat.scheduledAt.valueOf() === repeatTask.scheduledAt.valueOf() + repeatAfterMS); + + // Also works with nextScheduledAt + const now = time.now(); + const nextScheduledAt = new Date(now.valueOf() + 100000); + let taskWithNext = await Task.create({ + name: 'delayedJob', + scheduledAt: now, + schedulingTimeoutAt: new Date(now.valueOf() - 5000), + status: 'pending', + params: { blah: 'blah' }, + nextScheduledAt + }); + + let resultTaskWithNext = await Task.execute(taskWithNext); + assert.equal(resultTaskWithNext.status, 'scheduling_timed_out'); + + // Next repeat should be at nextScheduledAt + const foundNext = await Task.findOne({ previousTaskId: resultTaskWithNext._id, status: 'pending' }); + assert.ok(foundNext, 'Should make repeat with nextScheduledAt'); + assert.equal(foundNext.scheduledAt.toString(), nextScheduledAt.toString()); + assert.deepEqual(foundNext.params, { blah: 'blah' }); + }); });