11using System . Collections . Concurrent ;
2- using System . Diagnostics ;
3- using System . Diagnostics . Metrics ;
42using System . Net . Sockets ;
53using System . Runtime . CompilerServices ;
64
@@ -9,79 +7,96 @@ namespace DotNext.Net.Multiplexing;
97using Buffers ;
108using Threading ;
119
12- internal sealed class InputMultiplexer (
13- ConcurrentDictionary < uint , MultiplexedStream > streams ,
14- AsyncAutoResetEvent writeSignal ,
15- BufferWriter < byte > framingBuffer ,
16- int flushThreshold ,
17- UpDownCounter < int > streamCounter ,
18- in TagList measurementTags ,
19- TimeSpan timeout ,
20- TimeSpan heartbeatTimeout ,
21- CancellationToken token ) : Multiplexer ( streams , new ConcurrentQueue < ProtocolCommand > ( ) , streamCounter , measurementTags , token )
10+ internal sealed class InputMultiplexer < T > ( ) : Multiplexer < T > ( new ( ) , new ConcurrentQueue < ProtocolCommand > ( ) )
11+ where T : IStreamMetrics
2212{
23-
24- public TimeSpan Timeout => timeout ;
13+ public required TimeSpan Timeout { get ; init ; }
14+
15+ public required TimeSpan HeartbeatTimeout { private get ; init ; }
16+
17+ public required int FlushThreshold { private get ; init ; }
18+
19+ public required BufferWriter < byte > FramingBuffer { private get ; init ; }
20+
21+ public required AsyncAutoResetEvent TransportSignal { private get ; init ; }
2522
2623 public bool TryAddStream ( uint streamId , MultiplexedStream stream )
2724 {
28- var result = streams . TryAdd ( streamId , stream ) ;
25+ var result = Streams . TryAdd ( streamId , stream ) ;
2926 ChangeStreamCount ( Unsafe . BitCast < bool , byte > ( result ) ) ;
3027 return result ;
3128 }
32-
33- public OutputMultiplexer CreateOutput ( Memory < byte > framingBuffer , TimeSpan receiveTimeout )
34- => new ( streams , writeSignal , commands , framingBuffer , streamCounter , measurementTags , receiveTimeout , RootToken ) ;
3529
36- public OutputMultiplexer CreateOutput ( Memory < byte > framingBuffer , TimeSpan receiveTimeout , Func < AsyncAutoResetEvent , MultiplexedStream ? > handlerFactory ,
37- CancellationToken token )
38- => new ( streams , writeSignal , commands , framingBuffer , streamCounter , measurementTags , receiveTimeout , token )
39- { HandlerFactory = handlerFactory } ;
30+ public bool TryRemoveStream ( uint streamId , MultiplexedStream stream )
31+ {
32+ var removed = Streams . TryRemove ( new ( streamId , stream ) ) ;
33+ ChangeStreamCount ( - Unsafe . BitCast < bool , byte > ( removed ) ) ;
34+ return removed ;
35+ }
36+
37+ public OutputMultiplexer < T > CreateOutput ( Memory < byte > framingBuffer , TimeSpan receiveTimeout ) => new ( Streams , Commands )
38+ {
39+ MeasurementTags = MeasurementTags ,
40+ RootToken = RootToken ,
41+ FramingBuffer = framingBuffer ,
42+ Timeout = receiveTimeout ,
43+ TransportSignal = TransportSignal ,
44+ } ;
45+
46+ public OutputMultiplexer < T > CreateOutput ( Memory < byte > framingBuffer , TimeSpan receiveTimeout , MultiplexedStreamFactory handlerFactory ,
47+ CancellationToken token ) => new ( Streams , Commands )
48+ {
49+ MeasurementTags = MeasurementTags ,
50+ RootToken = token ,
51+ FramingBuffer = framingBuffer ,
52+ Timeout = receiveTimeout ,
53+ TransportSignal = TransportSignal ,
54+ Factory = handlerFactory ,
55+ } ;
4056
4157 public async Task ProcessAsync ( Func < bool > condition , Socket socket )
4258 {
43- using var enumerator = streams . GetEnumerator ( ) ;
59+ using var enumerator = Streams . GetEnumerator ( ) ;
4460 for ( var requiresHeartbeat = false ;
4561 condition ( ) ;
46- requiresHeartbeat = ! await writeSignal . WaitAsync ( heartbeatTimeout , RootToken ) . ConfigureAwait ( false ) )
62+ requiresHeartbeat = ! await TransportSignal . WaitAsync ( HeartbeatTimeout , RootToken ) . ConfigureAwait ( false ) )
4763 {
48- framingBuffer . Clear ( reuseBuffer : true ) ;
64+ FramingBuffer . Clear ( reuseBuffer : true ) ;
4965
5066 // combine streams
5167 while ( enumerator . MoveNext ( ) )
5268 {
5369 var ( streamId , stream ) = enumerator . Current ;
5470
55- if ( stream . IsCompleted && streams . TryRemove ( streamId , out _ ) )
71+ if ( stream . IsCompleted && TryRemoveStream ( streamId , stream ) )
5672 {
57- Protocol . WriteStreamClosed ( framingBuffer , streamId ) ;
58- ChangeStreamCount ( - 1 ) ;
73+ Protocol . WriteStreamClosed ( FramingBuffer , streamId ) ;
5974 }
6075 else
6176 {
62- await stream . WriteFrameAsync ( framingBuffer , streamId ) . ConfigureAwait ( false ) ;
77+ await stream . WriteFrameAsync ( FramingBuffer , streamId ) . ConfigureAwait ( false ) ;
6378 }
6479
6580 // write the buffer on overflow
66- if ( framingBuffer . WrittenCount >= flushThreshold )
81+ if ( FramingBuffer . WrittenCount >= FlushThreshold )
6782 {
68- await SendAsync ( framingBuffer . WrittenMemory , socket ) . ConfigureAwait ( false ) ;
69- framingBuffer . Clear ( reuseBuffer : true ) ;
83+ await SendAsync ( FramingBuffer . WrittenMemory , socket ) . ConfigureAwait ( false ) ;
84+ FramingBuffer . Clear ( reuseBuffer : true ) ;
7085 }
7186 }
7287
7388 // process protocol commands
74- commands . Serialize ( framingBuffer ) ;
89+ Commands . Serialize ( FramingBuffer ) ;
7590
76- switch ( framingBuffer . WrittenCount )
91+ switch ( FramingBuffer . WrittenCount )
7792 {
7893 case 0 when requiresHeartbeat :
79- Protocol . WriteHeartbeat ( framingBuffer ) ;
94+ Protocol . WriteHeartbeat ( FramingBuffer ) ;
8095 goto default ;
8196 case 0 :
8297 break ;
8398 default :
84- await SendAsync ( framingBuffer . WrittenMemory , socket ) . ConfigureAwait ( false ) ;
99+ await SendAsync ( FramingBuffer . WrittenMemory , socket ) . ConfigureAwait ( false ) ;
85100 break ;
86101 }
87102
@@ -94,7 +109,7 @@ private async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Socket socket)
94109 {
95110 for ( int bytesWritten ; ! buffer . IsEmpty ; buffer = buffer . Slice ( bytesWritten ) )
96111 {
97- StartOperation ( timeout ) ;
112+ StartOperation ( Timeout ) ;
98113 try
99114 {
100115 bytesWritten = await socket . SendAsync ( buffer , TimeBoundedToken ) . ConfigureAwait ( false ) ;
@@ -116,9 +131,9 @@ private async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Socket socket)
116131
117132 public async ValueTask CompleteAllAsync ( Exception e )
118133 {
119- foreach ( var id in streams . Keys )
134+ foreach ( var id in Streams . Keys )
120135 {
121- if ( streams . TryRemove ( id , out var stream ) )
136+ if ( Streams . TryRemove ( id , out var stream ) )
122137 {
123138 await stream . CompleteTransportOutputAsync ( e ) . ConfigureAwait ( false ) ;
124139 await stream . CompleteTransportInputAsync ( e ) . ConfigureAwait ( false ) ;
0 commit comments