Skip to content

Commit cbcabd3

Browse files
committed
Add chunk processing
1 parent cc0d944 commit cbcabd3

File tree

3 files changed

+68
-23
lines changed

3 files changed

+68
-23
lines changed

src/apify_client/clients/resource_clients/actor.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ def call(
290290
timeout_secs: int | None = None,
291291
webhooks: list[dict] | None = None,
292292
wait_secs: int | None = None,
293+
logger: Logger | None | Literal['default'] = 'default',
293294
) -> dict | None:
294295
"""Start the Actor and wait for it to finish before returning the Run object.
295296
@@ -314,6 +315,9 @@ def call(
314315
a webhook set up for the Actor, you do not have to add it again here.
315316
wait_secs: The maximum number of seconds the server waits for the run to finish. If not provided,
316317
waits indefinitely.
318+
logger: Logger used to redirect logs from the Actor run. By default, it is set to "default" which means that
319+
the default logger will be created and used. Setting `None` will disable any log propagation. Passing
320+
a custom logger will redirect logs to the provided logger.
317321
318322
Returns:
319323
The run object.
@@ -730,7 +734,8 @@ async def call(
730734

731735
run_client = self.root_client.run(run_id=started_run['id'])
732736
if logger == 'default':
733-
actor_name = actor_data.get('name', '') if (actor_data := await self.get()) else ''
737+
actor_data = await self.get()
738+
actor_name = actor_data.get('name', '') if actor_data else ''
734739
log_context = await run_client.get_streamed_log(actor_name=actor_name)
735740
else:
736741
log_context = await run_client.get_streamed_log(to_logger=logger)

src/apify_client/clients/resource_clients/log.py

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
2929
resource_path = kwargs.pop('resource_path', 'logs')
3030
super().__init__(*args, resource_path=resource_path, **kwargs)
3131

32-
def get(self, raw: str = False) -> str | None:
32+
def get(self, *, raw: bool = False) -> str | None:
3333
"""Retrieve the log as text.
3434
3535
https://docs.apify.com/api/v2#/reference/logs/log/get-log
3636
37+
Args:
38+
raw: If true, the log will include formatting. For example, coloring character sequences.
39+
3740
Returns:
3841
The retrieved log, or None, if it does not exist.
3942
"""
@@ -51,11 +54,14 @@ def get(self, raw: str = False) -> str | None:
5154

5255
return None
5356

54-
def get_as_bytes(self, raw: str = False) -> bytes | None:
57+
def get_as_bytes(self, *, raw: bool = False) -> bytes | None:
5558
"""Retrieve the log as raw bytes.
5659
5760
https://docs.apify.com/api/v2#/reference/logs/log/get-log
5861
62+
Args:
63+
raw: If true, the log will include formatting. For example, coloring character sequences.
64+
5965
Returns:
6066
The retrieved log as raw bytes, or None, if it does not exist.
6167
"""
@@ -75,11 +81,14 @@ def get_as_bytes(self, raw: str = False) -> bytes | None:
7581
return None
7682

7783
@contextmanager
78-
def stream(self, raw: str = False) -> Iterator[httpx.Response | None]:
84+
def stream(self, *, raw: bool = False) -> Iterator[httpx.Response | None]:
7985
"""Retrieve the log as a stream.
8086
8187
https://docs.apify.com/api/v2#/reference/logs/log/get-log
8288
89+
Args:
90+
raw: If true, the log will include formatting. For example, coloring character sequences.
91+
8392
Returns:
8493
The retrieved log as a context-managed streaming `Response`, or None, if it does not exist.
8594
"""
@@ -110,11 +119,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
110119
resource_path = kwargs.pop('resource_path', 'logs')
111120
super().__init__(*args, resource_path=resource_path, **kwargs)
112121

113-
async def get(self, raw: str = False) -> str | None:
122+
async def get(self, *, raw: bool = False) -> str | None:
114123
"""Retrieve the log as text.
115124
116125
https://docs.apify.com/api/v2#/reference/logs/log/get-log
117126
127+
Args:
128+
raw: If true, the log will include formatting. For example, coloring character sequences.
129+
118130
Returns:
119131
The retrieved log, or None, if it does not exist.
120132
"""
@@ -132,11 +144,14 @@ async def get(self, raw: str = False) -> str | None:
132144

133145
return None
134146

135-
async def get_as_bytes(self, raw: str = False) -> bytes | None:
147+
async def get_as_bytes(self, *, raw: bool = False) -> bytes | None:
136148
"""Retrieve the log as raw bytes.
137149
138150
https://docs.apify.com/api/v2#/reference/logs/log/get-log
139151
152+
Args:
153+
raw: If true, the log will include formatting. For example, coloring character sequences.
154+
140155
Returns:
141156
The retrieved log as raw bytes, or None, if it does not exist.
142157
"""
@@ -156,11 +171,14 @@ async def get_as_bytes(self, raw: str = False) -> bytes | None:
156171
return None
157172

158173
@asynccontextmanager
159-
async def stream(self, raw: str = False) -> AsyncIterator[httpx.Response | None]:
174+
async def stream(self, *, raw: bool = False) -> AsyncIterator[httpx.Response | None]:
160175
"""Retrieve the log as a stream.
161176
162177
https://docs.apify.com/api/v2#/reference/logs/log/get-log
163178
179+
Args:
180+
raw: If true, the log will include formatting. For example, coloring character sequences.
181+
164182
Returns:
165183
The retrieved log as a context-managed streaming `Response`, or None, if it does not exist.
166184
"""
@@ -175,7 +193,7 @@ async def stream(self, raw: str = False) -> AsyncIterator[httpx.Response | None]
175193
)
176194

177195
yield response
178-
except Exception as exc:
196+
except ApifyApiError as exc:
179197
catch_not_found_or_throw(exc)
180198
yield None
181199
finally:
@@ -199,10 +217,14 @@ def __init__(self, log_client: LogClientAsync, to_logger: logging.Logger) -> Non
199217
self._streaming_task: Task | None = None
200218
if self._force_propagate:
201219
to_logger.propagate = True
220+
self._stream_buffer = list[str]()
221+
# Redirected logs are forwarded to logger as soon as there are at least two split markers present in the buffer.
222+
# For example, 2025-05-12T15:35:59.429Z
223+
self._split_marker = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')
202224

203225
def __call__(self) -> Task:
204226
"""Start the streaming task. The caller has to handle any cleanup."""
205-
return asyncio.create_task(self._stream_log(self._to_logger))
227+
return asyncio.create_task(self._stream_log())
206228

207229
async def __aenter__(self) -> Self:
208230
"""Start the streaming task within the context. Exiting the context will cancel the streaming task."""
@@ -222,22 +244,40 @@ async def __aexit__(
222244
self._streaming_task.cancel()
223245
self._streaming_task = None
224246

225-
async def _stream_log(self, to_logger: logging.Logger) -> None:
247+
async def _stream_log(self) -> None:
226248
async with self._log_client.stream(raw=True) as log_stream:
227249
if not log_stream:
228250
return
229251
async for data in log_stream.aiter_bytes():
230-
# Example split marker: \n2025-05-12T15:35:59.429Z
231-
date_time_marker_pattern = r'(\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)'
232-
splits = re.split(date_time_marker_pattern, data.decode('utf-8'))
233-
messages = splits[:1]
252+
new_chunk = data.decode('utf-8')
253+
self._stream_buffer.append(new_chunk)
254+
if re.findall(self._split_marker, new_chunk):
255+
# If complete split marker was found in new chunk, then process the buffer.
256+
self._log_buffer_content(include_last_part=False)
234257

235-
for split_marker, message_without_split_marker in zip(splits[1:-1:2], splits[2::2]):
236-
messages.append(split_marker + message_without_split_marker)
258+
# If the stream is finished, then the last part will be also processed.
259+
self._log_buffer_content(include_last_part=True)
237260

238-
for message in messages:
239-
to_logger.log(level=self._guess_log_level_from_message(message), msg=message.strip())
240-
log_stream.close()
261+
def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
262+
"""Merge the whole buffer and split it into parts based on the marker.
263+
264+
The last part could be incomplete, and so it can be left unprocessed and in the buffer.
265+
"""
266+
all_parts = re.split(self._split_marker, ''.join(self._stream_buffer))
267+
# First split is empty string
268+
if include_last_part:
269+
message_markers = all_parts[1::2]
270+
message_contents = all_parts[2::2]
271+
self._stream_buffer = []
272+
else:
273+
message_markers = all_parts[1:-2:2]
274+
message_contents = all_parts[2:-2:2]
275+
# The last two parts (marker and message) are possibly not complete and will be left in the buffer
276+
self._stream_buffer = all_parts[-2:]
277+
278+
for marker, content in zip(message_markers, message_contents):
279+
message = marker + content
280+
self._to_logger.log(level=self._guess_log_level_from_message(message), msg=message.strip())
241281

242282
@staticmethod
243283
def _guess_log_level_from_message(message: str) -> int:

tests/unit/test_logging.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
b'2025-05-13T07:24:14.132Z [apify] INFO multiline \n log',
2525
b'2025-05-13T07:25:14.132Z [apify] WARNING some warning',
2626
b'2025-05-13T07:26:14.132Z [apify] DEBUG c',
27-
b'2025-05-13T0', # Chunked log that got split in the marker, part 1
28-
b'7:26:14.132Z [apify] DEBUG d' # Chunked log that got split in the marker, part 2
29-
b'2025-05-13T07:26:14.132Z [apify] DEB', # Chunked log that got split outside of marker, part 1
30-
b'UG e', # Chunked log that got split outside of marker, part 1
27+
b'2025-05-13T0', # Chunked log that got split in the marker, part 1
28+
b'7:26:14.132Z [apify] DEBUG d' # Chunked log that got split in the marker, part 2
29+
b'2025-05-13T07:26:14.132Z [apify] DEB', # Chunked log that got split outside of marker, part 1
30+
b'UG e', # Chunked log that got split outside of marker, part 2
3131
)
3232

3333
_EXPECTED_MESSAGES_AND_LEVELS = (

0 commit comments

Comments
 (0)