@@ -11,6 +11,7 @@ internal sealed class OutputMultiplexer<T>(
1111 where T : IStreamMetrics
1212{
1313 private readonly Memory < byte > framingBuffer ;
14+ private readonly MultiplexedStreamFactory ? factory ;
1415
1516 public required AsyncAutoResetEvent TransportSignal { private get ; init ; }
1617
@@ -21,7 +22,10 @@ public required Memory<byte> FramingBuffer
2122
2223 public required TimeSpan Timeout { private get ; init ; }
2324
24- public MultiplexedStreamFactory ? Factory { get ; init ; }
25+ public MultiplexedStreamFactory Factory
26+ {
27+ init => factory = value ;
28+ }
2529
2630 public Task ProcessAsync ( Socket socket )
2731 {
@@ -68,30 +72,31 @@ private async Task ProcessCoreAsync(Socket socket)
6872 await ResetOperationTimeoutAsync ( ) . ConfigureAwait ( false ) ;
6973 }
7074
75+ MultiplexedStream ? stream ;
7176 if ( header . Control is FrameControl . Heartbeat )
77+ {
7278 continue ;
73-
74- if ( ! Streams . TryGetValue ( header . Id , out var stream ) )
79+ }
80+ else if ( Streams . TryGetValue ( header . Id , out stream ) )
7581 {
76- if ( Factory is null || header . CanBeIgnored )
77- {
82+ if ( stream . IsTransportOutputCompleted )
7883 continue ;
79- }
80-
81- if ( ( stream = Factory ( TransportSignal , MeasurementTags ) ) is null )
82- {
83- Commands . TryAdd ( new StreamRejectedCommand ( header . Id ) ) ;
84- TransportSignal . Set ( ) ;
85- continue ;
86- }
87-
88- Streams [ header . Id ] = stream ;
89- ChangeStreamCount ( ) ;
9084 }
91- else if ( stream . IsTransportOutputCompleted )
85+ else if ( factory is null || header . CanBeIgnored )
86+ {
87+ continue ;
88+ }
89+ else if ( ( stream = factory ( TransportSignal , MeasurementTags ) ) is null )
9290 {
91+ Commands . TryAdd ( new StreamRejectedCommand ( header . Id ) ) ;
92+ TransportSignal . Set ( ) ;
9393 continue ;
9494 }
95+ else
96+ {
97+ Streams [ header . Id ] = stream ;
98+ ChangeStreamCount ( ) ;
99+ }
95100
96101 // write the frame to the output header
97102 await stream
0 commit comments