Skip to content

Commit 1f03c63

Browse files
CMCDragonkaitegefaulkes
authored andcommitted
WIP - this commit does not work, threadsjs is likely needs to be incorporated into our libraries and rewritten
1 parent b7f733a commit 1f03c63

11 files changed

+92
-14
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
"@matrixai/async-init": "^2.0.0",
4141
"@matrixai/errors": "^2.0.1",
4242
"@matrixai/logger": "^4.0.1",
43-
"threads": "^1.7.0"
43+
"threads": "^1.7.0",
44+
"ts-node": "^10.9.1"
4445
},
4546
"devDependencies": {
4647
"@swc/core": "^1.3.76",

src/WorkerManager.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
import type { ModuleThread } from 'threads';
2-
import type { ModuleMethods } from 'threads/dist/types/master';
31
import type WorkerManagerInterface from './WorkerManagerInterface.js';
4-
import type { QueuedTask } from 'threads/dist/master/pool-types';
5-
import { Pool } from 'threads';
2+
import type {
3+
ModuleMethods,
4+
ModuleThread,
5+
QueuedTask,
6+
} from './types.js';
67
import Logger from '@matrixai/logger';
78
import { CreateDestroy, ready } from '@matrixai/async-init/CreateDestroy.js';
9+
import { Pool } from 'threads';
810
import * as errors from './errors.js';
911

12+
1013
@CreateDestroy()
1114
class WorkerManager<W extends ModuleMethods>
1215
implements WorkerManagerInterface<W>

src/WorkerManagerInterface.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
import type { ModuleThread } from 'threads';
2-
import type { ModuleMethods } from 'threads/dist/types/master';
3-
import type { QueuedTask } from 'threads/dist/master/pool-types';
1+
import type {
2+
ModuleMethods,
3+
ModuleThread,
4+
QueuedTask,
5+
} from './types.js';
46

57
interface WorkerManagerInterface<W extends ModuleMethods> {
68
destroy(options?: { force?: boolean }): Promise<void>;

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export { default as workerModule } from './workerModule.js';
33
export * as errors from './errors.js';
44

55
export type { default as WorkerManagerInterface } from './WorkerManagerInterface.js';
6-
export type { WorkerModule } from './workerModule.js';
6+
// export type { WorkerModule } from './workerModule.js';
7+
export type { ModuleMethods, ModuleThread, QueuedTask } from './types.js';

src/worker.ts renamed to src/oldworker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ import worker from '../src/workerModule.js';
66

77
expose(worker);
88

9-
export type { WorkerModule } from '../src/workerModule.js';
9+
export type { WorkerModule } from '../src/oldworkerModule.js';
File renamed without changes.

src/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
type ModuleMethods = {
2+
[methodName: string]: (...args: any) => any;
3+
};
4+
type ModuleThread<Methods = any> = any;
5+
type QueuedTask<ThreadType, Return> = any;
6+
7+
export type {
8+
ModuleMethods,
9+
ModuleThread,
10+
QueuedTask
11+
};

src/worker.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// This is an example worker script
2+
3+
import { expose } from 'threads/worker';
4+
import worker from '../src/workerModule.js';
5+
6+
expose(worker);

src/workerModule.js

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// import { Transfer } from 'threads';
2+
// import { isWorkerRuntime } from 'threads';
3+
4+
import * as threads from 'threads';
5+
6+
const { Transfer, isWorkerRuntime } = threads;
7+
8+
9+
/**
10+
* Worker object that contains all functions that will be executed in parallel
11+
* Functions should be using CPU-parallelism not IO-parallelism
12+
* Most functions should be synchronous, not asynchronous
13+
* Making them asynchronous does not make a difference to the caller
14+
* The caller must always await because the functions will run on the pool
15+
*/
16+
const worker = {
17+
/**
18+
* Check if we are running in the worker.
19+
* Only used for testing
20+
*/
21+
isRunningInWorker() {
22+
return isWorkerRuntime();
23+
},
24+
/**
25+
* Sleep synchronously
26+
* This blocks the entire event loop
27+
* Only used for testing
28+
*/
29+
sleep(ms) {
30+
Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms);
31+
return;
32+
},
33+
/**
34+
* Zero copy demonstration manipulating buffers
35+
*/
36+
transferBuffer(data) {
37+
// Zero-copy wrap to use Node Buffer API
38+
const buffer = Buffer.from(data);
39+
// Set the last character to 2
40+
buffer[buffer.byteLength - 1] = '2'.charCodeAt(0);
41+
// Node Buffer cannot be detached
42+
// so we transfer the ArrayBuffer instead
43+
return Transfer(data);
44+
},
45+
};
46+
47+
export default worker;

tests/WorkerManager.test.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
import type { WorkerModule } from '#worker.js';
2-
import { spawn, Worker, Transfer } from 'threads';
1+
// import type { WorkerModule } from '#worker.js';
32
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
43
import { destroyed } from '@matrixai/async-init';
54
import * as testUtils from './utils.js';
5+
import { spawn, Worker, Transfer } from 'threads';
66
import WorkerManager from '#WorkerManager.js';
77
import * as errors from '#errors.js';
88

9+
type WorkerModule = any;
10+
911
describe('WorkerManager', () => {
1012
const logger = new Logger('WorkerManager Test', LogLevel.WARN, [
1113
new StreamHandler(),
@@ -70,7 +72,7 @@ describe('WorkerManager', () => {
7072
expect(
7173
await workerManager.call(async (w) => {
7274
mainPid2 = process.pid;
73-
const process2 = require('process');
75+
const process2 = await import('node:process');
7476
mainPid3 = process2.pid;
7577
return await w.isRunningInWorker();
7678
}),

0 commit comments

Comments
 (0)