@@ -59,7 +59,7 @@ public BatchingChannelReader<T> WithTimeout(long millisecondsTimeout)
5959 {
6060 _timeout = millisecondsTimeout <= 0 ? Timeout . Infinite : millisecondsTimeout ;
6161
62- if ( Buffer is null || Buffer . Reader . Completion . IsCompleted )
62+ if ( Buffer ? . Reader . Completion . IsCompleted != false )
6363 return this ;
6464
6565 if ( _timeout == Timeout . Infinite )
@@ -72,7 +72,7 @@ public BatchingChannelReader<T> WithTimeout(long millisecondsTimeout)
7272 ( ) => new Timer ( ForceBatch ) ) ;
7373
7474 if ( _batch is null ) return this ;
75-
75+
7676 // Might be in the middle of a batch so we need to update the timeout.
7777 lock ( Buffer )
7878 {
@@ -108,7 +108,7 @@ protected override void OnBeforeFinalFlush()
108108 /// <inheritdoc />
109109 protected override bool TryPipeItems ( bool flush )
110110 {
111- if ( Buffer is null || Buffer . Reader . Completion . IsCompleted )
111+ if ( Buffer ? . Reader . Completion . IsCompleted != false )
112112 return false ;
113113
114114 lock ( Buffer )
@@ -119,7 +119,7 @@ protected override bool TryPipeItems(bool flush)
119119 var newBatch = false ;
120120 List < T > ? c = _batch ;
121121 ChannelReader < T > ? source = Source ;
122- if ( source is null || source . Completion . IsCompleted )
122+ if ( source ? . Completion . IsCompleted != false )
123123 {
124124 // All finished, if necessary, release the last batch to the buffer.
125125 if ( c is null ) return false ;
@@ -128,12 +128,15 @@ protected override bool TryPipeItems(bool flush)
128128
129129 while ( source . TryRead ( out T ? item ) )
130130 {
131- if ( c is not null ) c . Add ( item ) ;
132- else
131+ if ( c is null )
133132 {
134133 newBatch = true ; // a new batch could start but not be emmited.
135134 _batch = c = new List < T > ( _batchSize ) { item } ;
136135 }
136+ else
137+ {
138+ c . Add ( item ) ;
139+ }
137140
138141 Debug . Assert ( c . Count <= _batchSize ) ;
139142 var full = c . Count == _batchSize ;
0 commit comments