66import threading
77from asyncio import Task
88from contextlib import asynccontextmanager , contextmanager
9+ from datetime import datetime , timezone
910from threading import Thread
1011from typing import TYPE_CHECKING , Any , cast
1112
@@ -209,6 +210,9 @@ class StreamedLog:
209210 It uses buffer to deal with possibly chunked logs. Chunked logs are stored in buffer. Chunks are expected to contain
210211 specific markers that indicate the start of the log message. Each time a new chunk with complete split marker
211212 arrives, the buffer is processed, logged and emptied.
213+
214+ This works only if the logs have a datetime marker in ISO format, for example `2025-05-12T15:35:59.429Z`. This is
215+ the default log standard for the actors.
212216 """
213217
214218 # Test related flag to enable propagation of logs to the `caplog` fixture during tests.
@@ -230,8 +234,8 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non
230234 if self ._force_propagate :
231235 to_logger .propagate = True
232236 self ._stream_buffer = list [str ]()
233- self ._split_marker = re .compile (r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)' ) # Ex:2025-05-12T15:35:59.429Z
234- self ._from_start = from_start
237+ self ._split_marker = re .compile (r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)' )
238+ self ._relevancy_time_limit : datetime | None = None if from_start else datetime . now ( tz = timezone . utc )
235239
236240 def _process_new_data (self , data : bytes ) -> None :
237241 new_chunk = data .decode ('utf-8' )
@@ -258,6 +262,11 @@ def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
258262 self ._stream_buffer = all_parts [- 2 :]
259263
260264 for marker , content in zip (message_markers , message_contents ):
265+ if self ._relevancy_time_limit :
266+ log_time = datetime .fromisoformat (marker .replace ('Z' , '+00:00' ))
267+ if log_time < self ._relevancy_time_limit :
268+ # Skip irrelevant logs
269+ continue
261270 message = marker + content
262271 self ._to_logger .log (level = self ._guess_log_level_from_message (message ), msg = message .strip ())
263272
@@ -314,12 +323,7 @@ def _stream_log(self) -> None:
314323 with self ._log_client .stream (raw = True ) as log_stream :
315324 if not log_stream :
316325 return
317- # The first chunk contains all older logs from the start of the actor run until now.
318- skip_first_chunk = not self ._from_start
319326 for data in log_stream .iter_bytes ():
320- if skip_first_chunk :
321- skip_first_chunk = False
322- continue
323327 self ._process_new_data (data )
324328 if self ._stop_logging :
325329 break
@@ -363,12 +367,7 @@ async def _stream_log(self) -> None:
363367 async with self ._log_client .stream (raw = True ) as log_stream :
364368 if not log_stream :
365369 return
366- # The first chunk contains all older logs from the start of the actor run until now.
367- skip_first_chunk = not self ._from_start
368370 async for data in log_stream .aiter_bytes ():
369- if skip_first_chunk :
370- skip_first_chunk = False
371- continue
372371 self ._process_new_data (data )
373372
374373 # If the stream is finished, then the last part will be also processed.
0 commit comments