@@ -204,7 +204,12 @@ async def stream(self, *, raw: bool = False) -> AsyncIterator[httpx.Response | N
204204
205205
206206class StreamedLog :
207- """Utility class for streaming logs from another actor."""
207+ """Utility class for streaming logs from another actor.
208+
209+ It uses a buffer to deal with possibly chunked logs. Chunked logs are stored in the buffer. Chunks are
210+ expected to contain specific markers that indicate the start of a log message. Each time a new chunk with a
211+ complete split marker arrives, the buffer is processed, logged and emptied.
212+ """
208213
209214 # Test related flag to enable propagation of logs to the `caplog` fixture during tests.
210215 _force_propagate = False
@@ -214,15 +219,13 @@ def __init__(self, to_logger: logging.Logger) -> None:
214219 if self ._force_propagate :
215220 to_logger .propagate = True
216221 self ._stream_buffer = list [str ]()
217- # Redirected logs are forwarded to logger as soon as there are at least two split markers present in the buffer.
218- # For example, 2025-05-12T15:35:59.429Z
219- self ._split_marker = re .compile (r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)' )
222+ self ._split_marker = re .compile (r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)' ) # Ex:2025-05-12T15:35:59.429Z
220223
def _process_new_data(self, data: bytes) -> None:
    """Decode an incoming chunk, append it to the buffer, and flush when a complete split marker arrives.

    Args:
        data: Raw bytes of the next streamed log chunk; assumed to be UTF-8 encoded text.
    """
    new_chunk = data.decode('utf-8')
    self._stream_buffer.append(new_chunk)
    # `Pattern.search` stops at the first occurrence; `re.findall` would build a
    # list of every marker in the chunk just to test for existence.
    if self._split_marker.search(new_chunk):
        # If a complete split marker was found in the new chunk, then log the buffer.
        self._log_buffer_content(include_last_part=False)
228231 def _log_buffer_content (self , * , include_last_part : bool = False ) -> None :
@@ -231,15 +234,14 @@ def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
231234 Log the messages created from the split parts and remove them from buffer.
232235 The last part could be incomplete, and so it can be left unprocessed and in the buffer until later.
233236 """
234- all_parts = re .split (self ._split_marker , '' .join (self ._stream_buffer ))
235- # First split is empty string
237+ all_parts = re .split (self ._split_marker , '' .join (self ._stream_buffer ))[1 :] # First split is empty string
236238 if include_last_part :
237- message_markers = all_parts [1 ::2 ]
238- message_contents = all_parts [2 ::2 ]
239+ message_markers = all_parts [0 ::2 ]
240+ message_contents = all_parts [1 ::2 ]
239241 self ._stream_buffer = []
240242 else :
241- message_markers = all_parts [1 :- 2 :2 ]
242- message_contents = all_parts [2 :- 2 :2 ]
243+ message_markers = all_parts [0 :- 2 :2 ]
244+ message_contents = all_parts [1 :- 2 :2 ]
243245 # The last two parts (marker and message) are possibly not complete and will be left in the buffer
244246 self ._stream_buffer = all_parts [- 2 :]
245247
@@ -301,11 +303,7 @@ def _stream_log(self) -> None:
301303 if not log_stream :
302304 return
303305 for data in log_stream .iter_bytes ():
304- new_chunk = data .decode ('utf-8' )
305- self ._stream_buffer .append (new_chunk )
306- if re .findall (self ._split_marker , new_chunk ):
307- # If complete split marker was found in new chunk, then process the buffer.
308- self ._log_buffer_content (include_last_part = False )
306+ self ._process_new_data (data )
309307 if self ._stop_logging :
310308 break
311309
0 commit comments