Skip to content

Commit 15bbb20

Browse files
committed
✨ Add TaskQueue
1 parent 6ed5d9b commit 15bbb20

File tree

4 files changed

+251
-6
lines changed

4 files changed

+251
-6
lines changed

README.md

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Does this library help you? Please give it a ⭐️!
1818
- Auto rejecting promises removed from the shared data structure (`TaskMap`, which is an extension of a `Map` data structure)
1919
- Zero dependencies
2020
- Map with limited space (sliding-window)
21+
- Task Queue mechanism
2122

2223
## 🚀 Installation
2324

@@ -110,16 +111,56 @@ app.get('/observations/:date', async function (req, res) {
110111
})
111112
```
112113

114+
**Task Queue**
115+
116+
```typescript
117+
// Type Definition
118+
declare class TaskQueue<T> {
119+
constructor(queueSize?: number); // default is 1
120+
clear(): Promise<void>;
121+
execute(fn: () => Promise<T>): Promise<T>;
122+
getStats(): {
123+
processingCount: number;
124+
waitingCount: number;
125+
};
126+
}
127+
```
128+
129+
You want to use the Task Queue when you want to throttle requests/executions of given part of your system.
130+
131+
```typescript
132+
const queue = new TaskQueue<IScrapeResult>(5)
133+
134+
app.get('/scrape/:source', async function (req, res) {
135+
const source = req.params.source
136+
const response = await queue.execute(async () => {
137+
// this function will run by the queue mechanism
138+
return service.scrape(source)
139+
})
140+
141+
res.json(response)
142+
})
143+
144+
app.get('/scrape/status', async function (req, res) {
145+
const { processingCount, waitingCount } = queue.getStats()
146+
147+
return res.json({
148+
processingCount,
149+
waitingCount,
150+
})
151+
})
152+
```
153+
113154
**Sliding window**
114155
```typescript
115156
// Type Definition
116157
declare class SlidingTaskMap<K, V extends Deletable> extends TaskMap<K, V> {
117-
constructor(windowSize: number, ttl?: number);
118-
set(key: K, value: V, customTTL?: number): this; // create/override
119-
delete(key: K): boolean; // remove single entry
120-
clear(): void; // removes all entries
121-
pop(): boolean; // removes oldest entry, false for an empty window
122-
shift(): boolean; // removes newest entry, false for an empty window
158+
constructor(windowSize: number, ttl?: number);
159+
set(key: K, value: V, customTTL?: number): this; // create/override
160+
delete(key: K): boolean; // remove single entry
161+
clear(): void; // removes all entries
162+
pop(): boolean; // removes oldest entry, false for an empty window
163+
shift(): boolean; // removes newest entry, false for an empty window
123164
}
124165
```
125166

@@ -133,6 +174,31 @@ const WINDOW_SIZE = 10
133174
const TTL = 60 * 1000 // data will persist 60 seconds in a cache
134175
const tasks = new SlidingTaskMap<string, number[]>(WINDOW_SIZE, TTL)
135176

177+
app.get('/calculation/:date', async function () {
178+
const date = req.params.date
179+
180+
if (!tasks.has(date)) {
181+
const task = new Task<void>()
182+
tasks.set(date, task)
183+
184+
const data = await fetchData(date)
185+
task.resolve(data)
186+
}
187+
188+
return tasks.get(date)
189+
})
190+
```
191+
192+
When your map size reaches a specified threshold, the oldest values will be
193+
removed. You can be then sure that the size of the map will never overflow your memory.
194+
195+
```typescript
196+
import { Task, SlidingTaskMap } from 'promise-based-task'
197+
198+
const WINDOW_SIZE = 10
199+
const TTL = 60 * 1000 // data will persist 60 seconds in a cache
200+
const tasks = new SlidingTaskMap<string, number[]>(WINDOW_SIZE, TTL)
201+
136202
app.get('/calculation/:date', async function () {
137203
const date = req.params.date
138204

src/TaskQueue.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import { Task } from './Task'
2+
import { TaskDestroyedException } from './error';
3+
4+
interface IWaitingTask<T> {
5+
task: Task<T>
6+
resolver: () => Promise<void>
7+
}
8+
9+
export class TaskQueue<T> {
10+
private _waitingQueue: Array<IWaitingTask<T>> = []
11+
private _unlockQueue: Array<Task<void>> = []
12+
13+
14+
private readonly _queueSize: number
15+
private _processingCount = 0
16+
17+
constructor(queueSize?: number) {
18+
this._queueSize = Math.max(Number(queueSize), 1)
19+
}
20+
21+
private async _internalSync() {
22+
if (this._processingCount >= this._queueSize) {
23+
const waitForUnlockTask = new Task<void>()
24+
this._unlockQueue.push(waitForUnlockTask)
25+
await waitForUnlockTask
26+
}
27+
28+
const task = this._waitingQueue.shift()
29+
if (task) {
30+
await task.resolver()
31+
}
32+
}
33+
34+
async clear() {
35+
this._waitingQueue.forEach(item => item.task.reject(new TaskDestroyedException()))
36+
this._unlockQueue.forEach(item => item.reject(new TaskDestroyedException()))
37+
38+
this._unlockQueue.length = 0
39+
this._waitingQueue.length = 0
40+
this._processingCount = 0
41+
}
42+
43+
async execute(fn: () => Promise<T>): Promise<T> {
44+
const task = new Task<T>()
45+
this._waitingQueue.push({
46+
task,
47+
resolver: async () => {
48+
this._processingCount++
49+
50+
const cleanup = () => {
51+
this._processingCount--
52+
const unlockTask = this._unlockQueue.shift()
53+
unlockTask?.resolve()
54+
}
55+
56+
return fn()
57+
.then((res) => {
58+
cleanup();
59+
task.resolve(res);
60+
})
61+
.catch(e => {
62+
cleanup();
63+
task.reject(e);
64+
})
65+
}
66+
})
67+
68+
this._internalSync().catch(() => {})
69+
70+
return task
71+
}
72+
73+
getStats() {
74+
return {
75+
processingCount: this._processingCount,
76+
waitingCount: this._waitingQueue.length,
77+
}
78+
}
79+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from './Task';
22
export * from './TaskMap';
33
export * from './SlidingTaskMap';
4+
export * from './TaskQueue';
45
export * from './types';
56
export * from './error';

test/TaskQueue.test.ts

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import { TaskQueue } from '../src';
2+
3+
describe('TaskQueue', () => {
4+
const sleepFactory = <T>(ms: number, res?: T) => {
5+
return () => new Promise<T>((resolve) => setTimeout(() => resolve(res), ms));
6+
};
7+
8+
beforeEach(() => {
9+
jest.useRealTimers();
10+
});
11+
12+
it('Can process single item', async () => {
13+
const queue = new TaskQueue<number>(1);
14+
expect(queue.getStats().processingCount).toBe(0);
15+
expect(queue.getStats().waitingCount).toBe(0);
16+
17+
const result = queue.execute(sleepFactory(100, 42));
18+
expect(queue.getStats().processingCount).toBe(1);
19+
expect(queue.getStats().waitingCount).toBe(0);
20+
await expect(result).resolves.toBe(42);
21+
expect(queue.getStats().processingCount).toBe(0);
22+
expect(queue.getStats().waitingCount).toBe(0);
23+
});
24+
25+
it('Can process multiple items', async () => {
26+
const queue = new TaskQueue<number>(3);
27+
const results = Promise.all([
28+
queue.execute(sleepFactory(100, 1)),
29+
queue.execute(sleepFactory(100, 2)),
30+
queue.execute(sleepFactory(100, 3)),
31+
queue.execute(sleepFactory(100, 4)),
32+
queue.execute(sleepFactory(100, 5)),
33+
]);
34+
const preStats = queue.getStats();
35+
expect(preStats.processingCount).toBe(3);
36+
expect(preStats.waitingCount).toBe(2);
37+
38+
await expect(results).resolves.toStrictEqual([1, 2, 3, 4, 5]);
39+
40+
const postStats = queue.getStats();
41+
expect(postStats.processingCount).toBe(0);
42+
expect(postStats.waitingCount).toBe(0);
43+
});
44+
45+
it('Can process multiple items', async () => {
46+
const queue = new TaskQueue<number>(3);
47+
const results = Promise.all([
48+
queue.execute(sleepFactory(50, 1)),
49+
queue.execute(sleepFactory(50, 2)),
50+
queue.execute(sleepFactory(50, 3)),
51+
queue.execute(sleepFactory(50, 4)),
52+
queue.execute(sleepFactory(50, 5)),
53+
]);
54+
const preStats = queue.getStats();
55+
expect(preStats.processingCount).toBe(3);
56+
expect(preStats.waitingCount).toBe(2);
57+
58+
await expect(results).resolves.toStrictEqual([1, 2, 3, 4, 5]);
59+
60+
const postStats = queue.getStats();
61+
expect(postStats.processingCount).toBe(0);
62+
expect(postStats.waitingCount).toBe(0);
63+
});
64+
65+
it('Can clear/destroy the queue', async () => {
66+
const queue = new TaskQueue<number>(3);
67+
const results = Promise.allSettled([
68+
queue.execute(sleepFactory(50, 1)),
69+
queue.execute(sleepFactory(50, 2)),
70+
queue.execute(sleepFactory(50, 3)),
71+
queue.execute(sleepFactory(50, 4)),
72+
queue.execute(sleepFactory(50, 5)),
73+
]).then((results) => results.map((result) => result.status));
74+
75+
await sleepFactory(0)();
76+
await queue.clear();
77+
78+
await expect(results).resolves.toMatchInlineSnapshot(`
79+
Array [
80+
"fulfilled",
81+
"fulfilled",
82+
"fulfilled",
83+
"rejected",
84+
"rejected",
85+
]
86+
`);
87+
});
88+
89+
90+
it('Will resolve in correct order', async () => {
91+
const queue = new TaskQueue<number>(1);
92+
const result = Promise.race([
93+
queue.execute(sleepFactory(500, 1)),
94+
queue.execute(sleepFactory(50, 2)),
95+
])
96+
97+
await expect(result).resolves.toBe(1)
98+
});
99+
});

0 commit comments

Comments
 (0)