@@ -127,7 +127,7 @@ async def _loop(self) -> None:
127127
128128 t = asyncio .create_task (self .callback (queue_items ))
129129 tasks .add (t )
130- t .add_done_callback (tasks .remove )
130+ t .add_done_callback (tasks .discard )
131131
132132 for _ in range (num_items ):
133133 self .queue .task_done ()
@@ -137,7 +137,10 @@ async def _loop(self) -> None:
137137 await asyncio .wait_for (f , timeout = self .max_wait_finalize )
138138
139139 async def _finalize (self ) -> None :
140- # WARNING: Do not allow an async context switch before the gather below
140+ # WARNING: Do not allow an async context switch before the queue is drained
141+ # This can be changed to utilize queue.Shutdown in 3.13+
142+ # or when more of asyncio queues have been replaced here
143+ # as part of freethreading efforts.
141144
142145 self ._alive = False
143146 remaining_items : Sequence [T ] = []
@@ -152,27 +155,28 @@ async def _finalize(self) -> None:
152155
153156 remaining_items .append (ev )
154157
158+ # Context switches are safe again.
159+
155160 if not remaining_items :
156161 return
157162
158163 num_remaining = len (remaining_items )
159164
160- pending_futures : list [asyncio .Task [object ]] = []
165+ remaining_tasks : set [asyncio .Task [object ]] = set ()
161166
162167 for chunk in (
163168 remaining_items [p : p + self .max_quantity ]
164169 for p in range (0 , num_remaining , self .max_quantity )
165170 ):
166171 fut = asyncio .create_task (self .callback (chunk ))
167- pending_futures .append (fut )
172+ fut .add_done_callback (remaining_tasks .discard )
173+ remaining_tasks .add (fut )
168174
169- gathered = asyncio .gather (* pending_futures )
175+ timeout = self .max_wait_finalize
176+ _done , pending = await asyncio .wait (remaining_tasks , timeout = timeout )
170177
171- try :
172- await asyncio .wait_for (gathered , timeout = self .max_wait_finalize )
173- except TimeoutError :
174- for task in pending_futures :
175- task .cancel ()
178+ for task in pending :
179+ task .cancel ()
176180
177181 for _ in range (num_remaining ):
178182 self .queue .task_done ()
0 commit comments