@@ -28,7 +28,6 @@ def __init__(self, queue_size: int = 10, parent: MessagePump | None = None) -> N
2828 self ._pending_message : Message | None = None
2929 self ._task : Task | None = None
3030 self ._child_tasks : set [Task ] = set ()
31- self ._queue_empty_event = Event ()
3231
3332 @property
3433 def task (self ) -> Task :
@@ -111,34 +110,16 @@ def set_interval(
111110 asyncio .get_event_loop ().create_task (timer .run ())
112111 return timer
113112
114- async def stop_messages (self ) -> None :
115- if not self ._closing :
116- await self .post_message (events .NoneEvent (self ))
117- self ._closing = True
118- return
119- if not (self ._closing or self ._closed ):
120- self ._queue_empty_event .clear ()
121- await self .post_message (events .NoneEvent (self ))
122- self ._closing = True
123- await self ._queue_empty_event .wait ()
124- self ._queue_empty_event .clear ()
125-
126113 async def close_messages (self , wait : bool = True ) -> None :
127114 """Close message queue, and optionally wait for queue to finish processing."""
128115 if self ._closed :
129116 return
130- log . debug ( "close_messages %r wait=%r" , self , wait )
117+
131118 self ._closing = True
132- log .debug ("close 1 %r" , self )
119+ await self ._message_queue .put (None )
120+
133121 for task in self ._child_tasks :
134122 task .cancel ()
135- log .debug ("close 2 %r" , self )
136- await self ._message_queue .put (None )
137- log .debug ("close 3 %r" , self )
138- if wait and self ._task is not None :
139- await self ._task
140- self ._task = None
141- log .debug ("close 4 %r" , self )
142123
143124 def start_messages (self ) -> None :
144125 self ._task = asyncio .create_task (self .process_messages ())
@@ -149,15 +130,14 @@ async def process_messages(self) -> None:
149130 try :
150131 message = await self .get_message ()
151132 except MessagePumpClosed :
152- log .debug ("CLOSED %r" , self )
153133 break
154134 except Exception as error :
155135 log .exception ("error in get_message()" )
156136 raise error from None
157137
158138 log .debug ("%r -> %r" , message , self )
159139 # Combine any pending messages that may supersede this one
160- while True :
140+ while not ( self . _closed or self . _closing ) :
161141 pending = self .peek_message ()
162142 if pending is None or not message .can_batch (pending ):
163143 break
@@ -172,21 +152,14 @@ async def process_messages(self) -> None:
172152 log .exception ("error in dispatch_message" )
173153 raise
174154 finally :
175- log .debug ("a" )
176155 if self ._message_queue .empty ():
177- log .debug ("b" )
178- self ._queue_empty_event .set ()
179156 if not self ._closed :
180157 idle_handler = getattr (self , "on_idle" , None )
181- log .debug ("c %r" , idle_handler )
182158 if idle_handler is not None and not self ._closed :
183- log .debug ("d" )
184159 await idle_handler (events .Idle (self ))
185- log .debug ("e" )
186- self ._queue_empty_event .set ()
160+ log .debug ("CLOSED %r" , self )
187161
188162 async def dispatch_message (self , message : Message ) -> bool | None :
189- log .debug ("dispatch_message %r" , message )
190163 if isinstance (message , events .Event ):
191164 await self .on_event (message )
192165 else :
@@ -196,7 +169,6 @@ async def dispatch_message(self, message: Message) -> bool | None:
196169 async def on_event (self , event : events .Event ) -> None :
197170 method_name = f"on_{ event .name } "
198171 dispatch_function : MessageHandler = getattr (self , method_name , None )
199- log .debug ("dispatching to %r" , dispatch_function )
200172 if dispatch_function is not None :
201173 await dispatch_function (event )
202174 if event .bubble and self ._parent and not event ._stop_propagaton :
@@ -217,15 +189,11 @@ def post_message_no_wait(self, message: Message) -> bool:
217189 return True
218190
219191 async def post_message (self , message : Message ) -> bool :
220- log .debug ("%r post_message 1" , self )
221192 if self ._closing or self ._closed :
222193 return False
223- log .debug ("%r post_message 2" , self )
224194 if not self .check_message_enabled (message ):
225195 return True
226- log .debug ("%r post_message 3" , self )
227196 await self ._message_queue .put (message )
228- log .debug ("%r post_message 4" , self )
229197 return True
230198
231199 async def post_message_from_child (self , message : Message ) -> bool :
0 commit comments