Skip to content

Commit e801319

Browse files
authored
Add push() options (#13)
1 parent fc964b2 commit e801319

File tree

5 files changed

+116
-23
lines changed

5 files changed

+116
-23
lines changed

README.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ In ESM:
1919
```ts
2020
import {
2121
AwaitQueue,
22+
AwaitQueuePushOptions,
2223
AwaitQueueTask,
2324
AwaitQueueTaskDump,
2425
AwaitQueueStoppedError,
@@ -31,6 +32,7 @@ Using CommonJS:
3132
```ts
3233
const {
3334
AwaitQueue,
35+
AwaitQueuePushOptions,
3436
AwaitQueueTask,
3537
AwaitQueueTaskDump,
3638
AwaitQueueStoppedError,
@@ -40,6 +42,18 @@ const {
4042

4143
## Types
4244

45+
### `type AwaitQueuePushOptions`
46+
47+
```ts
48+
export type AwaitQueuePushOptions = {
49+
removeOngoingTasksWithSameName?: boolean;
50+
};
51+
```
52+
53+
Options given to `awaitQueue.push()`.
54+
55+
- `removeOngoingTasksWithSameName`: If `true`, all previously enqueued tasks with same name will be removed and will reject with an instance of `AwaitQueueRemovedTaskError`.
56+
4357
### `type AwaitQueueTask`
4458

4559
```ts
@@ -87,13 +101,14 @@ Number of enqueued pending tasks in the queue (including the running one if any)
87101
#### Method `awaitQueue.push()`
88102

89103
```ts
90-
async push<T>(task: AwaitQueueTask<T>, name?: string): Promise<T>
104+
async push<T>(task: AwaitQueueTask<T>, name?: string, options?: AwaitQueuePushOptions): Promise<T>
91105
```
92106

93107
Accepts a task as argument and enqueues it after pending tasks. Once processed, the `push()` method resolves (or rejects) with the result (or error) returned by the given task.
94108

95109
- `@param task`: Asynchronous or asynchronous function.
96-
- `@param name`: Optional task name (useful for `awaitQueue.dump()` method).
110+
- `@param name`: Optional task name.
111+
- `@param.options`: Options.
97112

98113
#### Method `awaitQueue.stop()`
99114

src/AwaitQueue.ts

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { Logger } from './Logger';
2-
import type { AwaitQueueTask, AwaitQueueTaskDump } from './types';
2+
import type {
3+
AwaitQueuePushOptions,
4+
AwaitQueueTask,
5+
AwaitQueueTaskDump,
6+
} from './types';
37
import { AwaitQueueStoppedError, AwaitQueueRemovedTaskError } from './errors';
48

59
const logger = new Logger('AwaitQueue');
@@ -12,16 +16,17 @@ type PendingTask<T> = {
1216
executedAt?: number;
1317
completed: boolean;
1418
resolve: (result: T | PromiseLike<T>) => void;
15-
reject: (error: Error) => void;
19+
reject: (
20+
error: Error,
21+
{ canExecuteNextTask }: { canExecuteNextTask: boolean }
22+
) => void;
1623
};
1724

1825
export class AwaitQueue {
1926
// Queue of pending tasks (map of PendingTasks indexed by id).
2027
private readonly pendingTasks: Map<number, PendingTask<any>> = new Map();
2128
// Incrementing PendingTask id.
2229
private nextTaskId = 0;
23-
// Whether stop() method is stopping all pending tasks.
24-
private stopping = false;
2530

2631
constructor() {
2732
logger.debug('constructor()');
@@ -31,10 +36,14 @@ export class AwaitQueue {
3136
return this.pendingTasks.size;
3237
}
3338

34-
async push<T>(task: AwaitQueueTask<T>, name?: string): Promise<T> {
39+
async push<T>(
40+
task: AwaitQueueTask<T>,
41+
name?: string,
42+
options?: AwaitQueuePushOptions
43+
): Promise<T> {
3544
name = name ?? task.name;
3645

37-
logger.debug(`push() [name:${name}]`);
46+
logger.debug(`push() [name:${name}, options:%o]`, options);
3847

3948
if (typeof task !== 'function') {
4049
throw new TypeError('given task is not a function');
@@ -47,6 +56,16 @@ export class AwaitQueue {
4756
}
4857

4958
return new Promise<T>((resolve, reject) => {
59+
if (name && options?.removeOngoingTasksWithSameName) {
60+
for (const pendingTask of this.pendingTasks.values()) {
61+
if (pendingTask.name === name) {
62+
pendingTask.reject(new AwaitQueueRemovedTaskError(), {
63+
canExecuteNextTask: false,
64+
});
65+
}
66+
}
67+
}
68+
5069
const pendingTask: PendingTask<T> = {
5170
id: this.nextTaskId++,
5271
task: task,
@@ -86,7 +105,10 @@ export class AwaitQueue {
86105
void this.execute(nextPendingTask);
87106
}
88107
},
89-
reject: (error: Error) => {
108+
reject: (
109+
error: Error,
110+
{ canExecuteNextTask }: { canExecuteNextTask: boolean }
111+
) => {
90112
// pendingTask.reject() can be called within execute() method if the
91113
// task completed with error. However it may have also been called in
92114
// stop() or remove() methods (before or while being executed) so its
@@ -109,8 +131,8 @@ export class AwaitQueue {
109131
// Reject the task with the obtained error.
110132
reject(error);
111133

112-
// Execute the next pending task (if any) unless stop() is running.
113-
if (!this.stopping) {
134+
// May execute next pending task (if any).
135+
if (canExecuteNextTask) {
114136
const [nextPendingTask] = this.pendingTasks.values();
115137

116138
// NOTE: During the reject() callback the user app may have interacted
@@ -137,15 +159,13 @@ export class AwaitQueue {
137159
stop(): void {
138160
logger.debug('stop()');
139161

140-
this.stopping = true;
141-
142162
for (const pendingTask of this.pendingTasks.values()) {
143163
logger.debug(`stop() | stopping task [name:${pendingTask.name}]`);
144164

145-
pendingTask.reject(new AwaitQueueStoppedError());
165+
pendingTask.reject(new AwaitQueueStoppedError(), {
166+
canExecuteNextTask: false,
167+
});
146168
}
147-
148-
this.stopping = false;
149169
}
150170

151171
remove(taskIdx: number): void {
@@ -159,7 +179,9 @@ export class AwaitQueue {
159179
return;
160180
}
161181

162-
pendingTask.reject(new AwaitQueueRemovedTaskError());
182+
pendingTask.reject(new AwaitQueueRemovedTaskError(), {
183+
canExecuteNextTask: true,
184+
});
163185
}
164186

165187
dump(): AwaitQueueTaskDump[] {
@@ -193,7 +215,7 @@ export class AwaitQueue {
193215
pendingTask.resolve(result);
194216
} catch (error) {
195217
// Reject the task with its rejected error.
196-
pendingTask.reject(error as Error);
218+
pendingTask.reject(error as Error, { canExecuteNextTask: true });
197219
}
198220
}
199221
}

src/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
export { AwaitQueue } from './AwaitQueue';
22
export { AwaitQueueStoppedError, AwaitQueueRemovedTaskError } from './errors';
3-
export type { AwaitQueueTask, AwaitQueueTaskDump } from './types';
3+
export type {
4+
AwaitQueuePushOptions,
5+
AwaitQueueTask,
6+
AwaitQueueTaskDump,
7+
} from './types';

src/tests/test.ts

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
191191
const executionsCount: Map<string, number> = new Map();
192192
const emitter = new EventEmitter();
193193

194-
const taskA = function (): Promise<void> {
194+
const taskA = async function (): Promise<void> {
195195
const taskName = 'taskA';
196196

197197
return new Promise<void>(resolve => {
@@ -203,7 +203,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
203203
});
204204
};
205205

206-
const taskB = function (): Promise<void> {
206+
const taskB = async function (): Promise<void> {
207207
const taskName = 'taskB';
208208

209209
return new Promise<void>(resolve => {
@@ -215,7 +215,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
215215
});
216216
};
217217

218-
const taskC = function (): Promise<void> {
218+
const taskC = async function (): Promise<void> {
219219
const taskName = 'taskC';
220220

221221
return new Promise<void>(resolve => {
@@ -227,7 +227,7 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
227227
});
228228
};
229229

230-
const taskD = function (): Promise<void> {
230+
const taskD = async function (): Promise<void> {
231231
const taskName = 'taskD';
232232

233233
return new Promise<void>(resolve => {
@@ -277,6 +277,54 @@ test('new task does not lead to next task execution if a stopped one is ongoing'
277277
expect(executionsCount.get('taskD')).toBe(1);
278278
}, 1000);
279279

280+
test('removeOngoingTasksWithSameName option removes ongoing tasks with same name', async () => {
281+
const awaitQueue = new AwaitQueue();
282+
const emitter = new EventEmitter();
283+
let removedTaskACount = 0;
284+
285+
const taskA = async function (): Promise<void> {
286+
return new Promise<void>(resolve => {
287+
emitter.on('resolve-task-a', resolve);
288+
});
289+
};
290+
291+
const taskB = async function (): Promise<void> {
292+
return new Promise<void>(resolve => {
293+
emitter.on('resolve-task-b', resolve);
294+
});
295+
};
296+
297+
for (let i = 0; i < 10; ++i) {
298+
awaitQueue
299+
.push(taskA, 'taskA', { removeOngoingTasksWithSameName: false })
300+
.catch(error => {
301+
if (error instanceof AwaitQueueRemovedTaskError) {
302+
++removedTaskACount;
303+
}
304+
});
305+
}
306+
307+
const lastTaskAPromise = awaitQueue.push(taskA, 'taskA', {
308+
removeOngoingTasksWithSameName: true,
309+
});
310+
311+
const taskBPromise = awaitQueue.push(taskB, 'taskB', {
312+
removeOngoingTasksWithSameName: true,
313+
});
314+
315+
expectDumpToContain(awaitQueue, ['taskA', 'taskB']);
316+
317+
emitter.emit('resolve-task-a');
318+
319+
await lastTaskAPromise;
320+
321+
emitter.emit('resolve-task-b');
322+
323+
await taskBPromise;
324+
325+
expect(removedTaskACount).toBe(10);
326+
}, 1000);
327+
280328
async function wait(timeMs: number): Promise<void> {
281329
await new Promise<void>(resolve => {
282330
setTimeout(resolve, timeMs);

src/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
export type AwaitQueuePushOptions = {
2+
removeOngoingTasksWithSameName?: boolean;
3+
};
4+
15
export type AwaitQueueTask<T> = () => T | PromiseLike<T>;
26

37
export type AwaitQueueTaskDump = {

0 commit comments

Comments
 (0)