-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathindex.mjs
More file actions
95 lines (85 loc) · 2.89 KB
/
index.mjs
File metadata and controls
95 lines (85 loc) · 2.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import { Worker } from 'node:worker_threads';
/**
* WorkerPool class to manage a pool of worker threads
*/
export default class WorkerPool {
/** @private {SharedArrayBuffer} - Shared memory for active thread count */
sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
/** @private {Int32Array} - A typed array to access shared memory */
activeThreads = new Int32Array(this.sharedBuffer);
/** @private {Array<Function>} - Queue of pending tasks */
queue = [];
/**
* Gets the current active thread count.
* @returns {number} The current active thread count.
*/
getActiveThreadCount() {
return Atomics.load(this.activeThreads, 0);
}
/**
* Changes the active thread count atomically by a given delta.
* @param {number} delta - The value to increment or decrement the active thread count by.
*/
changeActiveThreadCount(delta) {
Atomics.add(this.activeThreads, 0, delta);
}
/**
* Runs a generator within a worker thread.
* @param {string} name - The name of the generator to execute
* @param {any} dependencyOutput - Input data for the generator
* @param {number} threads - Maximum number of threads to run concurrently
* @param {Object} extra - Additional options for the generator
* @returns {Promise<any>} Resolves with the generator result, or rejects with an error
*/
run(name, dependencyOutput, threads, extra) {
return new Promise((resolve, reject) => {
/**
* Function to run the generator in a worker thread
*/
const run = () => {
this.changeActiveThreadCount(1);
// Create and start the worker thread
const worker = new Worker(
new URL(import.meta.resolve('./worker.mjs')),
{
workerData: { name, dependencyOutput, extra },
}
);
// Handle worker thread messages (result or error)
worker.on('message', result => {
this.changeActiveThreadCount(-1);
this.processQueue(threads);
if (result?.error) {
reject(result.error);
} else {
resolve(result);
}
});
// Handle worker thread errors
worker.on('error', err => {
this.changeActiveThreadCount(-1);
this.processQueue(threads);
reject(err);
});
};
// If the active thread count exceeds the limit, add the task to the queue
if (this.getActiveThreadCount() >= threads) {
this.queue.push(run);
} else {
run();
}
});
}
/**
* Process the worker thread queue to start the next available task
* when there is room for more threads.
* @param {number} threads - Maximum number of threads to run concurrently
* @private
*/
processQueue(threads) {
if (this.queue.length > 0 && this.getActiveThreadCount() < threads) {
const next = this.queue.shift();
if (next) next();
}
}
}