11import { expect } from 'aegir/chai'
22import all from 'it-all'
33import { byteStream } from 'it-byte-stream'
4- import drain from 'it-drain'
54import map from 'it-map'
65import { duplexPair } from 'it-pair/duplex'
76import { pipe } from 'it-pipe'
@@ -12,22 +11,19 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
1211import { isValidTick } from '../is-valid-tick.js'
1312import type { TestSetup } from '../index.js'
1413import type { Stream , StreamMuxerFactory } from '@libp2p/interface'
15- import type { Source , Duplex } from 'it-stream-types'
14+ import type { Source } from 'it-stream-types'
1615import type { DeferredPromise } from 'p-defer'
1716
18- async function drainAndClose ( stream : Duplex < any > ) : Promise < void > {
19- await pipe ( [ ] , stream , drain )
20- }
21-
2217export default ( common : TestSetup < StreamMuxerFactory > ) : void => {
2318 describe ( 'base' , ( ) => {
24- it ( 'Open a stream from the dialer' , async ( ) => {
19+ it ( 'should open a stream from the dialer' , async ( ) => {
2520 const p = duplexPair < Uint8Array | Uint8ArrayList > ( )
26- const dialerFactory = await common . setup ( )
27- const dialer = dialerFactory . createStreamMuxer ( { direction : 'outbound' } )
2821 const onStreamPromise : DeferredPromise < Stream > = defer ( )
2922 const onStreamEndPromise : DeferredPromise < Stream > = defer ( )
3023
24+ const dialerFactory = await common . setup ( )
25+ const dialer = dialerFactory . createStreamMuxer ( { direction : 'outbound' } )
26+
3127 const listenerFactory = await common . setup ( )
3228 const listener = listenerFactory . createStreamMuxer ( {
3329 direction : 'inbound' ,
@@ -42,18 +38,20 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
4238 void pipe ( p [ 0 ] , dialer , p [ 0 ] )
4339 void pipe ( p [ 1 ] , listener , p [ 1 ] )
4440
45- const conn = await dialer . newStream ( )
46- expect ( dialer . streams ) . to . include ( conn )
47- expect ( isValidTick ( conn . timeline . open ) ) . to . equal ( true )
41+ const dialerStream = await dialer . newStream ( )
42+ expect ( dialer . streams ) . to . include ( dialerStream )
43+ expect ( isValidTick ( dialerStream . timeline . open ) ) . to . equal ( true )
4844
49- void drainAndClose ( conn )
45+ const dialerBytes = byteStream ( dialerStream )
46+ void dialerBytes . write ( uint8ArrayFromString ( 'hello' ) )
5047
51- const stream = await onStreamPromise . promise
52- expect ( isValidTick ( stream . timeline . open ) ) . to . equal ( true )
48+ const listenerStream = await onStreamPromise . promise
49+ expect ( isValidTick ( listenerStream . timeline . open ) ) . to . equal ( true )
5350 // Make sure the stream is being tracked
54- expect ( listener . streams ) . to . include ( stream )
51+ expect ( listener . streams ) . to . include ( listenerStream )
5552
56- void drainAndClose ( stream )
53+ await dialerStream . close ( )
54+ await listenerStream . close ( )
5755
5856 // Make sure stream is closed properly
5957 const endedStream = await onStreamEndPromise . promise
@@ -66,22 +64,26 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
6664 // Make sure the stream is removed from tracking
6765 expect ( isValidTick ( endedStream . timeline . close ) ) . to . equal ( true )
6866
69- await drainAndClose ( dialer )
70- await drainAndClose ( listener )
67+ await dialer . close ( )
68+ await listener . close ( )
7169
7270 // ensure we have no streams left
7371 expect ( dialer . streams ) . to . have . length ( 0 )
7472 expect ( listener . streams ) . to . have . length ( 0 )
7573 } )
7674
77- it ( 'Open a stream from the listener' , async ( ) => {
75+ it ( 'should open a stream from the listener' , async ( ) => {
7876 const p = duplexPair < Uint8Array | Uint8ArrayList > ( )
7977 const onStreamPromise : DeferredPromise < Stream > = defer ( )
78+ const onStreamEndPromise : DeferredPromise < Stream > = defer ( )
8079 const dialerFactory = await common . setup ( )
8180 const dialer = dialerFactory . createStreamMuxer ( {
8281 direction : 'outbound' ,
8382 onIncomingStream : ( stream : Stream ) => {
8483 onStreamPromise . resolve ( stream )
84+ } ,
85+ onStreamEnd : ( stream ) => {
86+ onStreamEndPromise . resolve ( stream )
8587 }
8688 } )
8789
@@ -91,21 +93,35 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
9193 void pipe ( p [ 0 ] , dialer , p [ 0 ] )
9294 void pipe ( p [ 1 ] , listener , p [ 1 ] )
9395
94- const conn = await listener . newStream ( )
96+ const listenerStream = await listener . newStream ( )
97+ const listenerBytes = byteStream ( listenerStream )
98+ void listenerBytes . write ( uint8ArrayFromString ( 'hello' ) )
9599
96- void drainAndClose ( conn )
100+ const dialerStream = await onStreamPromise . promise
97101
98- const stream = await onStreamPromise . promise
99- expect ( isValidTick ( stream . timeline . open ) ) . to . equal ( true )
100- expect ( listener . streams ) . to . include ( conn )
101- expect ( isValidTick ( conn . timeline . open ) ) . to . equal ( true )
102- void drainAndClose ( stream )
102+ expect ( isValidTick ( dialerStream . timeline . open ) ) . to . equal ( true )
103+ expect ( listener . streams ) . to . include ( listenerStream )
104+ expect ( isValidTick ( listenerStream . timeline . open ) ) . to . equal ( true )
105+
106+ await dialerStream . close ( )
107+ await listenerStream . close ( )
108+
109+ // Make sure stream is closed properly
110+ const endedStream = await onStreamEndPromise . promise
111+ expect ( dialer . streams ) . to . not . include ( endedStream )
112+
113+ if ( endedStream . timeline . close == null ) {
114+ throw new Error ( 'timeline had no close time' )
115+ }
116+
117+ // Make sure the stream is removed from tracking
118+ expect ( isValidTick ( endedStream . timeline . close ) ) . to . equal ( true )
103119
104- await drainAndClose ( dialer )
105- await drainAndClose ( listener )
120+ await dialer . close ( )
121+ await listener . close ( )
106122 } )
107123
108- it ( 'Open a stream on both sides' , async ( ) => {
124+ it ( 'should open a stream on both sides' , async ( ) => {
109125 const p = duplexPair < Uint8Array | Uint8ArrayList > ( )
110126 const onDialerStreamPromise : DeferredPromise < Stream > = defer ( )
111127 const onListenerStreamPromise : DeferredPromise < Stream > = defer ( )
@@ -132,19 +148,19 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
132148 const listenerInitiatorStream = await listener . newStream ( )
133149
134150 await Promise . all ( [
135- drainAndClose ( dialerInitiatorStream ) ,
136- drainAndClose ( listenerInitiatorStream ) ,
137- onDialerStreamPromise . promise . then ( async stream => { await drainAndClose ( stream ) } ) ,
138- onListenerStreamPromise . promise . then ( async stream => { await drainAndClose ( stream ) } )
151+ dialerInitiatorStream . close ( ) ,
152+ listenerInitiatorStream . close ( ) ,
153+ onDialerStreamPromise . promise . then ( async stream => { await stream . close ( ) } ) ,
154+ onListenerStreamPromise . promise . then ( async stream => { await stream . close ( ) } )
139155 ] )
140156
141157 await Promise . all ( [
142- drainAndClose ( dialer ) ,
143- drainAndClose ( listener )
158+ dialer . close ( ) ,
159+ listener . close ( )
144160 ] )
145161 } )
146162
147- it ( 'Open a stream on one side, write, open a stream on the other side' , async ( ) => {
163+ it ( 'should open a stream on one side, write, open a stream on the other side' , async ( ) => {
148164 const toString = ( source : Source < Uint8ArrayList > ) : AsyncGenerator < string > => map ( source , ( u ) => uint8ArrayToString ( u . subarray ( ) ) )
149165 const p = duplexPair < Uint8Array | Uint8ArrayList > ( )
150166 const onDialerStreamPromise : DeferredPromise < Stream > = defer ( )
0 commit comments