Skip to content

Commit f76fd4f

Browse files
committed
address pr comments
1 parent 364b84e commit f76fd4f

File tree

8 files changed

+163
-24
lines changed

8 files changed

+163
-24
lines changed

extensions/vscode-test-resolver/src/extension.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ export function activate(context: vscode.ExtensionContext) {
196196
onDidReceiveMessage: dataEmitter.event,
197197
onDidClose: closeEmitter.event,
198198
onDidEnd: endEmitter.event,
199-
dataHandler: d => remoteSocket.write(d),
200-
endHandler: () => remoteSocket.end(),
199+
send: d => remoteSocket.write(d),
200+
end: () => remoteSocket.end(),
201201
};
202202
}, connectionToken));
203203
}

src/vs/base/common/event.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,6 +1171,10 @@ export class PauseableEmitter<T> extends Emitter<T> {
11711171
protected _eventQueue = new LinkedList<T>();
11721172
private _mergeFn?: (input: T[]) => T;
11731173

1174+
public get isPaused(): boolean {
1175+
return this._isPaused !== 0;
1176+
}
1177+
11741178
constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) {
11751179
super(options);
11761180
this._mergeFn = options?.merge;

src/vs/workbench/api/browser/mainThreadManagedSockets.ts

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { ManagedRemoteConnection, RemoteConnectionType } from 'vs/platform/remot
1010
import { VSBuffer } from 'vs/base/common/buffer';
1111
import { IRemoteSocketFactoryService, ISocketFactory } from 'vs/platform/remote/common/remoteSocketFactoryService';
1212
import { ISocket, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
13-
import { Emitter, Event } from 'vs/base/common/event';
13+
import { Emitter, Event, PauseableEmitter } from 'vs/base/common/event';
1414
import { makeRawSocketHeaders, socketRawEndHeaderSequence } from 'vs/platform/remote/common/managedSocket';
1515

1616
@extHostNamedCustomer(MainContext.MainThreadManagedSockets)
@@ -30,7 +30,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa
3030

3131
async $registerSocketFactory(socketFactoryId: number): Promise<void> {
3232
const that = this;
33-
const scoketFactory = new class implements ISocketFactory<RemoteConnectionType.Managed> {
33+
const socketFactory = new class implements ISocketFactory<RemoteConnectionType.Managed> {
3434

3535
supports(connectTo: ManagedRemoteConnection): boolean {
3636
return (connectTo.id === socketFactoryId);
@@ -65,7 +65,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa
6565
});
6666
}
6767
};
68-
this._registrations.set(socketFactoryId, this._remoteSocketFactoryService.register(RemoteConnectionType.Managed, scoketFactory));
68+
this._registrations.set(socketFactoryId, this._remoteSocketFactoryService.register(RemoteConnectionType.Managed, socketFactory));
6969

7070
}
7171

@@ -91,7 +91,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa
9191
}
9292
}
9393

