@@ -90,21 +90,25 @@ class Ether {
90
90
return < any > this . _b ;
91
91
}
92
92
93
- constructor ( ) {
93
+ constructor (
94
+ private readonly _wireLatency = 0
95
+ ) {
94
96
this . _a = new EtherStream ( this , 'a' ) ;
95
97
this . _b = new EtherStream ( this , 'b' ) ;
96
98
this . _ab = [ ] ;
97
99
this . _ba = [ ] ;
98
100
}
99
101
100
102
public write ( from : 'a' | 'b' , data : Buffer ) : void {
101
- if ( from === 'a' ) {
102
- this . _ab . push ( data ) ;
103
- } else {
104
- this . _ba . push ( data ) ;
105
- }
103
+ setTimeout ( ( ) => {
104
+ if ( from === 'a' ) {
105
+ this . _ab . push ( data ) ;
106
+ } else {
107
+ this . _ba . push ( data ) ;
108
+ }
106
109
107
- setTimeout ( ( ) => this . _deliver ( ) , 0 ) ;
110
+ setTimeout ( ( ) => this . _deliver ( ) , 0 ) ;
111
+ } , this . _wireLatency ) ;
108
112
}
109
113
110
114
private _deliver ( ) : void {
@@ -365,6 +369,68 @@ suite('PersistentProtocol reconnection', () => {
365
369
) ;
366
370
} ) ;
367
371
372
+ test ( 'acks are always sent after a reconnection' , async ( ) => {
373
+ await runWithFakedTimers (
374
+ {
375
+ useFakeTimers : true ,
376
+ useSetImmediate : true ,
377
+ maxTaskCount : 1000
378
+ } ,
379
+ async ( ) => {
380
+
381
+ const loadEstimator : ILoadEstimator = {
382
+ hasHighLoad : ( ) => false
383
+ } ;
384
+ const wireLatency = 1000 ;
385
+ const ether = new Ether ( wireLatency ) ;
386
+ const aSocket = new NodeSocket ( ether . a ) ;
387
+ const a = new PersistentProtocol ( aSocket , null , loadEstimator ) ;
388
+ const aMessages = new MessageStream ( a ) ;
389
+ const bSocket = new NodeSocket ( ether . b ) ;
390
+ const b = new PersistentProtocol ( bSocket , null , loadEstimator ) ;
391
+ const bMessages = new MessageStream ( b ) ;
392
+
393
+ // send message a1 to have something unacknowledged
394
+ a . send ( VSBuffer . fromString ( 'a1' ) ) ;
395
+ assert . strictEqual ( a . unacknowledgedCount , 1 ) ;
396
+ assert . strictEqual ( b . unacknowledgedCount , 0 ) ;
397
+
398
+ // read message a1 at B
399
+ const a1 = await bMessages . waitForOne ( ) ;
400
+ assert . strictEqual ( a1 . toString ( ) , 'a1' ) ;
401
+ assert . strictEqual ( a . unacknowledgedCount , 1 ) ;
402
+ assert . strictEqual ( b . unacknowledgedCount , 0 ) ;
403
+
404
+ // wait for B to send an ACK message,
405
+ // but resume before A receives it
406
+ await timeout ( ProtocolConstants . AcknowledgeTime + wireLatency / 2 ) ;
407
+ assert . strictEqual ( a . unacknowledgedCount , 1 ) ;
408
+ assert . strictEqual ( b . unacknowledgedCount , 0 ) ;
409
+
410
+ // simulate complete reconnection
411
+ aSocket . dispose ( ) ;
412
+ bSocket . dispose ( ) ;
413
+ const ether2 = new Ether ( wireLatency ) ;
414
+ const aSocket2 = new NodeSocket ( ether2 . a ) ;
415
+ const bSocket2 = new NodeSocket ( ether2 . b ) ;
416
+ b . beginAcceptReconnection ( bSocket2 , null ) ;
417
+ b . endAcceptReconnection ( ) ;
418
+ a . beginAcceptReconnection ( aSocket2 , null ) ;
419
+ a . endAcceptReconnection ( ) ;
420
+
421
+ // wait for quite some time
422
+ await timeout ( 2 * ProtocolConstants . AcknowledgeTime + wireLatency ) ;
423
+ assert . strictEqual ( a . unacknowledgedCount , 0 ) ;
424
+ assert . strictEqual ( b . unacknowledgedCount , 0 ) ;
425
+
426
+ aMessages . dispose ( ) ;
427
+ bMessages . dispose ( ) ;
428
+ a . dispose ( ) ;
429
+ b . dispose ( ) ;
430
+ }
431
+ ) ;
432
+ } ) ;
433
+
368
434
test ( 'writing can be paused' , async ( ) => {
369
435
await runWithFakedTimers ( { useFakeTimers : true , maxTaskCount : 100 } , async ( ) => {
370
436
const loadEstimator : ILoadEstimator = {
0 commit comments