Skip to content

Commit dde5a4f

Browse files
committed
Add dedicated protocol for managed sockets (mainThreadManagedSockets and extHostManagedSockets)
1 parent 1439f53 commit dde5a4f

14 files changed

+365
-280
lines changed

src/vs/platform/remote/common/remoteSocketFactoryService.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* Licensed under the MIT License. See License.txt in the project root for license information.
44
*--------------------------------------------------------------------------------------------*/
55

6+
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
67
import { ISocket } from 'vs/base/parts/ipc/common/ipc.net';
78
import { createDecorator } from 'vs/platform/instantiation/common/instantiation';
89
import { RemoteConnectionOfType, RemoteConnectionType, RemoteConnection } from 'vs/platform/remote/common/remoteAuthorityResolver';
@@ -18,7 +19,7 @@ export interface IRemoteSocketFactoryService {
1819
* @param factory function that returns the socket factory, or undefined if
1920
* it can't handle the data.
2021
*/
21-
register<T extends RemoteConnectionType>(type: T, factory: ISocketFactory<T>): void;
22+
register<T extends RemoteConnectionType>(type: T, factory: ISocketFactory<T>): IDisposable;
2223

2324
connect(connectTo: RemoteConnection, path: string, query: string, debugLabel: string): Promise<ISocket>;
2425
}
@@ -33,9 +34,15 @@ export class RemoteSocketFactoryService implements IRemoteSocketFactoryService {
3334

3435
private readonly factories: { [T in RemoteConnectionType]?: ISocketFactory<T>[] } = {};
3536

36-
public register<T extends RemoteConnectionType>(type: T, factory: ISocketFactory<T>): void {
37+
public register<T extends RemoteConnectionType>(type: T, factory: ISocketFactory<T>): IDisposable {
3738
this.factories[type] ??= [];
3839
this.factories[type]!.push(factory);
40+
return toDisposable(() => {
41+
const idx = this.factories[type]?.indexOf(factory);
42+
if (typeof idx === 'number' && idx >= 0) {
43+
this.factories[type]?.splice(idx, 1);
44+
}
45+
});
3946
}
4047

4148
private getSocketFactory<T extends RemoteConnectionType>(messagePassing: RemoteConnectionOfType<T>): ISocketFactory<T> | undefined {

src/vs/workbench/api/browser/extensionHost.contribution.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import './mainThreadLanguageFeatures';
4343
import './mainThreadLanguages';
4444
import './mainThreadLogService';
4545
import './mainThreadMessageService';
46+
import './mainThreadManagedSockets';
4647
import './mainThreadOutputService';
4748
import './mainThreadProgress';
4849
import './mainThreadQuickDiff';

src/vs/workbench/api/browser/mainThreadExtensionService.ts

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { ILocalExtension } from 'vs/platform/extensionManagement/common/extensio
1616
import { areSameExtensions } from 'vs/platform/extensionManagement/common/extensionManagementUtil';
1717
import { ExtensionIdentifier, IExtensionDescription } from 'vs/platform/extensions/common/extensions';
1818
import { INotificationService } from 'vs/platform/notification/common/notification';
19-
import { IRemoteConnectionData, RemoteConnectionType } from 'vs/platform/remote/common/remoteAuthorityResolver';
19+
import { IRemoteConnectionData, ManagedRemoteConnection, RemoteConnection, RemoteConnectionType, ResolvedAuthority, WebSocketRemoteConnection } from 'vs/platform/remote/common/remoteAuthorityResolver';
2020
import { ExtHostContext, ExtHostExtensionServiceShape, MainContext, MainThreadExtensionServiceShape } from 'vs/workbench/api/common/extHost.protocol';
2121
import { IExtension, IExtensionsWorkbenchService } from 'vs/workbench/contrib/extensions/common/extensions';
2222
import { IWorkbenchEnvironmentService } from 'vs/workbench/services/environment/common/environmentService';
@@ -25,7 +25,8 @@ import { ExtensionHostKind } from 'vs/workbench/services/extensions/common/exten
2525
import { IExtensionDescriptionDelta } from 'vs/workbench/services/extensions/common/extensionHostProtocol';
2626
import { IExtensionHostProxy, IResolveAuthorityResult } from 'vs/workbench/services/extensions/common/extensionHostProxy';
2727
import { ActivationKind, ExtensionActivationReason, IExtensionService, IInternalExtensionService, MissingExtensionDependency } from 'vs/workbench/services/extensions/common/extensions';
28-
import { extHostNamedCustomer, IExtHostContext, IInternalExtHostContext, IManagedSocketCallbacks } from 'vs/workbench/services/extensions/common/extHostCustomers';
28+
import { extHostNamedCustomer, IExtHostContext, IInternalExtHostContext } from 'vs/workbench/services/extensions/common/extHostCustomers';
29+
import { Dto } from 'vs/workbench/services/extensions/common/proxyIdentifier';
2930
import { IHostService } from 'vs/workbench/services/host/browser/host';
3031
import { ITimerService } from 'vs/workbench/services/timer/browser/timerService';
3132

@@ -34,7 +35,6 @@ export class MainThreadExtensionService implements MainThreadExtensionServiceSha
3435

3536
private readonly _extensionHostKind: ExtensionHostKind;
3637
private readonly _internalExtensionService: IInternalExtensionService;
37-
private readonly _managedSocketCallbacks: IManagedSocketCallbacks;
3838

3939
constructor(
4040
extHostContext: IExtHostContext,
@@ -51,7 +51,6 @@ export class MainThreadExtensionService implements MainThreadExtensionServiceSha
5151

5252
const internalExtHostContext = (<IInternalExtHostContext>extHostContext);
5353
this._internalExtensionService = internalExtHostContext.internalExtensionService;
54-
this._managedSocketCallbacks = internalExtHostContext.managedSocketCallbacks;
5554
internalExtHostContext._setExtensionHostProxy(
5655
new ExtensionHostProxy(extHostContext.getProxy(ExtHostContext.ExtHostExtensionService))
5756
);
@@ -61,18 +60,6 @@ export class MainThreadExtensionService implements MainThreadExtensionServiceSha
6160
public dispose(): void {
6261
}
6362

64-
$onDidRemoteSocketHaveData(id: number, data: VSBuffer): void {
65-
this._managedSocketCallbacks.onDidRemoteSocketHaveData(id, data);
66-
}
67-
68-
$onDidRemoteSocketClose(id: number, error: string | undefined): void {
69-
this._managedSocketCallbacks.onDidRemoteSocketClose(id, error ? new Error(error) : undefined);
70-
}
71-
72-
$onDidRemoteSocketEnd(id: number): void {
73-
this._managedSocketCallbacks.onDidRemoteSocketEnd(id);
74-
}
75-
7663
$getExtension(extensionId: string) {
7764
return this._extensionService.getExtension(extensionId);
7865
}
@@ -214,13 +201,7 @@ class ExtensionHostProxy implements IExtensionHostProxy {
214201
) { }
215202

216203
async resolveAuthority(remoteAuthority: string, resolveAttempt: number): Promise<IResolveAuthorityResult> {
217-
const resolved = await this._actual.$resolveAuthority(remoteAuthority, resolveAttempt);
218-
if (resolved.type === 'ok') {
219-
resolved.value.authority.toString = function () {
220-
return this.connectTo.type === RemoteConnectionType.Managed ? `ManagedSocket#${this.connectTo.id}` : `${this.connectTo.host}:${this.connectTo.type}`;
221-
};
222-
}
223-
204+
const resolved = reviveResolveAuthorityResult(await this._actual.$resolveAuthority(remoteAuthority, resolveAttempt));
224205
return resolved;
225206
}
226207
async getCanonicalURI(remoteAuthority: string, uri: URI): Promise<URI | null> {
@@ -257,16 +238,32 @@ class ExtensionHostProxy implements IExtensionHostProxy {
257238
test_down(size: number): Promise<VSBuffer> {
258239
return this._actual.$test_down(size);
259240
}
260-
openRemoteSocket(factoryId: number): Promise<number> {
261-
return this._actual.$openRemoteSocket(factoryId);
262-
}
263-
remoteSocketWrite(socketId: number, buffer: VSBuffer): void {
264-
return this._actual.$remoteSocketWrite(socketId, buffer);
265-
}
266-
remoteSocketEnd(socketId: number): void {
267-
return this._actual.$remoteSocketEnd(socketId);
241+
}
242+
243+
function reviveResolveAuthorityResult(result: Dto<IResolveAuthorityResult>): IResolveAuthorityResult {
244+
if (result.type === 'ok') {
245+
return {
246+
type: 'ok',
247+
value: {
248+
...result.value,
249+
authority: reviveResolvedAuthority(result.value.authority),
250+
}
251+
};
252+
} else {
253+
return result;
268254
}
269-
remoteSocketDrain(socketId: number): Promise<void> {
270-
return this._actual.$remoteSocketDrain(socketId);
255+
}
256+
257+
function reviveResolvedAuthority(resolvedAuthority: Dto<ResolvedAuthority>): ResolvedAuthority {
258+
return {
259+
...resolvedAuthority,
260+
connectTo: reviveConnection(resolvedAuthority.connectTo),
261+
};
262+
}
263+
264+
function reviveConnection(connection: Dto<RemoteConnection>): RemoteConnection {
265+
if (connection.type === RemoteConnectionType.WebSocket) {
266+
return new WebSocketRemoteConnection(connection.host, connection.port);
271267
}
268+
return new ManagedRemoteConnection(connection.id);
272269
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
* Licensed under the MIT License. See License.txt in the project root for license information.
4+
*--------------------------------------------------------------------------------------------*/
5+
6+
import { MainContext, ExtHostContext, MainThreadManagedSocketsShape, ExtHostManagedSocketsShape } from 'vs/workbench/api/common/extHost.protocol';
7+
import { extHostNamedCustomer, IExtHostContext } from 'vs/workbench/services/extensions/common/extHostCustomers';
8+
import { Disposable, DisposableStore, IDisposable } from 'vs/base/common/lifecycle';
9+
import { ManagedRemoteConnection, RemoteConnectionType } from 'vs/platform/remote/common/remoteAuthorityResolver';
10+
import { VSBuffer } from 'vs/base/common/buffer';
11+
import { IRemoteSocketFactoryService, ISocketFactory } from 'vs/platform/remote/common/remoteSocketFactoryService';
12+
import { ISocket, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
13+
import { Emitter, Event } from 'vs/base/common/event';
14+
import { makeRawSocketHeaders, socketRawEndHeaderSequence } from 'vs/platform/remote/common/managedSocket';
15+
16+
@extHostNamedCustomer(MainContext.MainThreadManagedSockets)
17+
export class MainThreadManagedSockets extends Disposable implements MainThreadManagedSocketsShape {
18+
19+
private readonly _proxy: ExtHostManagedSocketsShape;
20+
private readonly _registrations = new Map<number, IDisposable>();
21+
private readonly _remoteSockets = new Map<number, RemoteSocketHalf>();
22+
23+
constructor(
24+
extHostContext: IExtHostContext,
25+
@IRemoteSocketFactoryService private readonly _remoteSocketFactoryService: IRemoteSocketFactoryService,
26+
) {
27+
super();
28+
this._proxy = extHostContext.getProxy(ExtHostContext.ExtHostManagedSockets);
29+
}
30+
31+
async $registerSocketFactory(socketFactoryId: number): Promise<void> {
32+
const that = this;
33+
const scoketFactory = new class implements ISocketFactory<RemoteConnectionType.Managed> {
34+
35+
supports(connectTo: ManagedRemoteConnection): boolean {
36+
return (connectTo.id === socketFactoryId);
37+
}
38+
39+
connect(connectTo: ManagedRemoteConnection, path: string, query: string, debugLabel: string): Promise<ISocket> {
40+
return new Promise<ISocket>((resolve, reject) => {
41+
if (connectTo.id !== socketFactoryId) {
42+
return reject(new Error('Invalid connectTo'));
43+
}
44+
45+
const factoryId = connectTo.id;
46+
that._proxy.$openRemoteSocket(factoryId).then(socketId => {
47+
const half: RemoteSocketHalf = {
48+
onClose: new Emitter(),
49+
onData: new Emitter(),
50+
onEnd: new Emitter(),
51+
};
52+
that._remoteSockets.set(socketId, half);
53+
54+
ManagedSocket.connect(socketId, that._proxy, path, query, debugLabel, half)
55+
.then(
56+
socket => {
57+
socket.onDidDispose(() => that._remoteSockets.delete(socketId));
58+
resolve(socket);
59+
},
60+
err => {
61+
that._remoteSockets.delete(socketId);
62+
reject(err);
63+
});
64+
}).catch(reject);
65+
});
66+
}
67+
};
68+
this._registrations.set(socketFactoryId, this._remoteSocketFactoryService.register(RemoteConnectionType.Managed, scoketFactory));
69+
70+
}
71+
72+
async $unregisterSocketFactory(socketFactoryId: number): Promise<void> {
73+
this._registrations.get(socketFactoryId)?.dispose();
74+
}
75+
76+
$onDidManagedSocketHaveData(socketId: number, data: VSBuffer): void {
77+
this._remoteSockets.get(socketId)?.onData.fire(data);
78+
}
79+
80+
$onDidManagedSocketClose(socketId: number, error: string | undefined): void {
81+
this._remoteSockets.get(socketId)?.onClose.fire({
82+
type: SocketCloseEventType.NodeSocketCloseEvent,
83+
error: error ? new Error(error) : undefined,
84+
hadError: !!error
85+
});
86+
this._remoteSockets.delete(socketId);
87+
}
88+
89+
$onDidManagedSocketEnd(socketId: number): void {
90+
this._remoteSockets.get(socketId)?.onEnd.fire();
91+
this._remoteSockets.delete(socketId);
92+
}
93+
}
94+
95+
interface RemoteSocketHalf {
96+
onData: Emitter<VSBuffer>;
97+
onClose: Emitter<SocketCloseEvent>;
98+
onEnd: Emitter<void>;
99+
}
100+
101+
export class ManagedSocket extends Disposable implements ISocket {
102+
public static connect(
103+
socketId: number,
104+
proxy: ExtHostManagedSocketsShape,
105+
path: string, query: string, debugLabel: string,
106+
107+
half: {
108+
onClose: Emitter<SocketCloseEvent>;
109+
onData: Emitter<VSBuffer>;
110+
onEnd: Emitter<void>;
111+
}
112+
): Promise<ManagedSocket> {
113+
const socket = new ManagedSocket(socketId, proxy, debugLabel, half.onClose, half.onData, half.onEnd);
114+
115+
socket.write(VSBuffer.fromString(makeRawSocketHeaders(path, query, debugLabel)));
116+
117+
const d = new DisposableStore();
118+
return new Promise<ManagedSocket>((resolve, reject) => {
119+
d.add(socket.onData(d => {
120+
if (d.indexOf(socketRawEndHeaderSequence) !== -1) {
121+
resolve(socket);
122+
}
123+
}));
124+
125+
d.add(socket.onClose(err => reject(err ?? new Error('socket closed'))));
126+
d.add(socket.onEnd(() => reject(new Error('socket ended'))));
127+
}).finally(() => d.dispose());
128+
}
129+
130+
public onData: Event<VSBuffer>;
131+
public onClose: Event<SocketCloseEvent>;
132+
public onEnd: Event<void>;
133+
134+
private readonly didDisposeEmitter = this._register(new Emitter<void>());
135+
public onDidDispose = this.didDisposeEmitter.event;
136+
137+
private ended = false;
138+
139+
private constructor(
140+
private readonly socketId: number,
141+
private readonly proxy: ExtHostManagedSocketsShape,
142+
private readonly debugLabel: string,
143+
onCloseEmitter: Emitter<SocketCloseEvent>,
144+
onDataEmitter: Emitter<VSBuffer>,
145+
onEndEmitter: Emitter<void>,
146+
) {
147+
super();
148+
this.onClose = this._register(onCloseEmitter).event;
149+
this.onData = this._register(onDataEmitter).event;
150+
this.onEnd = this._register(onEndEmitter).event;
151+
}
152+
153+
write(buffer: VSBuffer): void {
154+
this.proxy.$remoteSocketWrite(this.socketId, buffer);
155+
}
156+
157+
end(): void {
158+
this.ended = true;
159+
this.proxy.$remoteSocketEnd(this.socketId);
160+
}
161+
162+
drain(): Promise<void> {
163+
return this.proxy.$remoteSocketDrain(this.socketId);
164+
}
165+
166+
traceSocketEvent(type: SocketDiagnosticsEventType, data?: any): void {
167+
SocketDiagnostics.traceSocketEvent(this, this.debugLabel, type, data);
168+
}
169+
170+
override dispose(): void {
171+
if (!this.ended) {
172+
this.proxy.$remoteSocketEnd(this.socketId);
173+
}
174+
175+
this.didDisposeEmitter.fire();
176+
super.dispose();
177+
}
178+
}

src/vs/workbench/api/common/extHost.api.impl.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ import { ExtHostInteractiveEditor } from 'vs/workbench/api/common/extHostInterac
102102
import { ExtHostNotebookDocumentSaveParticipant } from 'vs/workbench/api/common/extHostNotebookDocumentSaveParticipant';
103103
import { ExtHostSemanticSimilarity } from 'vs/workbench/api/common/extHostSemanticSimilarity';
104104
import { ExtHostIssueReporter } from 'vs/workbench/api/common/extHostIssueReporter';
105+
import { IExtHostManagedSockets } from 'vs/workbench/api/common/extHostManagedSockets';
105106

106107
export interface IExtensionRegistries {
107108
mine: ExtensionDescriptionRegistry;
@@ -136,6 +137,7 @@ export function createApiFactoryAndRegisterActors(accessor: ServicesAccessor): I
136137
const extHostWindow = accessor.get(IExtHostWindow);
137138
const extHostSecretState = accessor.get(IExtHostSecretState);
138139
const extHostEditorTabs = accessor.get(IExtHostEditorTabs);
140+
const extHostManagedSockets = accessor.get(IExtHostManagedSockets);
139141

140142
// register addressable instances
141143
rpcProtocol.set(ExtHostContext.ExtHostFileSystemInfo, extHostFileSystemInfo);
@@ -149,6 +151,7 @@ export function createApiFactoryAndRegisterActors(accessor: ServicesAccessor): I
149151
rpcProtocol.set(ExtHostContext.ExtHostSecretState, extHostSecretState);
150152
rpcProtocol.set(ExtHostContext.ExtHostTelemetry, extHostTelemetry);
151153
rpcProtocol.set(ExtHostContext.ExtHostEditorTabs, extHostEditorTabs);
154+
rpcProtocol.set(ExtHostContext.ExtHostManagedSockets, extHostManagedSockets);
152155

153156
// automatically create and register addressable instances
154157
const extHostDecorations = rpcProtocol.set(ExtHostContext.ExtHostDecorations, accessor.get(IExtHostDecorations));

src/vs/workbench/api/common/extHost.common.services.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { ExtHostLoggerService } from 'vs/workbench/api/common/extHostLoggerServi
2727
import { ILoggerService } from 'vs/platform/log/common/log';
2828
import { ExtHostVariableResolverProviderService, IExtHostVariableResolverProvider } from 'vs/workbench/api/common/extHostVariableResolverService';
2929
import { ExtHostLocalizationService, IExtHostLocalizationService } from 'vs/workbench/api/common/extHostLocalizationService';
30+
import { ExtHostManagedSockets, IExtHostManagedSockets } from 'vs/workbench/api/common/extHostManagedSockets';
3031

3132
registerSingleton(IExtHostLocalizationService, ExtHostLocalizationService, InstantiationType.Delayed);
3233
registerSingleton(ILoggerService, ExtHostLoggerService, InstantiationType.Delayed);
@@ -37,6 +38,7 @@ registerSingleton(IExtHostConsumerFileSystem, ExtHostConsumerFileSystem, Instant
3738
registerSingleton(IExtHostDebugService, WorkerExtHostDebugService, InstantiationType.Eager);
3839
registerSingleton(IExtHostDecorations, ExtHostDecorations, InstantiationType.Eager);
3940
registerSingleton(IExtHostDocumentsAndEditors, ExtHostDocumentsAndEditors, InstantiationType.Eager);
41+
registerSingleton(IExtHostManagedSockets, ExtHostManagedSockets, InstantiationType.Eager);
4042
registerSingleton(IExtHostFileSystemInfo, ExtHostFileSystemInfo, InstantiationType.Eager);
4143
registerSingleton(IExtHostOutputService, ExtHostOutputService, InstantiationType.Delayed);
4244
registerSingleton(IExtHostSearch, ExtHostSearch, InstantiationType.Eager);

0 commit comments

Comments
 (0)