|
3 | 3 | import asyncio
|
4 | 4 | import logging
|
5 | 5 | import re
|
| 6 | +import threading |
6 | 7 | from asyncio import Task
|
7 | 8 | from contextlib import asynccontextmanager, contextmanager
|
| 9 | +from threading import Thread |
8 | 10 | from typing import TYPE_CHECKING, Any, cast
|
9 | 11 |
|
10 | 12 | from apify_shared.utils import ignore_docs
|
@@ -201,67 +203,33 @@ async def stream(self, *, raw: bool = False) -> AsyncIterator[httpx.Response | N
|
201 | 203 | await response.aclose()
|
202 | 204 |
|
203 | 205 |
|
204 |
| -class StreamedLogSync: |
205 |
| - """Utility class for streaming logs from another actor.""" |
206 |
| - |
207 |
| - |
208 |
| -class StreamedLogAsync: |
| 206 | +class StreamedLog: |
209 | 207 | """Utility class for streaming logs from another actor."""
|
210 | 208 |
|
211 | 209 | # Test-related flag that enables propagation of logs to the `caplog` fixture during tests.
|
212 | 210 | _force_propagate = False
|
213 | 211 |
|
214 |
| - def __init__(self, log_client: LogClientAsync, to_logger: logging.Logger) -> None: |
215 |
| - self._log_client = log_client |
| 212 | + def __init__(self, to_logger: logging.Logger) -> None: |
216 | 213 | self._to_logger = to_logger
|
217 |
| - self._streaming_task: Task | None = None |
218 | 214 | if self._force_propagate:
|
219 | 215 | to_logger.propagate = True
|
220 | 216 | self._stream_buffer = list[str]()
|
221 | 217 | # Redirected logs are forwarded to logger as soon as there are at least two split markers present in the buffer.
|
222 | 218 | # For example, 2025-05-12T15:35:59.429Z
|
223 | 219 | self._split_marker = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')
|
224 | 220 |
|
225 |
| - def __call__(self) -> Task: |
226 |
| - """Start the streaming task. The caller has to handle any cleanup.""" |
227 |
| - return asyncio.create_task(self._stream_log()) |
228 |
| - |
229 |
| - async def __aenter__(self) -> Self: |
230 |
| - """Start the streaming task within the context. Exiting the context will cancel the streaming task.""" |
231 |
| - if self._streaming_task: |
232 |
| - raise RuntimeError('Streaming task already active') |
233 |
| - self._streaming_task = self() |
234 |
| - |
235 |
| - return self |
236 |
| - |
237 |
| - async def __aexit__( |
238 |
| - self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None |
239 |
| - ) -> None: |
240 |
| - """Cancel the streaming task.""" |
241 |
| - if not self._streaming_task: |
242 |
| - raise RuntimeError('Streaming task is not active') |
243 |
| - |
244 |
| - self._streaming_task.cancel() |
245 |
| - self._streaming_task = None |
246 |
| - |
247 |
| - async def _stream_log(self) -> None: |
248 |
| - async with self._log_client.stream(raw=True) as log_stream: |
249 |
| - if not log_stream: |
250 |
| - return |
251 |
| - async for data in log_stream.aiter_bytes(): |
252 |
| - new_chunk = data.decode('utf-8') |
253 |
| - self._stream_buffer.append(new_chunk) |
254 |
| - if re.findall(self._split_marker, new_chunk): |
255 |
| - # If complete split marker was found in new chunk, then process the buffer. |
256 |
| - self._log_buffer_content(include_last_part=False) |
257 |
| - |
258 |
| - # If the stream is finished, then the last part will be also processed. |
259 |
| - self._log_buffer_content(include_last_part=True) |
| 221 | + def _process_new_data(self, data: bytes) -> None: |
| 222 | + new_chunk = data.decode('utf-8') |
| 223 | + self._stream_buffer.append(new_chunk) |
| 224 | + if re.findall(self._split_marker, new_chunk): |
| 225 | + # If complete split marker was found in new chunk, then process the buffer. |
| 226 | + self._log_buffer_content(include_last_part=False) |
260 | 227 |
|
261 | 228 | def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
|
262 |
| - """Merge the whole buffer and plit it into parts based on the marker. |
| 229 | + """Merge the whole buffer and split it into parts based on the marker. |
263 | 230 |
|
264 |
| - The last part could be incomplete, and so it can be left unprocessed and in the buffer. |
| 231 | + Log the messages created from the split parts and remove them from buffer. |
| 232 | + The last part could be incomplete, and so it can be left unprocessed and in the buffer until later. |
265 | 233 | """
|
266 | 234 | all_parts = re.split(self._split_marker, ''.join(self._stream_buffer))
|
267 | 235 | # First split is empty string
|
@@ -290,3 +258,98 @@ def _guess_log_level_from_message(message: str) -> int:
|
290 | 258 | return cast('int', logging.getLevelName(level))
|
291 | 259 | # Unknown log level. Fall back to the default.
|
292 | 260 | return logging.INFO
|
| 261 | + |
| 262 | + |
class StreamedLogSync(StreamedLog):
    """Sync variant of `StreamedLog` that is logging in threads.

    The instance can be called to start the streaming thread manually, or used as a context manager
    that starts the thread on entry and stops it on exit.
    """

    def __init__(self, log_client: LogClient, to_logger: logging.Logger) -> None:
        """Store the log client and prepare the thread bookkeeping.

        Args:
            log_client: Client whose `stream(raw=True)` context manager yields the log stream.
            to_logger: Logger that receives the redirected log messages.
        """
        super().__init__(to_logger=to_logger)
        self._log_client = log_client
        self._streaming_thread: Thread | None = None
        # Cooperative stop flag, set by `__exit__` and polled by the streaming thread.
        self._stop_logging = False

    def __call__(self) -> Thread:
        """Start the streaming thread. The caller has to handle any cleanup.

        Returns:
            The started thread.

        Raises:
            RuntimeError: If a streaming thread was already started by this instance.
        """
        if self._streaming_thread:
            raise RuntimeError('Streaming thread already active')
        self._stop_logging = False
        self._streaming_thread = threading.Thread(target=self._stream_log)
        self._streaming_thread.start()
        return self._streaming_thread

    def __enter__(self) -> Self:
        """Start the streaming thread within the context. Exiting the context will finish the streaming thread."""
        self()
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Stop the streaming thread.

        The stop flag is checked by the thread only after a chunk has been processed, so the `join`
        returns once the next chunk arrives or the stream ends.

        Raises:
            RuntimeError: If no streaming thread is active.
        """
        if not self._streaming_thread:
            raise RuntimeError('Streaming thread is not active')

        # Signal the thread to stop logging and wait for it to finish.
        self._stop_logging = True
        self._streaming_thread.join()
        self._streaming_thread = None
        self._stop_logging = False

    def _stream_log(self) -> None:
        """Read the log stream chunk by chunk and forward it to the logger. Runs in the streaming thread."""
        with self._log_client.stream(raw=True) as log_stream:
            if not log_stream:
                return
            for data in log_stream.iter_bytes():
                # Use the shared chunk handling so that the sync and async variants cannot diverge.
                self._process_new_data(data)
                if self._stop_logging:
                    break

        # If the stream is finished (or stopped), then the last part will be also processed.
        self._log_buffer_content(include_last_part=True)
| 315 | + |
| 316 | + |
class StreamedLogAsync(StreamedLog):
    """Async variant of `StreamedLog` that is logging in tasks.

    The instance can be called to start the streaming task manually, or used as an async context manager
    that starts the task on entry and cancels it on exit.
    """

    def __init__(self, log_client: LogClientAsync, to_logger: logging.Logger) -> None:
        """Store the log client and prepare the task bookkeeping.

        Args:
            log_client: Client whose `stream(raw=True)` async context manager yields the log stream.
            to_logger: Logger that receives the redirected log messages.
        """
        super().__init__(to_logger=to_logger)
        self._log_client = log_client
        self._streaming_task: Task | None = None

    def __call__(self) -> Task:
        """Start the streaming task. The caller has to handle any cleanup.

        Returns:
            The created task.

        Raises:
            RuntimeError: If a streaming task was already started by this instance.
        """
        if self._streaming_task:
            raise RuntimeError('Streaming task already active')
        self._streaming_task = asyncio.create_task(self._stream_log())
        return self._streaming_task

    async def __aenter__(self) -> Self:
        """Start the streaming task within the context. Exiting the context will cancel the streaming task."""
        self()
        return self

    async def __aexit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Cancel the streaming task and wait until the cancellation has been processed.

        Raises:
            RuntimeError: If no streaming task is active.
        """
        if not self._streaming_task:
            raise RuntimeError('Streaming task is not active')

        task, self._streaming_task = self._streaming_task, None
        task.cancel()
        # `cancel()` only requests the cancellation. Wait for the task to really finish so that the log
        # stream is closed and nothing more is logged after the context has been left. `asyncio.wait`
        # is used because it does not re-raise the task's own exception or its `CancelledError`.
        await asyncio.wait([task])

    async def _stream_log(self) -> None:
        """Read the log stream chunk by chunk and forward it to the logger. Runs in the streaming task."""
        async with self._log_client.stream(raw=True) as log_stream:
            if not log_stream:
                return
            async for data in log_stream.aiter_bytes():
                self._process_new_data(data)

        # If the stream is finished, then the last part will be also processed.
        self._log_buffer_content(include_last_part=True)
0 commit comments