diff --git a/package.json b/package.json index 24aff95e..bbc16f85 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "@types/node": "^14.0.27", "@types/puppeteer": "^3.0.1", "@types/puppeteer-core": "^2.0.0", + "@types/underscore": "^1.11.3", "coveralls": "^3.1.0", "express": "^4.17.1", "jest": "^24.9.0", diff --git a/src/Cluster.ts b/src/Cluster.ts index 6d5c183a..fa29c042 100644 --- a/src/Cluster.ts +++ b/src/Cluster.ts @@ -12,6 +12,7 @@ import SystemMonitor from './SystemMonitor'; import { EventEmitter } from 'events'; import ConcurrencyImplementation, { WorkerInstance, ConcurrencyImplementationClassType } from './concurrency/ConcurrencyImplementation'; +import * as _ from 'underscore'; const debug = util.debugGenerator('Cluster'); @@ -404,6 +405,15 @@ export default class Cluster extends EventEmitt this.work(); } + private removeTask(id: any) { + try { + const findIndex = this.jobQueue.list.lastIndexOf(id) + findIndex !== -1 && this.jobQueue.list.splice(findIndex , 1) + } catch (error) { + console.log('error while removing task from queue >>>', JSON.stringify(error)) + } + } + public async queue( data: JobData, taskFunction?: TaskFunction, @@ -418,6 +428,10 @@ export default class Cluster extends EventEmitt this.queueJob(data, taskFunction); } + public removeTaskFromQueue(id: string) { + this.removeTask(id); + } + public execute( data: JobData, taskFunction?: TaskFunction, diff --git a/src/Job.ts b/src/Job.ts index 4d94c999..f627a6c6 100644 --- a/src/Job.ts +++ b/src/Job.ts @@ -17,15 +17,16 @@ export default class Job { private lastError: Error | null = null; public tries: number = 0; - + public url: string; public constructor( - data?: JobData, + data?: any, taskFunction?: TaskFunction, executeCallbacks?: ExecuteCallbacks, ) { this.data = data; this.taskFunction = taskFunction; this.executeCallbacks = executeCallbacks; + this.url = data.url; } public getUrl(): string | undefined { diff --git a/src/Queue.ts b/src/Queue.ts index 99f17d68..37de216b 100644 --- a/src/Queue.ts +++ b/src/Queue.ts @@ -6,7 +6,7 @@ interface QueueOptions { export default class Queue { - private list: T[] = []; + public list: T[] = []; private delayedItems: number = 0; public size(): number {