3
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
4
*--------------------------------------------------------------------------------------------*/
5
5
6
- import { IntervalTimer } from 'vs/base/common/async' ;
7
6
import { VSBuffer } from 'vs/base/common/buffer' ;
8
7
import { Emitter , Event } from 'vs/base/common/event' ;
9
8
import { Disposable , DisposableStore , IDisposable } from 'vs/base/common/lifecycle' ;
@@ -264,9 +263,7 @@ const enum ProtocolMessageType {
264
263
ReplayRequest = 6 ,
265
264
Pause = 7 ,
266
265
Resume = 8 ,
267
- KeepAlive = 9 ,
268
- LatencyMeasurementRequest = 10 ,
269
- LatencyMeasurementResponse = 11 ,
266
+ KeepAlive = 9
270
267
}
271
268
272
269
function protocolMessageTypeToString ( messageType : ProtocolMessageType ) {
@@ -280,8 +277,6 @@ function protocolMessageTypeToString(messageType: ProtocolMessageType) {
280
277
case ProtocolMessageType . Pause : return 'PauseWriting' ;
281
278
case ProtocolMessageType . Resume : return 'ResumeWriting' ;
282
279
case ProtocolMessageType . KeepAlive : return 'KeepAlive' ;
283
- case ProtocolMessageType . LatencyMeasurementRequest : return 'LatencyMeasurementRequest' ;
284
- case ProtocolMessageType . LatencyMeasurementResponse : return 'LatencyMeasurementResponse' ;
285
280
}
286
281
}
287
282
@@ -309,22 +304,6 @@ export const enum ProtocolConstants {
309
304
* Send a message every 5 seconds to avoid that the connection is closed by the OS.
310
305
*/
311
306
KeepAliveSendTime = 5000 , // 5 seconds
312
- /**
313
- * Measure the latency every 1 minute.
314
- */
315
- LatencySampleTime = 1 * 60 * 1000 , // 1 minute
316
- /**
317
- * Keep the last 5 samples for latency measurement.
318
- */
319
- LatencySampleCount = 5 ,
320
- /**
321
- * A latency over 1s will be considered high.
322
- */
323
- HighLatencyTimeThreshold = 1000 ,
324
- /**
325
- * Having 3 or more samples with high latency will trigger a high latency event.
326
- */
327
- HighLatencySampleThreshold = 3 ,
328
307
}
329
308
330
309
class ProtocolMessage {
@@ -803,52 +782,6 @@ export interface ILoadEstimator {
803
782
hasHighLoad ( ) : boolean ;
804
783
}
805
784
806
- export const enum ConnectionHealth {
807
- /**
808
- * The connection health is considered good when a certain number of recent round trip time measurements are below a certain threshold.
809
- * @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
810
- */
811
- Good ,
812
- /**
813
- * The connection health is considered poor when a certain number of recent round trip time measurements are above a certain threshold.
814
- * @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
815
- */
816
- Poor
817
- }
818
-
819
- export function connectionHealthToString ( connectionHealth : ConnectionHealth ) : 'good' | 'poor' {
820
- switch ( connectionHealth ) {
821
- case ConnectionHealth . Good : return 'good' ;
822
- case ConnectionHealth . Poor : return 'poor' ;
823
- }
824
- }
825
-
826
- /**
827
- * An event describing that the connection health has changed.
828
- */
829
- export class ConnectionHealthChangedEvent {
830
- constructor (
831
- public readonly connectionHealth : ConnectionHealth
832
- ) { }
833
- }
834
-
835
- /**
836
- * An event describing that a round trip time measurement was above a certain threshold.
837
- */
838
- export class HighRoundTripTimeEvent {
839
- constructor (
840
- /**
841
- * The round trip time in milliseconds.
842
- */
843
- public readonly roundTripTime : number ,
844
- /**
845
- * The number of recent round trip time measurements that were above the threshold.
846
- * @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
847
- */
848
- public readonly recentHighRoundTripCount : number
849
- ) { }
850
- }
851
-
852
785
export interface PersistentProtocolOptions {
853
786
/**
854
787
* The socket to use.
@@ -862,10 +795,6 @@ export interface PersistentProtocolOptions {
862
795
* The CPU load estimator to use.
863
796
*/
864
797
loadEstimator ?: ILoadEstimator ;
865
- /**
866
- * Whether to measure round trip time. Defaults to false.
867
- */
868
- measureRoundTripTime ?: boolean ;
869
798
/**
870
799
* Whether to send keep alive messages. Defaults to true.
871
800
*/
@@ -898,11 +827,9 @@ export class PersistentProtocol implements IMessagePassingProtocol {
898
827
private _socket : ISocket ;
899
828
private _socketWriter : ProtocolWriter ;
900
829
private _socketReader : ProtocolReader ;
901
- private _socketLatencyMonitor : LatencyMonitor ;
902
830
private _socketDisposables : DisposableStore ;
903
831
904
832
private readonly _loadEstimator : ILoadEstimator ;
905
- private readonly _measureRoundTripTime : boolean ;
906
833
private readonly _shouldSendKeepAlive : boolean ;
907
834
908
835
private readonly _onControlMessage = new BufferedEmitter < VSBuffer > ( ) ;
@@ -920,19 +847,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
920
847
private readonly _onSocketTimeout = new BufferedEmitter < SocketTimeoutEvent > ( ) ;
921
848
readonly onSocketTimeout : Event < SocketTimeoutEvent > = this . _onSocketTimeout . event ;
922
849
923
- private readonly _onHighRoundTripTime = new BufferedEmitter < HighRoundTripTimeEvent > ( ) ;
924
- readonly onHighRoundTripTime = this . _onHighRoundTripTime . event ;
925
-
926
- private readonly _onDidChangeConnectionHealth = new BufferedEmitter < ConnectionHealth > ( ) ;
927
- readonly onDidChangeConnectionHealth = this . _onDidChangeConnectionHealth . event ;
928
-
929
850
public get unacknowledgedCount ( ) : number {
930
851
return this . _outgoingMsgId - this . _outgoingAckId ;
931
852
}
932
853
933
854
constructor ( opts : PersistentProtocolOptions ) {
934
855
this . _loadEstimator = opts . loadEstimator ?? LoadEstimator . getInstance ( ) ;
935
- this . _measureRoundTripTime = opts . measureRoundTripTime ?? false ;
936
856
this . _shouldSendKeepAlive = opts . sendKeepAlive ?? true ;
937
857
this . _isReconnecting = false ;
938
858
this . _outgoingUnackMsg = new Queue < ProtocolMessage > ( ) ;
@@ -954,13 +874,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
954
874
this . _socketReader = this . _socketDisposables . add ( new ProtocolReader ( this . _socket ) ) ;
955
875
this . _socketDisposables . add ( this . _socketReader . onMessage ( msg => this . _receiveMessage ( msg ) ) ) ;
956
876
this . _socketDisposables . add ( this . _socket . onClose ( e => this . _onSocketClose . fire ( e ) ) ) ;
957
- this . _socketLatencyMonitor = this . _socketDisposables . add ( new LatencyMonitor ( ) ) ; // is started immediately
958
- this . _socketDisposables . add ( this . _socketLatencyMonitor . onSendLatencyRequest ( buffer => this . _sendLatencyMeasurementRequest ( buffer ) ) ) ;
959
- this . _socketDisposables . add ( this . _socketLatencyMonitor . onHighRoundTripTime ( e => this . _onHighRoundTripTime . fire ( e ) ) ) ;
960
- this . _socketDisposables . add ( this . _socketLatencyMonitor . onDidChangeConnectionHealth ( e => this . _onDidChangeConnectionHealth . fire ( e ) ) ) ;
961
- if ( this . _measureRoundTripTime ) {
962
- this . _socketLatencyMonitor . start ( ) ;
963
- }
964
877
965
878
if ( opts . initialChunk ) {
966
879
this . _socketReader . acceptChunk ( opts . initialChunk ) ;
@@ -1041,19 +954,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
1041
954
this . _socketReader = this . _socketDisposables . add ( new ProtocolReader ( this . _socket ) ) ;
1042
955
this . _socketDisposables . add ( this . _socketReader . onMessage ( msg => this . _receiveMessage ( msg ) ) ) ;
1043
956
this . _socketDisposables . add ( this . _socket . onClose ( e => this . _onSocketClose . fire ( e ) ) ) ;
1044
- this . _socketLatencyMonitor = this . _socketDisposables . add ( new LatencyMonitor ( ) ) ; // will be started later
1045
- this . _socketDisposables . add ( this . _socketLatencyMonitor . onSendLatencyRequest ( buffer => this . _sendLatencyMeasurementRequest ( buffer ) ) ) ;
1046
- this . _socketDisposables . add ( this . _socketLatencyMonitor . onHighRoundTripTime ( e => this . _onHighRoundTripTime . fire ( e ) ) ) ;
1047
- this . _socketDisposables . add ( this . _socketLatencyMonitor . onDidChangeConnectionHealth ( e => this . _onDidChangeConnectionHealth . fire ( e ) ) ) ;
1048
957
1049
958
this . _socketReader . acceptChunk ( initialDataChunk ) ;
1050
959
}
1051
960
1052
961
public endAcceptReconnection ( ) : void {
1053
962
this . _isReconnecting = false ;
1054
- if ( this . _measureRoundTripTime ) {
1055
- this . _socketLatencyMonitor . start ( ) ;
1056
- }
1057
963
1058
964
// After a reconnection, let the other party know (again) which messages have been received.
1059
965
// (perhaps the other party didn't receive a previous ACK)
@@ -1144,15 +1050,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
1144
1050
// nothing to do
1145
1051
break ;
1146
1052
}
1147
- case ProtocolMessageType . LatencyMeasurementRequest : {
1148
- // we just send the data back
1149
- this . _sendLatencyMeasurementResponse ( msg . data ) ;
1150
- break ;
1151
- }
1152
- case ProtocolMessageType . LatencyMeasurementResponse : {
1153
- this . _socketLatencyMonitor . handleResponse ( msg . data ) ;
1154
- break ;
1155
- }
1156
1053
}
1157
1054
}
1158
1055
@@ -1282,92 +1179,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
1282
1179
const msg = new ProtocolMessage ( ProtocolMessageType . KeepAlive , 0 , this . _incomingAckId , getEmptyBuffer ( ) ) ;
1283
1180
this . _socketWriter . write ( msg ) ;
1284
1181
}
1285
-
1286
- private _sendLatencyMeasurementRequest ( buffer : VSBuffer ) : void {
1287
- this . _incomingAckId = this . _incomingMsgId ;
1288
- const msg = new ProtocolMessage ( ProtocolMessageType . LatencyMeasurementRequest , 0 , this . _incomingAckId , buffer ) ;
1289
- this . _socketWriter . write ( msg ) ;
1290
- }
1291
-
1292
- private _sendLatencyMeasurementResponse ( buffer : VSBuffer ) : void {
1293
- this . _incomingAckId = this . _incomingMsgId ;
1294
- const msg = new ProtocolMessage ( ProtocolMessageType . LatencyMeasurementResponse , 0 , this . _incomingAckId , buffer ) ;
1295
- this . _socketWriter . write ( msg ) ;
1296
- }
1297
- }
1298
-
1299
- class LatencyMonitor extends Disposable {
1300
-
1301
- private readonly _onSendLatencyRequest = this . _register ( new Emitter < VSBuffer > ( ) ) ;
1302
- readonly onSendLatencyRequest : Event < VSBuffer > = this . _onSendLatencyRequest . event ;
1303
-
1304
- private readonly _onHighRoundTripTime = this . _register ( new Emitter < HighRoundTripTimeEvent > ( ) ) ;
1305
- public readonly onHighRoundTripTime = this . _onHighRoundTripTime . event ;
1306
-
1307
- private readonly _onDidChangeConnectionHealth = this . _register ( new Emitter < ConnectionHealth > ( ) ) ;
1308
- public readonly onDidChangeConnectionHealth = this . _onDidChangeConnectionHealth . event ;
1309
-
1310
- private readonly _measureLatencyTimer = this . _register ( new IntervalTimer ( ) ) ;
1311
-
1312
- /**
1313
- * Timestamp of our last latency request message sent to the other host.
1314
- */
1315
- private _lastLatencyMeasurementSent : number = - 1 ;
1316
-
1317
- /**
1318
- * ID separate from the regular message IDs. Used to match up latency
1319
- * requests with responses so we know we're timing the right message
1320
- * even if a reconnection occurs.
1321
- */
1322
- private _lastLatencyMeasurementId : number = 0 ;
1323
-
1324
- /**
1325
- * Circular buffer of latency measurements
1326
- */
1327
- private _latencySamples : number [ ] = Array . from ( { length : ProtocolConstants . LatencySampleCount } , ( _ ) => 0 ) ;
1328
- private _latencySampleIndex : number = 0 ;
1329
- private _connectionHealth = ConnectionHealth . Good ;
1330
-
1331
- constructor ( ) {
1332
- super ( ) ;
1333
- }
1334
-
1335
- public start ( ) : void {
1336
- this . _measureLatencyTimer . cancelAndSet ( ( ) => {
1337
- this . _lastLatencyMeasurementSent = Date . now ( ) ;
1338
- const measurementId = ++ this . _lastLatencyMeasurementId ;
1339
- const buffer = VSBuffer . alloc ( 4 ) ;
1340
- buffer . writeUInt32BE ( measurementId , 0 ) ;
1341
- this . _onSendLatencyRequest . fire ( buffer ) ;
1342
- } , ProtocolConstants . LatencySampleTime ) ;
1343
- }
1344
-
1345
- public handleResponse ( buffer : VSBuffer ) : void {
1346
- if ( buffer . byteLength !== 4 ) {
1347
- // invalid measurementId
1348
- return ;
1349
- }
1350
- const measurementId = buffer . readUInt32BE ( 0 ) ;
1351
- if ( this . _lastLatencyMeasurementSent <= 0 || measurementId !== this . _lastLatencyMeasurementId ) {
1352
- // invalid measurementId
1353
- return ;
1354
- }
1355
-
1356
- const roundtripTime = Date . now ( ) - this . _lastLatencyMeasurementSent ;
1357
- const sampleIndex = this . _latencySampleIndex ++ ;
1358
- this . _latencySamples [ sampleIndex % this . _latencySamples . length ] = roundtripTime ;
1359
-
1360
- const previousConnectionHealth = this . _connectionHealth ;
1361
- const highLatencySampleCount = this . _latencySamples . filter ( s => s >= ProtocolConstants . HighLatencyTimeThreshold ) . length ;
1362
- this . _connectionHealth = ( highLatencySampleCount >= ProtocolConstants . HighLatencySampleThreshold ? ConnectionHealth . Poor : ConnectionHealth . Good ) ;
1363
-
1364
- if ( roundtripTime > ProtocolConstants . HighLatencyTimeThreshold ) {
1365
- this . _onHighRoundTripTime . fire ( new HighRoundTripTimeEvent ( roundtripTime , highLatencySampleCount ) ) ;
1366
- }
1367
- if ( previousConnectionHealth !== this . _connectionHealth ) {
1368
- this . _onDidChangeConnectionHealth . fire ( this . _connectionHealth ) ;
1369
- }
1370
- }
1371
1182
}
1372
1183
1373
1184
// (() => {
0 commit comments