|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -from typing import NamedTuple |
| 3 | +from typing import Any, Coroutine, Awaitable, NamedTuple |
4 | 4 | import asyncio |
5 | | -from asyncio import PriorityQueue, QueueEmpty |
| 5 | +from asyncio import Event, PriorityQueue, Task, QueueEmpty |
6 | 6 |
|
7 | 7 | import logging |
8 | 8 |
|
@@ -57,6 +57,12 @@ def __init__(self, queue_size: int = 10, parent: MessagePump | None = None) -> N |
57 | 57 | self._closed: bool = False |
58 | 58 | self._disabled_messages: set[type[Message]] = set() |
59 | 59 | self._pending_message: MessageQueueItem | None = None |
| 60 | + self._task: Task | None = None |
| 61 | + |
| 62 | + @property |
| 63 | + def task(self) -> Task: |
| 64 | + assert self._task is not None |
| 65 | + return self._task |
60 | 66 |
|
61 | 67 | def set_parent(self, parent: MessagePump) -> None: |
62 | 68 | self._parent = parent |
@@ -133,9 +139,16 @@ def set_interval( |
133 | 139 | asyncio.get_event_loop().create_task(timer.run()) |
134 | 140 | return timer |
135 | 141 |
|
136 | | - async def close_messages(self) -> None: |
| 142 | + async def close_messages(self, wait: bool = False) -> None: |
| 143 | + """Close message queue, and optionally wait for queue to finish processing.""" |
137 | 144 | self._closing = True |
138 | 145 | await self._message_queue.put(None) |
| 146 | + if wait and self._task is not None: |
| 147 | + await self._task |
| 148 | + |
| 149 | + def start_messages(self) -> None: |
| 150 | + task = asyncio.create_task(self.process_messages()) |
| 151 | + self._task = task |
139 | 152 |
|
140 | 153 | async def process_messages(self) -> None: |
141 | 154 | """Process messages until the queue is closed.""" |
|
0 commit comments