Skip to content

Commit 10d0018

Browse files
authored
Merge pull request microsoft#138287 from microsoft/alex/connection-improvements
Improve remote connection
2 parents 785e46c + 9415001 commit 10d0018

File tree

6 files changed

+156
-11
lines changed

6 files changed

+156
-11
lines changed

src/vs/base/parts/ipc/common/ipc.net.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,7 @@ export class PersistentProtocol implements IMessagePassingProtocol {
778778
private _incomingAckTimeout: any | null;
779779

780780
private _lastReplayRequestTime: number;
781+
private _lastSocketTimeoutTime: number;
781782

782783
private _socket: ISocket;
783784
private _socketWriter: ProtocolWriter;
@@ -819,6 +820,7 @@ export class PersistentProtocol implements IMessagePassingProtocol {
819820
this._incomingAckTimeout = null;
820821

821822
this._lastReplayRequestTime = 0;
823+
this._lastSocketTimeoutTime = Date.now();
822824

823825
this._socketDisposables = [];
824826
this._socket = socket;
@@ -887,6 +889,7 @@ export class PersistentProtocol implements IMessagePassingProtocol {
887889
this._socket.dispose();
888890

889891
this._lastReplayRequestTime = 0;
892+
this._lastSocketTimeoutTime = Date.now();
890893

891894
this._socket = socket;
892895
this._socketWriter = new ProtocolWriter(this._socket);
@@ -901,6 +904,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
901904
public endAcceptReconnection(): void {
902905
this._isReconnecting = false;
903906

907+
// After a reconnection, let the other party know (again) which messages have been received.
908+
// (perhaps the other party didn't receive a previous ACK)
909+
this._incomingAckId = this._incomingMsgId;
910+
const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, getEmptyBuffer());
911+
this._socketWriter.write(msg);
912+
904913
// Send again all unacknowledged messages
905914
const toSend = this._outgoingUnackMsg.toArray();
906915
for (let i = 0, len = toSend.length; i < len; i++) {
@@ -1057,26 +1066,36 @@ export class PersistentProtocol implements IMessagePassingProtocol {
10571066
const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!;
10581067
const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime;
10591068
const timeSinceLastReceivedSomeData = Date.now() - this._socketReader.lastReadTime;
1069+
const timeSinceLastTimeout = Date.now() - this._lastSocketTimeoutTime;
10601070

10611071
if (
10621072
timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.TimeoutTime
10631073
&& timeSinceLastReceivedSomeData >= ProtocolConstants.TimeoutTime
1074+
&& timeSinceLastTimeout >= ProtocolConstants.TimeoutTime
10641075
) {
10651076
// It's been a long time since our sent message was acknowledged
10661077
// and a long time since we received some data
10671078

10681079
// But this might be caused by the event loop being busy and failing to read messages
10691080
if (!this._loadEstimator.hasHighLoad()) {
10701081
// Trash the socket
1082+
this._lastSocketTimeoutTime = Date.now();
10711083
this._onSocketTimeout.fire(undefined);
10721084
return;
10731085
}
10741086
}
10751087

1088+
const minimumTimeUntilTimeout = Math.max(
1089+
ProtocolConstants.TimeoutTime - timeSinceOldestUnacknowledgedMsg,
1090+
ProtocolConstants.TimeoutTime - timeSinceLastReceivedSomeData,
1091+
ProtocolConstants.TimeoutTime - timeSinceLastTimeout,
1092+
500
1093+
);
1094+
10761095
this._outgoingAckTimeout = setTimeout(() => {
10771096
this._outgoingAckTimeout = null;
10781097
this._recvAckCheck();
1079-
}, Math.max(ProtocolConstants.TimeoutTime - timeSinceOldestUnacknowledgedMsg, 500));
1098+
}, minimumTimeUntilTimeout);
10801099
}
10811100

10821101
private _sendAck(): void {

src/vs/base/parts/ipc/test/node/ipc.net.test.ts

Lines changed: 126 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { createServer, Socket } from 'net';
99
import { tmpdir } from 'os';
1010
import { Barrier, timeout } from 'vs/base/common/async';
1111
import { VSBuffer } from 'vs/base/common/buffer';
12-
import { Emitter } from 'vs/base/common/event';
12+
import { Emitter, Event } from 'vs/base/common/event';
1313
import { Disposable, DisposableStore } from 'vs/base/common/lifecycle';
1414
import { ILoadEstimator, PersistentProtocol, Protocol, ProtocolConstants, SocketCloseEvent, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
1515
import { createRandomIPCHandle, createStaticIPCHandle, NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
@@ -90,21 +90,25 @@ class Ether {
9090
return <any>this._b;
9191
}
9292

93-
constructor() {
93+
constructor(
94+
private readonly _wireLatency = 0
95+
) {
9496
this._a = new EtherStream(this, 'a');
9597
this._b = new EtherStream(this, 'b');
9698
this._ab = [];
9799
this._ba = [];
98100
}
99101

100102
public write(from: 'a' | 'b', data: Buffer): void {
101-
if (from === 'a') {
102-
this._ab.push(data);
103-
} else {
104-
this._ba.push(data);
105-
}
103+
setTimeout(() => {
104+
if (from === 'a') {
105+
this._ab.push(data);
106+
} else {
107+
this._ba.push(data);
108+
}
106109

107-
setTimeout(() => this._deliver(), 0);
110+
setTimeout(() => this._deliver(), 0);
111+
}, this._wireLatency);
108112
}
109113

110114
private _deliver(): void {
@@ -365,6 +369,120 @@ suite('PersistentProtocol reconnection', () => {
365369
);
366370
});
367371

372+
test('acks are always sent after a reconnection', async () => {
373+
await runWithFakedTimers(
374+
{
375+
useFakeTimers: true,
376+
useSetImmediate: true,
377+
maxTaskCount: 1000
378+
},
379+
async () => {
380+
381+
const loadEstimator: ILoadEstimator = {
382+
hasHighLoad: () => false
383+
};
384+
const wireLatency = 1000;
385+
const ether = new Ether(wireLatency);
386+
const aSocket = new NodeSocket(ether.a);
387+
const a = new PersistentProtocol(aSocket, null, loadEstimator);
388+
const aMessages = new MessageStream(a);
389+
const bSocket = new NodeSocket(ether.b);
390+
const b = new PersistentProtocol(bSocket, null, loadEstimator);
391+
const bMessages = new MessageStream(b);
392+
393+
// send message a1 to have something unacknowledged
394+
a.send(VSBuffer.fromString('a1'));
395+
assert.strictEqual(a.unacknowledgedCount, 1);
396+
assert.strictEqual(b.unacknowledgedCount, 0);
397+
398+
// read message a1 at B
399+
const a1 = await bMessages.waitForOne();
400+
assert.strictEqual(a1.toString(), 'a1');
401+
assert.strictEqual(a.unacknowledgedCount, 1);
402+
assert.strictEqual(b.unacknowledgedCount, 0);
403+
404+
// wait for B to send an ACK message,
405+
// but resume before A receives it
406+
await timeout(ProtocolConstants.AcknowledgeTime + wireLatency / 2);
407+
assert.strictEqual(a.unacknowledgedCount, 1);
408+
assert.strictEqual(b.unacknowledgedCount, 0);
409+
410+
// simulate complete reconnection
411+
aSocket.dispose();
412+
bSocket.dispose();
413+
const ether2 = new Ether(wireLatency);
414+
const aSocket2 = new NodeSocket(ether2.a);
415+
const bSocket2 = new NodeSocket(ether2.b);
416+
b.beginAcceptReconnection(bSocket2, null);
417+
b.endAcceptReconnection();
418+
a.beginAcceptReconnection(aSocket2, null);
419+
a.endAcceptReconnection();
420+
421+
// wait for quite some time
422+
await timeout(2 * ProtocolConstants.AcknowledgeTime + wireLatency);
423+
assert.strictEqual(a.unacknowledgedCount, 0);
424+
assert.strictEqual(b.unacknowledgedCount, 0);
425+
426+
aMessages.dispose();
427+
bMessages.dispose();
428+
a.dispose();
429+
b.dispose();
430+
}
431+
);
432+
});
433+
434+
test('onSocketTimeout is emitted at most once every 20s', async () => {
435+
await runWithFakedTimers(
436+
{
437+
useFakeTimers: true,
438+
useSetImmediate: true,
439+
maxTaskCount: 1000
440+
},
441+
async () => {
442+
443+
const loadEstimator: ILoadEstimator = {
444+
hasHighLoad: () => false
445+
};
446+
const ether = new Ether();
447+
const aSocket = new NodeSocket(ether.a);
448+
const a = new PersistentProtocol(aSocket, null, loadEstimator);
449+
const aMessages = new MessageStream(a);
450+
const bSocket = new NodeSocket(ether.b);
451+
const b = new PersistentProtocol(bSocket, null, loadEstimator);
452+
const bMessages = new MessageStream(b);
453+
454+
// never receive acks
455+
b.pauseSocketWriting();
456+
457+
// send message a1 to have something unacknowledged
458+
a.send(VSBuffer.fromString('a1'));
459+
460+
// wait for the first timeout to fire
461+
await Event.toPromise(a.onSocketTimeout);
462+
463+
let timeoutFiredAgain = false;
464+
const timeoutListener = a.onSocketTimeout(() => {
465+
timeoutFiredAgain = true;
466+
});
467+
468+
// send more messages
469+
a.send(VSBuffer.fromString('a2'));
470+
a.send(VSBuffer.fromString('a3'));
471+
472+
// wait for 10s
473+
await timeout(ProtocolConstants.TimeoutTime / 2);
474+
475+
assert.strictEqual(timeoutFiredAgain, false);
476+
477+
timeoutListener.dispose();
478+
aMessages.dispose();
479+
bMessages.dispose();
480+
a.dispose();
481+
b.dispose();
482+
}
483+
);
484+
});
485+
368486
test('writing can be paused', async () => {
369487
await runWithFakedTimers({ useFakeTimers: true, maxTaskCount: 100 }, async () => {
370488
const loadEstimator: ILoadEstimator = {

src/vs/platform/remote/common/remoteAgentConnection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ export abstract class PersistentConnection extends Disposable {
573573
}));
574574
this._register(protocol.onSocketTimeout(() => {
575575
const logPrefix = commonLogPrefix(this._connectionType, this.reconnectionToken, true);
576-
this._options.logService.trace(`${logPrefix} received socket timeout event.`);
576+
this._options.logService.info(`${logPrefix} received socket timeout event.`);
577577
this._beginReconnecting();
578578
}));
579579

src/vs/platform/remote/node/nodeSocketFactory.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export const nodeSocketFactory = new class implements ISocketFactory {
2121
}
2222
const nonce = buffer.toString('base64');
2323

24-
let headers = [
24+
const headers = [
2525
`GET ws://${/:/.test(host) ? `[${host}]` : host}:${port}/?${query}&skipWebSocketFrames=true HTTP/1.1`,
2626
`Connection: Upgrade`,
2727
`Upgrade: websocket`,
@@ -39,6 +39,8 @@ export const nodeSocketFactory = new class implements ISocketFactory {
3939
};
4040
socket.on('data', onData);
4141
});
42+
// Disable Nagle's algorithm.
43+
socket.setNoDelay(true);
4244
socket.once('error', errorListener);
4345
}
4446
};

src/vs/server/remoteExtensionHostAgentServer.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,8 @@ export class RemoteExtensionHostAgentServer extends Disposable {
497497

498498
// Never timeout this socket due to inactivity!
499499
socket.setTimeout(0);
500+
// Disable Nagle's algorithm
501+
socket.setNoDelay(true);
500502
// Finally!
501503

502504
if (skipWebSocketFrames) {

src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ function _createExtHostProtocol(): Promise<PersistentProtocol> {
120120

121121
process.on('message', (msg: IExtHostSocketMessage | IExtHostReduceGraceTimeMessage, handle: net.Socket) => {
122122
if (msg && msg.type === 'VSCODE_EXTHOST_IPC_SOCKET') {
123+
// Disable Nagle's algorithm. We also do this on the server process,
124+
// but nodejs doesn't document if this option is transferred with the socket
125+
handle.setNoDelay(true);
126+
123127
const initialDataChunk = VSBuffer.wrap(Buffer.from(msg.initialDataChunk, 'base64'));
124128
let socket: NodeSocket | WebSocketNodeSocket;
125129
if (msg.skipWebSocketFrames) {

0 commit comments

Comments
 (0)