Skip to content

Commit 84fa715

Browse files
authored
feat(node-runtime-worker-thread): Add proxied eventEmitter to the worker runtime COMPASS-4558 (#597)
* feat(node-runtime-worker-thread): Add proxied eventEmitter to the worker runtime * refactor(node-runtime-worker-thread): Use 'global' runtime in tests * refactor(node-runtime-worker-thread): Throw if on method called on mongosh bus
1 parent 2cfce4c commit 84fa715

File tree

6 files changed

+102
-18
lines changed

6 files changed

+102
-18
lines changed

packages/node-runtime-worker-thread/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
"@mongosh/browser-runtime-electron": "0.0.0-dev.0",
4444
"@mongosh/service-provider-core": "0.0.0-dev.0",
4545
"@mongosh/service-provider-server": "0.0.0-dev.0",
46+
"@mongosh/types": "0.0.0-dev.0",
4647
"bson": "^4.2.2",
4748
"postmsg-rpc": "^2.4.0"
4849
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { ChildProcess } from 'child_process';
2+
import { MongoshBus } from '@mongosh/types';
3+
import { exposeAll, WithClose } from './rpc';
4+
5+
export class ChildProcessMongoshBus {
6+
exposedEmitter: WithClose<MongoshBus>;
7+
8+
constructor(eventEmitter: MongoshBus, childProcess: ChildProcess) {
9+
const exposedEmitter: WithClose<MongoshBus> = exposeAll(
10+
{
11+
emit(...args) {
12+
eventEmitter.emit(...args);
13+
},
14+
on() {
15+
throw new Error("Can't use `on` method on ChildProcessMongoshBus");
16+
}
17+
},
18+
childProcess
19+
);
20+
this.exposedEmitter = exposedEmitter;
21+
}
22+
}

packages/node-runtime-worker-thread/src/child-process-proxy.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,7 @@ const evaluationListener = createCaller(
6868
);
6969

7070
exposeAll(evaluationListener, workerProcess);
71+
72+
const messageBus = createCaller(['emit', 'on'], process);
73+
74+
exposeAll(messageBus, workerProcess);

packages/node-runtime-worker-thread/src/index.spec.ts

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,31 @@
11
import chai, { expect } from 'chai';
22
import sinon from 'sinon';
33
import sinonChai from 'sinon-chai';
4+
import { MongoshBus } from '@mongosh/types';
45
import { startTestServer } from '../../../testing/integration-testing-hooks';
56
import { WorkerRuntime } from '../dist/index';
67

78
chai.use(sinonChai);
89

10+
function createMockEventEmitter() {
11+
return (sinon.stub({ on() {}, emit() {} }) as unknown) as MongoshBus;
12+
}
13+
914
describe('WorkerRuntime', () => {
15+
let runtime: WorkerRuntime;
16+
17+
afterEach(async() => {
18+
if (runtime) {
19+
await runtime.terminate();
20+
}
21+
});
22+
1023
describe('evaluate', () => {
1124
it('should evaluate and return basic values', async() => {
12-
const runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true });
25+
runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true });
1326
const result = await runtime.evaluate('1+1');
1427

1528
expect(result.printable).to.equal(2);
16-
17-
await runtime.terminate();
1829
});
1930

2031
describe('errors', () => {
@@ -46,7 +57,9 @@ describe('WorkerRuntime', () => {
4657
});
4758

4859
it("should return an error if it's returned from evaluation", async() => {
49-
const { printable } = await runtime.evaluate('new SyntaxError("Syntax!")');
60+
const { printable } = await runtime.evaluate(
61+
'new SyntaxError("Syntax!")'
62+
);
5063

5164
expect(printable).to.be.instanceof(Error);
5265
expect(printable).to.have.property('name', 'SyntaxError');
@@ -62,25 +75,21 @@ describe('WorkerRuntime', () => {
6275
const testServer = startTestServer('shared');
6376

6477
it('should return completions', async() => {
65-
const runtime = new WorkerRuntime(await testServer.connectionString());
78+
runtime = new WorkerRuntime(await testServer.connectionString());
6679
const completions = await runtime.getCompletions('db.coll1.f');
6780

6881
expect(completions).to.deep.contain({ completion: 'db.coll1.find' });
69-
70-
await runtime.terminate();
7182
});
7283
});
7384

7485
describe('getShellPrompt', () => {
7586
const testServer = startTestServer('shared');
7687

7788
it('should return prompt when connected to the server', async() => {
78-
const runtime = new WorkerRuntime(await testServer.connectionString());
89+
runtime = new WorkerRuntime(await testServer.connectionString());
7990
const result = await runtime.getShellPrompt();
8091

8192
expect(result).to.match(/>/);
82-
83-
await runtime.terminate();
8493
});
8594
});
8695

@@ -90,16 +99,39 @@ describe('WorkerRuntime', () => {
9099
onPrompt: sinon.spy(() => 'password123')
91100
};
92101

93-
const runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true });
102+
runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true });
94103

95104
runtime.setEvaluationListener(evalListener);
96105

97106
const password = await runtime.evaluate('passwordPrompt()');
98107

99108
expect(evalListener.onPrompt).to.have.been.called;
100109
expect(password.printable).to.equal('password123');
110+
});
111+
});
101112

