Skip to content

Commit 17c2c33

Browse files
committed
atomics
1 parent 2ad5e01 commit 17c2c33

File tree

1 file changed

+25
-7
lines changed

1 file changed

+25
-7
lines changed

src/threading/index.mjs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,29 @@ import { Worker } from 'node:worker_threads';
44
* WorkerPool class to manage a pool of worker threads
55
*/
66
export default class WorkerPool {
7-
/** @private {number} - Number of active threads */
8-
activeThreads = 0;
7+
/** @private {SharedArrayBuffer} - Shared memory for active thread count */
8+
sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
9+
/** @private {Int32Array} - A typed array to access shared memory */
10+
activeThreads = new Int32Array(this.sharedBuffer);
911
/** @private {Array<Function>} - Queue of pending tasks */
1012
queue = [];
1113

14+
/**
15+
* Gets the current active thread count.
16+
* @returns {number} The current active thread count.
17+
*/
18+
getActiveThreadCount() {
19+
return Atomics.load(this.activeThreads, 0);
20+
}
21+
22+
/**
23+
* Changes the active thread count atomically by a given delta.
24+
* @param {number} delta - The value to increment or decrement the active thread count by.
25+
*/
26+
changeActiveThreadCount(delta) {
27+
Atomics.add(this.activeThreads, 0, delta);
28+
}
29+
1230
/**
1331
* Runs a generator within a worker thread.
1432
* @param {string} name - The name of the generator to execute
@@ -23,7 +41,7 @@ export default class WorkerPool {
2341
* Function to run the generator in a worker thread
2442
*/
2543
const run = () => {
26-
this.activeThreads++;
44+
this.changeActiveThreadCount(1);
2745

2846
// Create and start the worker thread
2947
const worker = new Worker(
@@ -35,22 +53,22 @@ export default class WorkerPool {
3553

3654
// Handle worker thread messages (result or error)
3755
worker.on('message', result => {
38-
this.activeThreads--;
56+
this.changeActiveThreadCount(-1);
3957
this.processQueue(threads);
4058

4159
(result?.error ? reject : resolve)(result);
4260
});
4361

4462
// Handle worker thread errors
4563
worker.on('error', err => {
46-
this.activeThreads--;
64+
this.changeActiveThreadCount(-1);
4765
this.processQueue(threads);
4866
reject(err);
4967
});
5068
};
5169

5270
// If the active thread count exceeds the limit, add the task to the queue
53-
if (this.activeThreads >= threads) {
71+
if (this.getActiveThreadCount() >= threads) {
5472
this.queue.push(run);
5573
} else {
5674
run();
@@ -65,7 +83,7 @@ export default class WorkerPool {
6583
* @private
6684
*/
6785
processQueue(threads) {
68-
if (this.queue.length > 0 && this.activeThreads < threads) {
86+
if (this.queue.length > 0 && this.getActiveThreadCount() < threads) {
6987
const next = this.queue.shift();
7088
if (next) next();
7189
}

0 commit comments

Comments
 (0)