From 7f57c327bfa9e6fbeb61bb6d5d254455bc37785a Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 5 Jun 2025 10:21:31 +0200 Subject: [PATCH 1/2] use httpx to download output ports --- packages/simcore-sdk/requirements/_base.in | 3 +- packages/simcore-sdk/requirements/_base.txt | 25 +++++++++++++++ .../node_ports_common/file_io_utils.py | 32 ++++++++++++++----- 3 files changed, 51 insertions(+), 9 deletions(-) diff --git a/packages/simcore-sdk/requirements/_base.in b/packages/simcore-sdk/requirements/_base.in index 9be327aed363..4ce6caec6571 100644 --- a/packages/simcore-sdk/requirements/_base.in +++ b/packages/simcore-sdk/requirements/_base.in @@ -13,9 +13,10 @@ aiocache aiofiles aiohttp +httpx packaging pint -sqlalchemy[asyncio] pydantic[email] +sqlalchemy[asyncio] tenacity tqdm diff --git a/packages/simcore-sdk/requirements/_base.txt b/packages/simcore-sdk/requirements/_base.txt index 97a80119b7f0..32ba8c8d7a5a 100644 --- a/packages/simcore-sdk/requirements/_base.txt +++ b/packages/simcore-sdk/requirements/_base.txt @@ -44,6 +44,7 @@ anyio==4.8.0 # via # fast-depends # faststream + # httpx arrow==1.3.0 # via # -r requirements/../../../packages/models-library/requirements/_base.in @@ -72,6 +73,8 @@ certifi==2025.1.31 # -c requirements/../../../packages/settings-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt + # httpcore + # httpx # requests charset-normalizer==3.4.1 # via requests @@ -109,10 +112,32 @@ greenlet==3.1.1 # via sqlalchemy grpcio==1.70.0 # via opentelemetry-exporter-otlp-proto-grpc +h11==0.16.0 + # via httpcore +httpcore==1.0.9 + # via httpx +httpx==0.28.1 + # via + # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt + # -c 
requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/postgres-database/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/postgres-database/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/settings-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../requirements/constraints.txt + # -r requirements/_base.in idna==3.10 # via # anyio # email-validator + # httpx # requests # yarl importlib-metadata==8.5.0 diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py 
b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py index 51aa3bae3c1c..13d1d4b7c93c 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py @@ -7,10 +7,10 @@ from typing import IO, Any, Final, Protocol, runtime_checkable import aiofiles +import httpx from aiohttp import ( ClientConnectionError, ClientError, - ClientPayloadError, ClientResponse, ClientResponseError, ClientSession, @@ -39,6 +39,7 @@ from tqdm.contrib.logging import tqdm_logging_redirect from yarl import URL +from ..config.http_clients import client_request_settings from . import exceptions from .constants import CHUNK_SIZE @@ -148,13 +149,13 @@ async def __call__(self, log: str) -> None: ... async def _file_chunk_writer( file: Path, - response: ClientResponse, + response: httpx.Response, pbar: tqdm, io_log_redirect_cb: LogRedirectCB | None, progress_bar: ProgressBarData, ): async with aiofiles.open(file, "wb") as file_pointer: - while chunk := await response.content.read(CHUNK_SIZE): + async for chunk in response.aiter_bytes(CHUNK_SIZE): await file_pointer.write(chunk) if io_log_redirect_cb and pbar.update(len(chunk)): with log_catch(_logger, reraise=False): @@ -180,21 +181,36 @@ async def download_link_to_file( io_log_redirect_cb: LogRedirectCB | None, progress_bar: ProgressBarData, ): + _ = session # TODO: remove since using httpx + _logger.debug("Downloading from %s to %s", url, file_path) + _logger.debug( + "Download timeout %s ", + client_request_settings.HTTP_CLIENT_REQUEST_TOTAL_TIMEOUT, + ) async for attempt in AsyncRetrying( reraise=True, wait=wait_exponential(min=1, max=10), stop=stop_after_attempt(num_retries), - retry=retry_if_exception_type(ClientConnectionError), + retry=retry_if_exception_type(httpx.TransportError), before_sleep=before_sleep_log(_logger, logging.WARNING, exc_info=True), after=after_log(_logger, log_level=logging.ERROR), ): with attempt: 
async with AsyncExitStack() as stack: - response = await stack.enter_async_context(session.get(url)) - if response.status == status.HTTP_404_NOT_FOUND: + client = await stack.enter_async_context( + httpx.AsyncClient( + timeout=httpx.Timeout( + client_request_settings.HTTP_CLIENT_REQUEST_TOTAL_TIMEOUT + ) + ) + ) + response = await stack.enter_async_context( + client.stream("GET", f"{url}") + ) + if response.status_code == status.HTTP_404_NOT_FOUND: raise exceptions.InvalidDownloadLinkError(url) - if response.status > _VALID_HTTP_STATUS_CODES: + if response.status_code > _VALID_HTTP_STATUS_CODES: raise exceptions.TransferError(url) file_path.parent.mkdir(parents=True, exist_ok=True) # SEE https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length @@ -231,7 +247,7 @@ async def download_link_to_file( sub_progress, ) _logger.debug("Download complete") - except ClientPayloadError as exc: + except httpx.HTTPError as exc: raise exceptions.TransferError(url) from exc From 3d75c03cfc080df2c2a19cedcecc60b77ccb99f4 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 5 Jun 2025 10:29:16 +0200 Subject: [PATCH 2/2] removed unused session parameter --- .../node_ports_common/file_io_utils.py | 7 ------- .../node_ports_common/filemanager.py | 19 ++++++++----------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py index 13d1d4b7c93c..be5cde27a24a 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py @@ -173,7 +173,6 @@ async def _file_chunk_writer( async def download_link_to_file( - session: ClientSession, url: URL, file_path: Path, *, @@ -181,13 +180,7 @@ async def download_link_to_file( io_log_redirect_cb: LogRedirectCB | None, progress_bar: ProgressBarData, ): - _ = session # TODO: remove since 
using httpx - _logger.debug("Downloading from %s to %s", url, file_path) - _logger.debug( - "Download timeout %s ", - client_request_settings.HTTP_CLIENT_REQUEST_TOTAL_TIMEOUT, - ) async for attempt in AsyncRetrying( reraise=True, wait=wait_exponential(min=1, max=10), diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py index 5fdd631474d1..0849e8a0732a 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py @@ -217,7 +217,6 @@ async def download_path_from_s3( return await download_file_from_link( download_link, local_path, - client_session=session, io_log_redirect_cb=io_log_redirect_cb, progress_bar=progress_bar, ) @@ -229,7 +228,6 @@ async def download_file_from_link( *, io_log_redirect_cb: LogRedirectCB | None, file_name: str | None = None, - client_session: ClientSession | None = None, progress_bar: ProgressBarData, ) -> Path: # a download link looks something like: @@ -242,15 +240,14 @@ async def download_file_from_link( if io_log_redirect_cb: await io_log_redirect_cb(f"downloading {local_file_path}, please wait...") - async with ClientSessionContextManager(client_session) as session: - await download_link_to_file( - session, - download_link, - local_file_path, - num_retries=NodePortsSettings.create_from_envs().NODE_PORTS_IO_NUM_RETRY_ATTEMPTS, - io_log_redirect_cb=io_log_redirect_cb, - progress_bar=progress_bar, - ) + + await download_link_to_file( + download_link, + local_file_path, + num_retries=NodePortsSettings.create_from_envs().NODE_PORTS_IO_NUM_RETRY_ATTEMPTS, + io_log_redirect_cb=io_log_redirect_cb, + progress_bar=progress_bar, + ) if io_log_redirect_cb: await io_log_redirect_cb(f"download of {local_file_path} complete.") return local_file_path