File tree Expand file tree Collapse file tree 1 file changed +6
-1
lines changed
Expand file tree Collapse file tree 1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change @@ -101,7 +101,6 @@ async def consume_all(self) -> AsyncGenerator[Event]:
101101 logger .debug (
102102 f'Dequeued event of type: { type (event )} in consume_all.'
103103 )
104- yield event
105104 self .queue .task_done ()
106105 logger .debug (
107106 'Marked task as done in event queue in consume_all'
@@ -123,10 +122,16 @@ async def consume_all(self) -> AsyncGenerator[Event]:
123122 )
124123 )
125124
125+ # Make sure the yield is after the close events, otherwise
126+ # the caller may end up in a blocked state where this
127+ # generator isn't called again to close things out and the
128+ # other part is waiting for an event or a closed queue.
126129 if is_final_event :
127130 logger .debug ('Stopping event consumption in consume_all.' )
128131 await self .queue .close ()
132+ yield event
129133 break
134+ yield event
130135 except TimeoutError :
131136 # continue polling until there is a final event
132137 continue
You can’t perform that action at this time.
0 commit comments