102-
await runtime.terminate();
113+
describe('eventEmitter', () => {
114+
const testServer = startTestServer('shared');
115+
116+
it('should propagate emitted events from worker', async() => {
117+
const eventEmitter = createMockEventEmitter();
118+
119+
runtime = new WorkerRuntime(
120+
await testServer.connectionString(),
121+
{},
122+
{},
123+
{},
124+
eventEmitter
125+
);
126+
127+
await runtime.evaluate('db.getCollectionNames()');
128+
129+
expect(eventEmitter.emit).to.have.been.calledWith('mongosh:api-call', {
130+
arguments: {},
131+
class: 'Database',
132+
db: 'test',
133+
method: 'getCollectionNames'
134+
});
103135
});
104136
});
105137
});

packages/node-runtime-worker-thread/src/index.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@
44
import { ChildProcess, SpawnOptionsWithoutStdio } from 'child_process';
55
import { MongoClientOptions } from '@mongosh/service-provider-core';
66
import { Runtime, RuntimeEvaluationListener, RuntimeEvaluationResult } from '@mongosh/browser-runtime-core';
7+
import { MongoshBus } from '@mongosh/types';
78
import { promises as fs } from 'fs';
89
import path from 'path';
10+
import { EventEmitter } from 'events';
911
import spawnChildFromSource, { kill } from './spawn-child-from-source';
1012
import { Caller, createCaller } from './rpc';
1113
import { ChildProcessEvaluationListener } from './child-process-evaluation-listener';
1214
import type { WorkerRuntime as WorkerThreadWorkerRuntime } from './worker-runtime';
1315
import { deserializeEvaluationResult } from './serializer';
16+
import { ChildProcessMongoshBus } from './child-process-mongosh-bus';
1417

1518
type ChildProcessRuntime = Caller<WorkerThreadWorkerRuntime>;
19+
1620
class WorkerRuntime implements Runtime {
1721
private initOptions: {
1822
uri: string;
@@ -23,6 +27,10 @@ class WorkerRuntime implements Runtime {
2327

2428
evaluationListener: RuntimeEvaluationListener | null = null;
2529

30+
private eventEmitter: MongoshBus;
31+
32+
private childProcessMongoshBus!: ChildProcessMongoshBus;
33+
2634
private childProcessEvaluationListener!: ChildProcessEvaluationListener;
2735

2836
private childProcess!: ChildProcess;
@@ -35,10 +43,12 @@ class WorkerRuntime implements Runtime {
3543
uri: string,
3644
driverOptions: MongoClientOptions = {},
3745
cliOptions: { nodb?: boolean } = {},
38-
spawnOptions: SpawnOptionsWithoutStdio = {}
46+
spawnOptions: SpawnOptionsWithoutStdio = {},
47+
eventEmitter: MongoshBus = new EventEmitter()
3948
) {
4049
this.initOptions = { uri, driverOptions, cliOptions, spawnOptions };
4150
this.initWorkerPromise = this.initWorker();
51+
this.eventEmitter = eventEmitter;
4252
}
4353

4454
private async initWorker() {
@@ -76,6 +86,11 @@ class WorkerRuntime implements Runtime {
7686
this.childProcess
7787
);
7888

89+
this.childProcessMongoshBus = new ChildProcessMongoshBus(
90+
this.eventEmitter,
91+
this.childProcess
92+
);
93+
7994
await this.childProcessRuntime.init(uri, driverOptions, cliOptions);
8095
}
8196

packages/node-runtime-worker-thread/src/worker-runtime.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
/* ^^^ we test the dist directly, so isntanbul can't calculate the coverage correctly */
33

44
import { parentPort, isMainThread } from 'worker_threads';
5-
import { Runtime, RuntimeEvaluationListener } from '@mongosh/browser-runtime-core';
5+
import {
6+
Runtime,
7+
RuntimeEvaluationListener
8+
} from '@mongosh/browser-runtime-core';
69
import { ElectronRuntime } from '@mongosh/browser-runtime-electron';
710
import {
811
MongoClientOptions,
@@ -11,6 +14,7 @@ import {
1114
import { CompassServiceProvider } from '@mongosh/service-provider-server';
1215
import { exposeAll, createCaller } from './rpc';
1316
import { serializeEvaluationResult } from './serializer';
17+
import { MongoshBus } from '@mongosh/types';
1418

1519
if (!parentPort || isMainThread) {
1620
throw new Error('Worker runtime can be used only in a worker thread');
@@ -34,6 +38,15 @@ const evaluationListener = createCaller<RuntimeEvaluationListener>(
3438
parentPort
3539
);
3640

41+
const messageBus: MongoshBus = Object.assign(
42+
createCaller(['emit'], parentPort),
43+
{
44+
on() {
45+
throw new Error("Can't call `on` method on worker runtime MongoshBus");
46+
}
47+
}
48+
);
49+
3750
export type WorkerRuntime = Runtime & {
3851
init(
3952
uri: string,
@@ -53,10 +66,7 @@ const workerRuntime: WorkerRuntime = {
5366
driverOptions,
5467
cliOptions
5568
);
56-
runtime = new ElectronRuntime(
57-
provider /** , TODO: `messageBus` support for telemetry in a separate ticket */
58-
);
59-
69+
runtime = new ElectronRuntime(provider, messageBus);
6070
runtime.setEvaluationListener(evaluationListener);
6171
},
6272

0 commit comments

Comments
 (0)