|
33 | 33 | from google.protobuf.empty_pb2 import Empty # pylint: disable=no-name-in-module |
34 | 34 |
|
35 | 35 | from ... import microgrid |
| 36 | +from ..._internal.asyncio import cancel_and_await |
36 | 37 | from ...actor._decorator import actor |
37 | 38 | from ...microgrid import ComponentGraph |
38 | 39 | from ...microgrid.client import MicrogridApiClient |
@@ -186,13 +187,21 @@ def __init__( |
186 | 187 | self._users_channels: Dict[ |
187 | 188 | str, Bidirectional.Handle[Result, Request] |
188 | 189 | ] = users_channels |
189 | | - self._create_users_tasks() |
| 190 | + self._users_tasks = self._create_users_tasks() |
190 | 191 | self._started = asyncio.Event() |
191 | 192 |
|
192 | | - def _create_users_tasks(self) -> None: |
193 | | - """For each user create a task to wait for request.""" |
| 193 | + def _create_users_tasks(self) -> List[asyncio.Task[Empty]]: |
| 194 | + """For each user create a task to wait for request. |
| 195 | +
|
| 196 | + Returns: |
| 197 | + List with users tasks. |
| 198 | + """ |
| 199 | + tasks = [] |
194 | 200 | for user, handler in self._users_channels.items(): |
195 | | - asyncio.create_task(self._wait_for_request(_User(user, handler))) |
| 201 | + tasks.append( |
| 202 | + asyncio.create_task(self._wait_for_request(_User(user, handler))) |
| 203 | + ) |
| 204 | + return tasks |
196 | 205 |
|
197 | 206 | def _get_upper_bound(self, batteries: Set[int]) -> int: |
198 | 207 | """Get total upper bound of power to be set for given batteries. |
@@ -634,3 +643,8 @@ async def _cancel_tasks(self, tasks: Iterable[asyncio.Task[Any]]) -> None: |
634 | 643 | aws.cancel() |
635 | 644 |
|
636 | 645 | await asyncio.gather(*tasks, return_exceptions=True) |
| 646 | + |
| 647 | + async def _stop_actor(self) -> None: |
| 648 | + """Stop all running async tasks.""" |
| 649 | + await asyncio.gather(*[cancel_and_await(t) for t in self._users_tasks]) |
| 650 | + await self._stop() # type: ignore # pylint: disable=no-member |
0 commit comments