@@ -222,13 +222,11 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non
222222 """Initialize `StreamedLog`.
223223
224224 Args:
225- to_logger: The logger to which the logs will be redirected
225+ to_logger: The logger to which the logs will be redirected.
226226 from_start: If `True`, all logs from the start of the actor run will be redirected. If `False`, only newly
227227 arrived logs will be redirected. This can be useful for redirecting only a small portion of relevant
228228 logs for long-running actors in stand-by.
229229
230- Returns:
231- The created logger.
232230 """
233231 self ._to_logger = to_logger
234232 if self ._force_propagate :
@@ -248,7 +246,7 @@ def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
248246 """Merge the whole buffer and split it into parts based on the marker.
249247
250248 Log the messages created from the split parts and remove them from buffer.
251- The last part could be incomplete, and so it can be left unprocessed and in the buffer until later.
249+ The last part could be incomplete, and so it can be left unprocessed in the buffer until later.
252250 """
253251 all_parts = re .split (self ._split_marker , '' .join (self ._stream_buffer ))[1 :] # First split is empty string
254252 if include_last_part :
@@ -292,32 +290,34 @@ def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_sta
292290 self ._streaming_thread : Thread | None = None
293291 self ._stop_logging = False
294292
295- def __call__ (self ) -> Thread :
296- """Start the streaming thread. The caller has to handle any cleanup."""
293+ def start (self ) -> Thread :
294+ """Start the streaming thread. The caller has to handle any cleanup by manually calling the `stop` method ."""
297295 if self ._streaming_thread :
298296 raise RuntimeError ('Streaming thread already active' )
299297 self ._stop_logging = False
300298 self ._streaming_thread = threading .Thread (target = self ._stream_log )
301299 self ._streaming_thread .start ()
302300 return self ._streaming_thread
303301
302+ def stop (self ) -> None :
303+ """Signal the streaming thread to stop logging and wait for it to finish."""
304+ if not self ._streaming_thread :
305+ raise RuntimeError ('Streaming thread is not active' )
306+ self ._stop_logging = True
307+ self ._streaming_thread .join ()
308+ self ._streaming_thread = None
309+ self ._stop_logging = False
310+
304311 def __enter__ (self ) -> Self :
305312 """Start the streaming thread within the context. Exiting the context will finish the streaming thread."""
306- self ()
313+ self . start ()
307314 return self
308315
309316 def __exit__ (
310317 self , exc_type : type [BaseException ] | None , exc_val : BaseException | None , exc_tb : TracebackType | None
311318 ) -> None :
312319 """Stop the streaming thread."""
313- if not self ._streaming_thread :
314- raise RuntimeError ('Streaming thread is not active' )
315-
316- # Signal the thread to stop logging and wait for it to finish.
317- self ._stop_logging = True
318- self ._streaming_thread .join ()
319- self ._streaming_thread = None
320- self ._stop_logging = False
320+ self .stop ()
321321
322322 def _stream_log (self ) -> None :
323323 with self ._log_client .stream (raw = True ) as log_stream :
@@ -341,27 +341,31 @@ def __init__(self, log_client: LogClientAsync, *, to_logger: logging.Logger, fro
341341 self ._log_client = log_client
342342 self ._streaming_task : Task | None = None
343343
344- def __call__ (self ) -> Task :
345- """Start the streaming task. The caller has to handle any cleanup."""
344+ def start (self ) -> Task :
345+ """Start the streaming task. The caller has to handle any cleanup by manually calling the `stop` method ."""
346346 if self ._streaming_task :
347347 raise RuntimeError ('Streaming task already active' )
348348 self ._streaming_task = asyncio .create_task (self ._stream_log ())
349349 return self ._streaming_task
350350
351+ def stop (self ) -> None :
352+ """Stop the streaming task."""
353+ if not self ._streaming_task :
354+ raise RuntimeError ('Streaming task is not active' )
355+
356+ self ._streaming_task .cancel ()
357+ self ._streaming_task = None
358+
351359 async def __aenter__ (self ) -> Self :
352360 """Start the streaming task within the context. Exiting the context will cancel the streaming task."""
353- self ()
361+ self . start ()
354362 return self
355363
356364 async def __aexit__ (
357365 self , exc_type : type [BaseException ] | None , exc_val : BaseException | None , exc_tb : TracebackType | None
358366 ) -> None :
359367 """Cancel the streaming task."""
360- if not self ._streaming_task :
361- raise RuntimeError ('Streaming task is not active' )
362-
363- self ._streaming_task .cancel ()
364- self ._streaming_task = None
368+ self .stop ()
365369
366370 async def _stream_log (self ) -> None :
367371 async with self ._log_client .stream (raw = True ) as log_stream :
0 commit comments