Skip to content

Commit 5451fe6

Browse files
karthiknadigeleanorjboyd
authored andcommitted
Fixes duplicated messages with fifo and dispose
1 parent 67367bb commit 5451fe6

File tree

4 files changed

+102
-82
lines changed

4 files changed

+102
-82
lines changed

src/client/common/pipes/namedPipes.ts

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
import * as cp from 'child_process';
55
import * as crypto from 'crypto';
6-
import * as fs from 'fs';
6+
import * as fs from 'fs-extra';
77
import * as net from 'net';
88
import * as os from 'os';
99
import * as path from 'path';
1010
import * as rpc from 'vscode-jsonrpc/node';
11-
import { CancellationError, CancellationToken } from 'vscode';
11+
import { CancellationError, CancellationToken, Disposable } from 'vscode';
1212
import { traceVerbose } from '../../logging';
1313
import { isWindows } from '../platform/platformService';
1414
import { createDeferred } from '../utils/async';
@@ -73,6 +73,9 @@ export async function createWriterPipe(pipeName: string, token?: CancellationTok
7373
}
7474
// linux implementation of FIFO
7575
await mkfifo(pipeName);
76+
try {
77+
await fs.chmod(pipeName, 0o666);
78+
} catch {}
7679
const writer = fs.createWriteStream(pipeName, {
7780
encoding: 'utf-8',
7881
});
@@ -86,14 +89,14 @@ class CombinedReader implements rpc.MessageReader {
8689

8790
private _onPartialMessage = new rpc.Emitter<rpc.PartialMessageInfo>();
8891

89-
private _listeners = new rpc.Emitter<rpc.NotificationMessage>();
90-
91-
private _readers: rpc.MessageReader[] = [];
92+
private _callback: rpc.DataCallback = () => {};
9293

9394
private _disposables: rpc.Disposable[] = [];
9495

96+
private _readers: rpc.MessageReader[] = [];
97+
9598
constructor() {
96-
this._disposables.push(this._onClose, this._onError, this._onPartialMessage, this._listeners);
99+
this._disposables.push(this._onClose, this._onError, this._onPartialMessage);
97100
}
98101

99102
onError: rpc.Event<Error> = this._onError.event;
@@ -103,34 +106,41 @@ class CombinedReader implements rpc.MessageReader {
103106
onPartialMessage: rpc.Event<rpc.PartialMessageInfo> = this._onPartialMessage.event;
104107

105108
listen(callback: rpc.DataCallback): rpc.Disposable {
106-
return this._listeners.event(callback);
109+
this._callback = callback;
110+
return new Disposable(() => (this._callback = () => {}));
107111
}
108112

109113
add(reader: rpc.MessageReader): void {
110114
this._readers.push(reader);
111-
this._disposables.push(
112-
reader.onError((error) => this._onError.fire(error)),
113-
reader.onClose(() => this.dispose()),
114-
reader.onPartialMessage((info) => this._onPartialMessage.fire(info)),
115-
reader.listen((msg) => {
116-
this._listeners.fire(msg as rpc.NotificationMessage);
117-
}),
118-
);
115+
reader.listen((msg) => {
116+
this._callback(msg as rpc.NotificationMessage);
117+
});
118+
this._disposables.push(reader);
119+
reader.onClose(() => {
120+
this.remove(reader);
121+
if (this._readers.length === 0) {
122+
this._onClose.fire();
123+
}
124+
});
125+
reader.onError((e) => {
126+
this.remove(reader);
127+
this._onError.fire(e);
128+
});
119129
}
120130

121-
error(error: Error): void {
122-
this._onError.fire(error);
131+
remove(reader: rpc.MessageReader): void {
132+
const found = this._readers.find((r) => r === reader);
133+
if (found) {
134+
this._readers = this._readers.filter((r) => r !== reader);
135+
reader.dispose();
136+
}
123137
}
124138

125139
dispose(): void {
126-
this._onClose.fire();
127-
this._disposables.forEach((disposable) => {
128-
try {
129-
disposable.dispose();
130-
} catch (e) {
131-
/* noop */
132-
}
133-
});
140+
this._readers.forEach((r) => r.dispose());
141+
this._readers = [];
142+
this._disposables.forEach((disposable) => disposable.dispose());
143+
this._disposables = [];
134144
}
135145
}
136146

@@ -168,8 +178,9 @@ export async function createReaderPipe(pipeName: string, token?: CancellationTok
168178
}
169179
// mac/linux implementation of FIFO
170180
await mkfifo(pipeName);
171-
const reader = fs.createReadStream(pipeName, {
172-
encoding: 'utf-8',
173-
});
181+
try {
182+
await fs.chmod(pipeName, 0o666);
183+
} catch {}
184+
const reader = fs.createReadStream(pipeName, { encoding: 'utf-8' });
174185
return new rpc.StreamMessageReader(reader, 'utf-8');
175186
}

src/client/testing/testController/common/utils.ts

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -229,44 +229,47 @@ export async function startRunResultNamedPipe(
229229
dataReceivedCallback: (payload: ExecutionTestPayload) => void,
230230
deferredTillServerClose: Deferred<void>,
231231
cancellationToken?: CancellationToken,
232-
): Promise<{ name: string } & Disposable> {
232+
): Promise<string> {
233233
traceVerbose('Starting Test Result named pipe');
234-
const pipeName: string = '/Users/eleanorboyd/testingFiles/inc_dec_example/temp.txt'; // generateRandomPipeName('python-test-results');
234+
const pipeName: string = generateRandomPipeName('python-test-results');
235235

236-
let disposeOfServer: () => void = () => {
237-
deferredTillServerClose.resolve();
238-
/* noop */
239-
};
240236
const reader = await createReaderPipe(pipeName, cancellationToken);
241-
traceVerbose(`Test Discovery named pipe ${pipeName} connected`);
242-
let perConnectionDisposables: (Disposable | undefined)[] = [reader];
243-
244-
// create a function to dispose of the server
245-
disposeOfServer = () => {
246-
// dispose of all data listeners and cancelation listeners
247-
perConnectionDisposables.forEach((d) => d?.dispose());
248-
perConnectionDisposables = [];
237+
traceVerbose(`Test Results named pipe ${pipeName} connected`);
238+
let disposables: Disposable[] = [];
239+
const disposable = new Disposable(() => {
240+
traceVerbose(`Test Results named pipe ${pipeName} disposed`);
241+
disposables.forEach((d) => d.dispose());
242+
disposables = [];
249243
deferredTillServerClose.resolve();
250-
};
251-
perConnectionDisposables.push(
252-
cancellationToken?.onCancellationRequested(() => {
253-
console.log(`Test Result named pipe ${pipeName} cancelled`);
254-
// if cancel is called on one connection, dispose of all connections
255-
disposeOfServer();
256-
}),
244+
});
245+
246+
if (cancellationToken) {
247+
disposables.push(
248+
cancellationToken?.onCancellationRequested(() => {
249+
console.log(`Test Result named pipe ${pipeName} cancelled`);
250+
disposable.dispose();
251+
}),
252+
);
253+
}
254+
disposables.push(
255+
reader,
257256
reader.listen((data: Message) => {
258257
traceVerbose(`Test Result named pipe ${pipeName} received data`);
259258
// if EOT, call decrement connection count (callback)
260259
dataReceivedCallback((data as ExecutionResultMessage).params as ExecutionTestPayload | EOTTestPayload);
261260
}),
261+
reader.onClose(() => {
262+
// this is called once the server close, once per run instance
263+
traceVerbose(`Test Result named pipe ${pipeName} closed. Disposing of listener/s.`);
264+
// dispose of all data listeners and cancelation listeners
265+
disposable.dispose();
266+
}),
267+
reader.onError((error) => {
268+
traceError(`Test Results named pipe ${pipeName} error:`, error);
269+
}),
262270
);
263-
reader.onClose(() => {
264-
// this is called once the server close, once per run instance
265-
traceVerbose(`Test Result named pipe ${pipeName} closed. Disposing of listener/s.`);
266-
// dispose of all data listeners and cancelation listeners
267-
disposeOfServer();
268-
});
269-
return { name: pipeName, dispose: disposeOfServer };
271+
272+
return pipeName;
270273
}
271274

272275
interface DiscoveryResultMessage extends Message {
@@ -276,46 +279,45 @@ interface DiscoveryResultMessage extends Message {
276279
export async function startDiscoveryNamedPipe(
277280
callback: (payload: DiscoveredTestPayload) => void,
278281
cancellationToken?: CancellationToken,
279-
): Promise<{ name: string } & Disposable> {
282+
): Promise<string> {
280283
traceVerbose('Starting Test Discovery named pipe');
281284
// const pipeName: string = '/Users/eleanorboyd/testingFiles/inc_dec_example/temp33.txt';
282285
const pipeName: string = generateRandomPipeName('python-test-discovery');
283-
let dispose: () => void = () => {
284-
/* noop */
285-
};
286286
const reader = await createReaderPipe(pipeName, cancellationToken);
287287

288-
reader.listen((data: Message) => {
289-
traceVerbose(`Test Discovery named pipe ${pipeName} received data`);
290-
callback((data as DiscoveryResultMessage).params as DiscoveredTestPayload | EOTTestPayload);
291-
});
292288
traceVerbose(`Test Discovery named pipe ${pipeName} connected`);
293-
let disposables: (Disposable | undefined)[] = [reader];
294-
dispose = () => {
289+
let disposables: Disposable[] = [];
290+
const disposable = new Disposable(() => {
295291
traceVerbose(`Test Discovery named pipe ${pipeName} disposed`);
296-
disposables.forEach((d) => d?.dispose());
292+
disposables.forEach((d) => d.dispose());
297293
disposables = [];
298-
};
294+
});
295+
296+
if (cancellationToken) {
297+
disposables.push(
298+
cancellationToken.onCancellationRequested(() => {
299+
traceVerbose(`Test Discovery named pipe ${pipeName} cancelled`);
300+
disposable.dispose();
301+
}),
302+
);
303+
}
304+
299305
disposables.push(
300-
cancellationToken?.onCancellationRequested(() => {
301-
traceVerbose(`Test Discovery named pipe ${pipeName} cancelled`);
302-
dispose();
303-
}),
306+
reader,
304307
reader.listen((data: Message) => {
305308
traceVerbose(`Test Discovery named pipe ${pipeName} received data`);
306309
callback((data as DiscoveryResultMessage).params as DiscoveredTestPayload | EOTTestPayload);
307310
}),
308311
reader.onClose(() => {
309312
callback(createEOTPayload(false));
310313
traceVerbose(`Test Discovery named pipe ${pipeName} closed`);
311-
dispose();
314+
disposable.dispose();
312315
}),
313316
reader.onError((error) => {
314317
traceError(`Test Discovery named pipe ${pipeName} error:`, error);
315-
dispose();
316318
}),
317319
);
318-
return { name: pipeName, dispose };
320+
return pipeName;
319321
}
320322

321323
export async function startTestIdServer(testIds: string[]): Promise<number> {

src/client/testing/testController/pytest/pytestDiscoveryAdapter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ export class PytestTestDiscoveryAdapter implements ITestDiscoveryAdapter {
4141
executionFactory?: IPythonExecutionFactory,
4242
interpreter?: PythonEnvironment,
4343
): Promise<DiscoveredTestPayload> {
44-
const { name, dispose } = await startDiscoveryNamedPipe((data: DiscoveredTestPayload) => {
44+
const name = await startDiscoveryNamedPipe((data: DiscoveredTestPayload) => {
4545
this.resultResolver?.resolveDiscovery(data);
4646
});
4747

4848
try {
4949
await this.runPytestDiscovery(uri, name, executionFactory, interpreter);
5050
} finally {
51-
dispose();
51+
traceVerbose('donee');
5252
}
5353
// this is only a placeholder to handle function overloading until rewrite is finished
5454
const discoveryPayload: DiscoveredTestPayload = { cwd: uri.fsPath, status: 'success' };

src/client/testing/testController/pytest/pytestExecutionAdapter.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,13 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
4848
traceError(`No run instance found, cannot resolve execution, for workspace ${uri.fsPath}.`);
4949
}
5050
};
51-
const { name, dispose: serverDispose } = await utils.startRunResultNamedPipe(
51+
const cSource = new CancellationTokenSource();
52+
runInstance?.token.onCancellationRequested(() => cSource.cancel());
53+
54+
const name = await utils.startRunResultNamedPipe(
5255
dataReceivedCallback, // callback to handle data received
5356
deferredTillServerClose, // deferred to resolve when server closes
54-
runInstance?.token, // token to cancel
57+
cSource.token, // token to cancel
5558
);
5659
runInstance?.token.onCancellationRequested(() => {
5760
traceInfo(`Test run cancelled, resolving 'TillServerClose' deferred for ${uri.fsPath}.`);
@@ -71,7 +74,8 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
7174
uri,
7275
testIds,
7376
name,
74-
serverDispose,
77+
deferredTillEOT,
78+
cSource,
7579
runInstance,
7680
profileKind,
7781
executionFactory,
@@ -169,7 +173,8 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
169173
};
170174
traceInfo(`Running DEBUG pytest with arguments: ${testArgs} for workspace ${uri.fsPath} \r\n`);
171175
await debugLauncher!.launchDebugger(launchOptions, () => {
172-
serverDispose(); // this will resolve deferredTillServerClose
176+
serverCancel.cancel();
177+
deferredTillEOT?.resolve();
173178
});
174179
} else {
175180
// deferredTillExecClose is resolved when all stdout and stderr is read
@@ -233,10 +238,12 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
233238
}
234239
// this doesn't work, it instead directs us to the noop one which is defined first
235240
// potentially this is due to the server already being close, if this is the case?
236-
serverDispose(); // this will resolve deferredTillServerClose
237241
}
242+
243+
// deferredTillEOT is resolved when all data sent on stdout and stderr is received, close event is only called when this occurs
238244
// due to the sync reading of the output.
239245
deferredTillExecClose.resolve();
246+
serverCancel.cancel();
240247
});
241248
await deferredTillExecClose.promise;
242249
}

0 commit comments

Comments
 (0)