File tree Expand file tree Collapse file tree 1 file changed +9
-1
lines changed Expand file tree Collapse file tree 1 file changed +9
-1
lines changed Original file line number Diff line number Diff line change 71
71
// If we have no space, we're going to provide backpressure until we have space
72
72
while this. count . load ( Ordering :: Relaxed ) >= * this. limit {
73
73
match this. group . next ( ) . await {
74
+ // Case 1: there are no more items available in the group. We
75
+ // can no longer iterate over them, and necessarily should be
76
+ // able to insert.
74
77
None => break ,
75
78
Some ( res) => match res. branch ( ) {
76
- ControlFlow :: Continue ( _) => todo ! ( ) ,
79
+ // Case 2: We got more data and no error, try to loop again.
80
+ ControlFlow :: Continue ( _) => continue ,
81
+
82
+ // Case 3: We got an error of some kind, stop iterating
83
+ // entirely so we can short-circuit with an error from the
84
+ // `flush` method.
77
85
ControlFlow :: Break ( residual) => {
78
86
* this. residual = Some ( residual) ;
79
87
return ConsumerState :: Break ;
You can’t perform that action at this time.
0 commit comments