66
77import asyncio
88import logging
9- from typing import Any
109
1110from ._actor import Actor
12- from ._decorator import BaseActor
1311
1412_logger = logging .getLogger (__name__ )
1513
1614
17- async def run (* actors : Any ) -> None :
15+ async def run (* actors : Actor ) -> None :
1816 """Await the completion of all actors.
1917
2018 Args:
2119 actors: the actors to be awaited.
22-
23- Raises:
24- AssertionError: if any of the actors is not an instance of BaseActor.
2520 """
26- pending_tasks : set [asyncio .Task [Any ]] = set ()
27-
28- # Check that each actor is an instance of BaseActor or Actor at runtime,
29- # due to the indirection created by the actor decorator.
30- for actor in actors :
31- if isinstance (actor , Actor ):
32- await actor .start ()
33- awaitable = actor .wait ()
34- else :
35- assert isinstance (
36- actor , BaseActor
37- ), f"{ actor } is not an instance of BaseActor or Actor"
38- awaitable = actor .join () # type: ignore
39- pending_tasks .add (asyncio .create_task (awaitable , name = str (actor )))
40-
41- # Currently the actor decorator manages the life-cycle of the actor tasks
21+ _logger .info ("Starting %s actor(s)..." , len (actors ))
22+ await _wait_tasks (
23+ set (asyncio .create_task (a .start (), name = str (a )) for a in actors ),
24+ "starting" ,
25+ "started" ,
26+ )
27+
28+ # Wait until all actors are done
29+ await _wait_tasks (
30+ set (asyncio .create_task (a .wait (), name = str (a )) for a in actors ),
31+ "running" ,
32+ "finished" ,
33+ )
34+
35+ _logger .info ("All %s actor(s) finished." , len (actors ))
36+
37+
38+ async def _wait_tasks (
39+ tasks : set [asyncio .Task [None ]], error_str : str , success_str : str
40+ ) -> None :
41+ pending_tasks = tasks
4242 while pending_tasks :
4343 done_tasks , pending_tasks = await asyncio .wait (
4444 pending_tasks , return_when = asyncio .FIRST_COMPLETED
@@ -49,12 +49,19 @@ async def run(*actors: Any) -> None:
4949 # Cancellation needs to be checked first, otherwise the other methods
5050 # could raise a CancelledError
5151 if task .cancelled ():
52- _logger .info ("The actor %s was cancelled" , task .get_name ())
52+ _logger .info (
53+ "Actor %s: Cancelled while %s." ,
54+ task .get_name (),
55+ error_str ,
56+ )
5357 elif exception := task .exception ():
5458 _logger .error (
55- "The actor %s was finished due to an uncaught exception" ,
59+ "Actor %s: Raised an exception while %s. " ,
5660 task .get_name (),
61+ error_str ,
5762 exc_info = exception ,
5863 )
5964 else :
60- _logger .info ("The actor %s finished normally" , task .get_name ())
65+ _logger .info (
66+ "Actor %s: %s normally." , task .get_name (), success_str .capitalize ()
67+ )
0 commit comments