94-
interface RemoteSocketHalf {
94+
export interface RemoteSocketHalf {
9595
onData: Emitter<VSBuffer>;
9696
onClose: Emitter<SocketCloseEvent>;
9797
onEnd: Emitter<void>;
@@ -103,21 +103,36 @@ export class ManagedSocket extends Disposable implements ISocket {
103103
proxy: ExtHostManagedSocketsShape,
104104
path: string, query: string, debugLabel: string,
105105

106-
half: {
107-
onClose: Emitter<SocketCloseEvent>;
108-
onData: Emitter<VSBuffer>;
109-
onEnd: Emitter<void>;
110-
}
106+
half: RemoteSocketHalf
111107
): Promise<ManagedSocket> {
112108
const socket = new ManagedSocket(socketId, proxy, debugLabel, half.onClose, half.onData, half.onEnd);
113109

114110
socket.write(VSBuffer.fromString(makeRawSocketHeaders(path, query, debugLabel)));
115111

116112
const d = new DisposableStore();
117113
return new Promise<ManagedSocket>((resolve, reject) => {
114+
let dataSoFar: VSBuffer | undefined;
118115
d.add(socket.onData(d => {
119-
if (d.indexOf(socketRawEndHeaderSequence) !== -1) {
120-
resolve(socket);
116+
if (!dataSoFar) {
117+
dataSoFar = d;
118+
} else {
119+
dataSoFar = VSBuffer.concat([dataSoFar, d], dataSoFar.byteLength + d.byteLength);
120+
}
121+
122+
const index = dataSoFar.indexOf(socketRawEndHeaderSequence);
123+
if (index === -1) {
124+
return;
125+
}
126+
127+
resolve(socket);
128+
// pause data events until the socket consumer is hooked up. We may
129+
// immediately emit remaining data, but if not there may still be
130+
// microtasks queued which would fire data into the abyss.
131+
socket.pauseData();
132+
133+
const rest = dataSoFar.slice(index + socketRawEndHeaderSequence.byteLength);
134+
if (rest.byteLength) {
135+
half.onData.fire(rest);
121136
}
122137
}));
123138

@@ -126,7 +141,14 @@ export class ManagedSocket extends Disposable implements ISocket {
126141
}).finally(() => d.dispose());
127142
}
128143

129-
public onData: Event<VSBuffer>;
144+
private readonly pausableDataEmitter = this._register(new PauseableEmitter<VSBuffer>());
145+
146+
public onData: Event<VSBuffer> = (...args) => {
147+
if (this.pausableDataEmitter.isPaused) {
148+
queueMicrotask(() => this.pausableDataEmitter.resume());
149+
}
150+
return this.pausableDataEmitter.event(...args);
151+
};
130152
public onClose: Event<SocketCloseEvent>;
131153
public onEnd: Event<void>;
132154

@@ -144,11 +166,19 @@ export class ManagedSocket extends Disposable implements ISocket {
144166
onEndEmitter: Emitter<void>,
145167
) {
146168
super();
169+
170+
this._register(onDataEmitter);
171+
this._register(onDataEmitter.event(data => this.pausableDataEmitter.fire(data)));
172+
147173
this.onClose = this._register(onCloseEmitter).event;
148-
this.onData = this._register(onDataEmitter).event;
149174
this.onEnd = this._register(onEndEmitter).event;
150175
}
151176

177+
/** Pauses data events until a new listener comes in onData() */
178+
pauseData() {
179+
this.pausableDataEmitter.pause();
180+
}
181+
152182
write(buffer: VSBuffer): void {
153183
this.proxy.$remoteSocketWrite(this.socketId, buffer);
154184
}

src/vs/workbench/api/common/extHostManagedSockets.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,19 @@ export class ExtHostManagedSockets implements IExtHostManagedSockets {
7171
}
7272

7373
$remoteSocketWrite(socketId: number, buffer: VSBuffer): void {
74-
this._managedRemoteSockets.get(socketId)?.actual.dataHandler(buffer.buffer);
74+
this._managedRemoteSockets.get(socketId)?.actual.send(buffer.buffer);
7575
}
7676

7777
$remoteSocketEnd(socketId: number): void {
7878
const socket = this._managedRemoteSockets.get(socketId);
7979
if (socket) {
80-
socket.actual.endHandler();
80+
socket.actual.end();
8181
socket.dispose();
8282
}
8383
}
8484

85-
$remoteSocketDrain(socketId: number): Promise<void> {
86-
return this._managedRemoteSockets.get(socketId)?.actual.drainHandler?.() ?? Promise.resolve();
85+
async $remoteSocketDrain(socketId: number): Promise<void> {
86+
await this._managedRemoteSockets.get(socketId)?.actual.drain?.();
8787
}
8888
}
8989

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
* Licensed under the MIT License. See License.txt in the project root for license information.
4+
*--------------------------------------------------------------------------------------------*/
5+
6+
import * as assert from 'assert';
7+
import { disposableTimeout, timeout } from 'vs/base/common/async';
8+
import { VSBuffer } from 'vs/base/common/buffer';
9+
import { Emitter } from 'vs/base/common/event';
10+
import { DisposableStore } from 'vs/base/common/lifecycle';
11+
import { SocketCloseEvent } from 'vs/base/parts/ipc/common/ipc.net';
12+
import { mock } from 'vs/base/test/common/mock';
13+
import { ManagedSocket, RemoteSocketHalf } from 'vs/workbench/api/browser/mainThreadManagedSockets';
14+
import { ExtHostManagedSocketsShape } from 'vs/workbench/api/common/extHost.protocol';
15+
16+
suite('MainThreadManagedSockets', () => {
17+
18+
suite('ManagedSocket', () => {
19+
let extHost: ExtHostMock;
20+
let half: RemoteSocketHalf;
21+
22+
class ExtHostMock extends mock<ExtHostManagedSocketsShape>() {
23+
private onDidFire = new Emitter<void>();
24+
public readonly events: any[] = [];
25+
26+
override $remoteSocketWrite(socketId: number, buffer: VSBuffer): void {
27+
this.events.push({ socketId, data: buffer.toString() });
28+
this.onDidFire.fire();
29+
}
30+
31+
override $remoteSocketDrain(socketId: number) {
32+
this.events.push({ socketId, event: 'drain' });
33+
this.onDidFire.fire();
34+
return Promise.resolve();
35+
}
36+
37+
override $remoteSocketEnd(socketId: number) {
38+
this.events.push({ socketId, event: 'end' });
39+
this.onDidFire.fire();
40+
}
41+
42+
expectEvent(test: (evt: any) => void, message: string) {
43+
if (this.events.some(test)) {
44+
return;
45+
}
46+
47+
const d = new DisposableStore();
48+
return new Promise<void>(resolve => {
49+
d.add(this.onDidFire.event(() => {
50+
if (this.events.some(test)) {
51+
return;
52+
}
53+
}));
54+
d.add(disposableTimeout(() => {
55+
throw new Error(`Expected ${message} but only had ${JSON.stringify(this.events, null, 2)}`);
56+
}, 1000));
57+
}).finally(() => d.dispose());
58+
}
59+
}
60+
61+
setup(() => {
62+
extHost = new ExtHostMock();
63+
half = {
64+
onClose: new Emitter<SocketCloseEvent>(),
65+
onData: new Emitter<VSBuffer>(),
66+
onEnd: new Emitter<void>(),
67+
};
68+
});
69+
70+
async function doConnect() {
71+
const socket = ManagedSocket.connect(1, extHost, '/hello', 'world=true', '', half);
72+
await extHost.expectEvent(evt => evt.data && evt.data.startsWith('GET ws://localhost/hello?world=true&skipWebSocketFrames=true HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Key:'), 'websocket open event');
73+
half.onData.fire(VSBuffer.fromString('Opened successfully ;)\r\n\r\n'));
74+
return await socket;
75+
}
76+
77+
test('connects', async () => {
78+
await doConnect();
79+
});
80+
81+
test('includes trailing connection data', async () => {
82+
const socketProm = ManagedSocket.connect(1, extHost, '/hello', 'world=true', '', half);
83+
await extHost.expectEvent(evt => evt.data && evt.data.includes('GET ws://localhost'), 'websocket open event');
84+
half.onData.fire(VSBuffer.fromString('Opened successfully ;)\r\n\r\nSome trailing data'));
85+
const socket = await socketProm;
86+
87+
const data: string[] = [];
88+
socket.onData(d => data.push(d.toString()));
89+
await timeout(1); // allow microtasks to flush
90+
assert.deepStrictEqual(data, ['Some trailing data']);
91+
});
92+
93+
test('round trips data', async () => {
94+
const socket = await doConnect();
95+
const data: string[] = [];
96+
socket.onData(d => data.push(d.toString()));
97+
98+
socket.write(VSBuffer.fromString('ping'));
99+
await extHost.expectEvent(evt => evt.data === 'ping', 'expected ping');
100+
half.onData.fire(VSBuffer.fromString("pong"));
101+
assert.deepStrictEqual(data, ['pong']);
102+
});
103+
});
104+
});

src/vs/workbench/services/environment/electron-sandbox/environmentService.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export class NativeWorkbenchEnvironmentService extends AbstractNativeEnvironment
6363
get remoteAuthority() { return this.configuration.remoteAuthority; }
6464

6565
@memoize
66-
get expectsResolverExtension() { return !!this.configuration.remoteAuthority; }
66+
get expectsResolverExtension() { return !!this.configuration.remoteAuthority?.includes('+'); }
6767

6868
@memoize
6969
get execPath() { return this.configuration.execPath; }

src/vs/workbench/services/remote/common/remoteExtensionsScanner.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteA
77
import { IRemoteExtensionsScannerService, RemoteExtensionsScannerChannelName } from 'vs/platform/remote/common/remoteExtensionsScanner';
88
import * as platform from 'vs/base/common/platform';
99
import { IChannel } from 'vs/base/parts/ipc/common/ipc';
10-
import { IExtensionDescription, IRelaxedExtensionDescription } from 'vs/platform/extensions/common/extensions';
10+
import { ExtensionIdentifier, IExtensionDescription, IRelaxedExtensionDescription } from 'vs/platform/extensions/common/extensions';
1111
import { URI } from 'vs/base/common/uri';
1212
import { IUserDataProfileService } from 'vs/workbench/services/userDataProfile/common/userDataProfile';
1313
import { IRemoteUserDataProfilesService } from 'vs/workbench/services/userDataProfile/common/remoteUserDataProfiles';
@@ -45,6 +45,7 @@ class RemoteExtensionsScannerService implements IRemoteExtensionsScannerService
4545
const scannedExtensions = await channel.call<IRelaxedExtensionDescription[]>('scanExtensions', [platform.language, profileLocation, this.environmentService.extensionDevelopmentLocationURI, languagePack]);
4646
scannedExtensions.forEach((extension) => {
4747
extension.extensionLocation = URI.revive(extension.extensionLocation);
48+
extension.identifier = new ExtensionIdentifier(extension.identifier.value);
4849
});
4950
return scannedExtensions;
5051
},

src/vscode-dts/vscode.proposed.resolvers.d.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ declare module 'vscode' {
3131
onDidClose: Event<Error | undefined>;
3232
onDidEnd: Event<void>;
3333

34-
dataHandler: (data: Uint8Array) => void;
35-
endHandler: () => void;
36-
drainHandler?: () => void;
34+
send: (data: Uint8Array) => void;
35+
end: () => void;
36+
drain?: () => Thenable<void>;
3737
}
3838

3939
export class ManagedResolvedAuthority {

0 commit comments

Comments
 (0)