@@ -49,8 +49,9 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
4949 /// used to record execution metrics
5050 metrics : BaselineMetrics ,
5151
52- /// If the stream has encountered an error
53- aborted : bool ,
52+ /// If the stream has encountered an error or reaches the
53+ /// `fetch` limit.
54+ done : bool ,
5455
5556 /// A loser tree that always produces the minimum cursor
5657 ///
@@ -162,7 +163,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
162163 in_progress : BatchBuilder :: new ( schema, stream_count, batch_size, reservation) ,
163164 streams,
164165 metrics,
165- aborted : false ,
166+ done : false ,
166167 cursors : ( 0 ..stream_count) . map ( |_| None ) . collect ( ) ,
167168 prev_cursors : ( 0 ..stream_count) . map ( |_| None ) . collect ( ) ,
168169 round_robin_tie_breaker_mode : false ,
@@ -206,7 +207,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
206207 & mut self ,
207208 cx : & mut Context < ' _ > ,
208209 ) -> Poll < Option < Result < RecordBatch > > > {
209- if self . aborted {
210+ if self . done {
210211 return Poll :: Ready ( None ) ;
211212 }
212213 // Once all partitions have set their corresponding cursors for the loser tree,
@@ -220,7 +221,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
220221 let partition_idx = self . uninitiated_partitions [ idx] ;
221222 match self . maybe_poll_stream ( cx, partition_idx) {
222223 Poll :: Ready ( Err ( e) ) => {
223- self . aborted = true ;
224+ self . done = true ;
224225 return Poll :: Ready ( Some ( Err ( e) ) ) ;
225226 }
226227 Poll :: Pending => {
@@ -268,7 +269,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
268269 if !self . loser_tree_adjusted {
269270 let winner = self . loser_tree [ 0 ] ;
270271 if let Err ( e) = ready ! ( self . maybe_poll_stream( cx, winner) ) {
271- self . aborted = true ;
272+ self . done = true ;
272273 return Poll :: Ready ( Some ( Err ( e) ) ) ;
273274 }
274275 self . update_loser_tree ( ) ;
@@ -281,7 +282,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
281282
282283 // stop sorting if fetch has been reached
283284 if self . fetch_reached ( ) {
284- self . aborted = true ;
285+ self . done = true ;
285286 } else if self . in_progress . len ( ) < self . batch_size {
286287 continue ;
287288 }
0 commit comments