def sched_state_running(state):
    """Tell whether a scheduler job state string reports a running job.

    The state may hold several comma-separated entries (one per job
    step); the job counts as running as soon as one entry is a running
    state.

    :param state: comma-separated scheduler state string, e.g.
                  ``"RUNNING"`` or ``"COMPLETED,RUNNING"``. ``None`` and
                  the empty string are treated as "not running".
    :returns: ``True`` if at least one entry is a running state,
              ``False`` otherwise
    """
    # Full state names are matched as substrings, so an entry carrying
    # extra text around the name is still recognised.
    running_names = ('RUNNING',)
    # One-letter codes must match the whole entry: a substring test for
    # 'R' would also accept 'PREEMPTED', 'OUT_OF_MEMORY', 'CONFIGURING',
    # 'REQUEUED', ... and report failed or waiting jobs as running.
    running_codes = {
        'R',  # PBS state 'R': job Running
    }

    if not state:
        return False

    return any(
        entry.strip() in running_codes
        or any(name in entry for name in running_names)
        for entry in state.split(',')
    )
class AsyncExternalTransfer:
    """Shared waiting logic for external upload and download objects.

    The methods read ``self._client`` and ``self._transfer_info``; this
    class does not set them itself, so subclasses have to.
    """

    async def wait_for_transfer_job(self, timeout=None):
        """Delegate to the client's ``_wait_for_transfer_job``.

        :param timeout: forwarded unchanged to the client
        """
        info = self._transfer_info
        await self._client._wait_for_transfer_job(info, timeout=timeout)

    async def wait_for_streamer_job_to_listen(self):
        """Poll the transfer job until its state is reported as running.

        Sleeps between polls for the intervals produced by
        ``sleep_generator``.

        :raises MultipartUploadException: if the transfer info has no job
            ID or system name, or if the job reaches a completed state
            before it is seen running
        :raises FirecrestException: for any ``job_info`` failure other
            than a 404 "Job not found" response
        """
        transfer_job = self._transfer_info.get("transferJob", {})

        job_id = transfer_job.get("jobId")
        if job_id is None:
            raise MultipartUploadException(
                self._transfer_info,
                "Could not find transfer job ID in the transfer info"
            )

        system_name = transfer_job.get("system")
        if system_name is None:
            raise MultipartUploadException(
                self._transfer_info,
                "Could not find transfer job system name in the transfer info"
            )

        for delay in sleep_generator():
            try:
                job = await self._client.job_info(system_name, job_id)
            except FirecrestException as err:
                last_response = err.responses[-1]
                job_missing = (
                    last_response.status_code == 404 and
                    "Job not found" in last_response.json()['message']
                )
                if not job_missing:
                    raise

                # The job is not known to the API yet: wait and retry.
                self._client.log(
                    logging.DEBUG,
                    f"Job {job_id} information is not yet available, will "
                    f"sleep for {delay} seconds."
                )
                await asyncio.sleep(delay)
                continue

            state = job[0]["status"]["state"]
            if isinstance(state, list):
                # Collapse a list of states into one comma-separated string.
                state = ",".join(state)

            # "Running" is tested before "completed".
            if sched_state_running(state):
                self._client.log(
                    logging.DEBUG,
                    f"Job {job_id} is running with state: {state}."
                )
                return

            if sched_state_completed(state):
                raise MultipartUploadException(
                    self._transfer_info,
                    f"Job {job_id} completed before listening for "
                    f"connections. Current state: {state}."
                )

            self._client.log(
                logging.DEBUG,
                f"Job {job_id} state is {state}. Will sleep for {delay} seconds."
            )
            await asyncio.sleep(delay)
async def download_file_streamer(self, file_path=None):
    """Receive the file through the streamer client.

    :param file_path: local destination; falls back to the path stored
                      on this object when omitted or falsy
    :raises MultipartUploadException: if the transfer info carries no
        ``transferDirectives.coordinates`` entry
    """
    destination = file_path or self._file_path

    directives = self._transfer_info.get("transferDirectives", {})
    coordinates = directives.get("coordinates")
    if coordinates is None:
        raise MultipartUploadException(
            self._transfer_info,
            "Could not find download coordinates in the transfer info"
        )

    start_message = (
        f"Downloading file {destination} with `{coordinates}` "
        f"coordinates"
    )
    self._client.log(logging.DEBUG, start_message)

    # Build the streamer configuration from the coordinates and point it
    # at the local destination before receiving.
    config = cli.set_coordinates(coordinates)
    config.target = destination
    await cli.client_receive(config)

    done_message = (
        f"Downloaded file {destination} from {coordinates} "
        f"using Streamer client"
    )
    self._client.log(logging.DEBUG, done_message)
) - # Upload the file in parts - await ext_upload.upload_file_to_stage() + if actual_transfer_method == "s3": + # Upload the file in parts + await ext_upload.upload_file_to_stage() + + # Wait for the file to be available in the target directory + await ext_upload.wait_for_transfer_job() + elif actual_transfer_method == "streamer": + # Wait for job to start listening + await ext_upload.wait_for_streamer_job_to_listen() - # Wait for the file to be available in the target directory - await ext_upload.wait_for_transfer_job() + # Directly upload the file via the streamer + await ext_upload.upload_file_streamer() return ext_upload @@ -1460,7 +1581,8 @@ async def download( source_path: str, target_path: str | pathlib.Path, account: Optional[str] = None, - blocking: bool = True + blocking: bool = True, + transfer_method: str = "s3" ) -> Optional[AsyncExternalDownload]: """Download a file from the remote system. @@ -1472,8 +1594,16 @@ async def download( relevant when the file is larger than `MAX_DIRECT_UPLOAD_SIZE`) :param blocking: whether to wait for the job to complete + :param transfer_method: the method to be used for the download of large + files. Supported methods: "s3", "streamer". :calls: POST `/filesystem/{system_name}/transfer/download` """ + if transfer_method not in ["s3", "streamer"]: + raise ValueError( + f"Unsupported transfer_method '{transfer_method}'. Only 's3' " + f"and 'streamer' are currently supported." + ) + if not isinstance(target_path, (str, pathlib.Path)): raise TypeError( f"`target_path` must be a string or pathlib.Path, got " @@ -1546,14 +1676,23 @@ async def download( transfer_info=transfer_info, file_path=target_path ) + + actual_transfer_method = transfer_info.get( + "transferDirectives", {} + ).get("transfer_method", "s3") + if blocking: self.log( logging.DEBUG, f"Blocking until ({source_path}) is transfered to the " f"filesystem." 
class ExternalTransfer:
    """Shared waiting logic for external upload and download objects.

    The methods read ``self._client`` and ``self._transfer_info``; this
    class does not set them itself, so subclasses have to.
    """

    def wait_for_transfer_job(self, timeout=None):
        """Delegate to the client's ``_wait_for_transfer_job``.

        :param timeout: forwarded unchanged to the client
        """
        info = self._transfer_info
        self._client._wait_for_transfer_job(info, timeout=timeout)

    def wait_for_streamer_job_to_listen(self):
        """Poll the transfer job until its state is reported as running.

        Sleeps between polls for the intervals produced by
        ``sleep_generator``.

        :raises MultipartUploadException: if the transfer info has no job
            ID or system name, or if the job reaches a completed state
            before it is seen running
        :raises FirecrestException: for any ``job_info`` failure other
            than a 404 "Job not found" response
        """
        transfer_job = self._transfer_info.get("transferJob", {})

        job_id = transfer_job.get("jobId")
        if job_id is None:
            raise MultipartUploadException(
                self._transfer_info,
                "Could not find transfer job ID in the transfer info"
            )

        system_name = transfer_job.get("system")
        if system_name is None:
            raise MultipartUploadException(
                self._transfer_info,
                "Could not find transfer job system name in the transfer info"
            )

        for delay in sleep_generator():
            try:
                job = self._client.job_info(system_name, job_id)
            except FirecrestException as err:
                last_response = err.responses[-1]
                job_missing = (
                    last_response.status_code == 404 and
                    "Job not found" in last_response.json()['message']
                )
                if not job_missing:
                    raise

                # The job is not known to the API yet: wait and retry.
                self._client.log(
                    logging.DEBUG,
                    f"Job {job_id} information is not yet available, will "
                    f"sleep for {delay} seconds."
                )
                time.sleep(delay)
                continue

            state = job[0]["status"]["state"]
            if isinstance(state, list):
                # Collapse a list of states into one comma-separated string.
                state = ",".join(state)

            # "Running" is tested before "completed".
            if sched_state_running(state):
                self._client.log(
                    logging.DEBUG,
                    f"Job {job_id} is running with state: {state}."
                )
                return

            if sched_state_completed(state):
                raise MultipartUploadException(
                    self._transfer_info,
                    f"Job {job_id} completed before listening for "
                    f"connections. Current state: {state}."
                )

            self._client.log(
                logging.DEBUG,
                f"Job {job_id} state is {state}. Will sleep for {delay} seconds."
            )
            time.sleep(delay)
def download_file_streamer(self, file_path=None):
    """Receive the file through the streamer client.

    :param file_path: local destination; falls back to the path stored
                      on this object when omitted or falsy
    :raises MultipartUploadException: if the transfer info carries no
        ``transferDirectives.coordinates`` entry
    """
    destination = file_path or self._file_path

    directives = self._transfer_info.get("transferDirectives", {})
    coordinates = directives.get("coordinates")
    if coordinates is None:
        raise MultipartUploadException(
            self._transfer_info,
            "Could not find download coordinates in the transfer info"
        )

    start_message = (
        f"Downloading file {destination} with `{coordinates}` "
        f"coordinates"
    )
    self._client.log(logging.DEBUG, start_message)

    # Build the streamer configuration from the coordinates and point it
    # at the local destination before receiving.
    config = cli.set_coordinates(coordinates)
    config.target = destination

    # NOTE(review): asyncio.run() raises RuntimeError when an event loop
    # is already running in the calling thread (e.g. inside a notebook)
    # -- confirm this method is never reached from such a context.
    receive_coroutine = cli.client_receive(config)
    asyncio.run(receive_coroutine)

    done_message = (
        f"Downloaded file {destination} from {coordinates} "
        f"using Streamer client"
    )
    self._client.log(logging.DEBUG, done_message)
) # TODO: check local_file is readable if file-like object, otherwise that is exists @@ -1443,17 +1560,28 @@ def upload( file_size=local_file_size, ) + actual_transfer_method = transfer_info.get( + "transferDirectives", {} + ).get("transfer_method", "s3") + if blocking: self.log( logging.DEBUG, f"Blocking until ({local_file}) is transfered to the " f"filesystem." ) - # Upload the file in parts - ext_upload.upload_file_to_stage() + if actual_transfer_method == "s3": + # Upload the file in parts + ext_upload.upload_file_to_stage() - # Wait for the file to be available in the target directory - ext_upload.wait_for_transfer_job() + # Wait for the file to be available in the target directory + ext_upload.wait_for_transfer_job() + elif actual_transfer_method == "streamer": + # Wait for job to start listening + ext_upload.wait_for_streamer_job_to_listen() + + # Directly upload the file via the streamer + ext_upload.upload_file_streamer() return ext_upload @@ -1461,9 +1589,10 @@ def download( self, system_name: str, source_path: str, - target_path: str | pathlib.Path | BinaryIO, + target_path: str | pathlib.Path, account: Optional[str] = None, - blocking: bool = True + blocking: bool = True, + transfer_method: str = "s3" ) -> Optional[ExternalDownload]: """Download a file from the remote system. @@ -1475,8 +1604,23 @@ def download( relevant when the file is larger than `MAX_DIRECT_UPLOAD_SIZE`) :param blocking: whether to wait for the job to complete + :param transfer_method: the method to be used for the download of large + files. Supported methods: "s3", "streamer". :calls: POST `/filesystem/{system_name}/transfer/download` """ + if transfer_method not in ["s3", "streamer"]: + raise ValueError( + f"Unsupported transfer_method '{transfer_method}'. Only 's3' " + f"and 'streamer' are currently supported." + ) + + if not isinstance(target_path, (str, pathlib.Path)): + raise TypeError( + f"`target_path` must be a string or pathlib.Path, got " + f"{type(target_path)}. 
For more options, consider using the " + "serial Client." + ) + # Check if the file is small enough to be downloaded directly try: file_info = self.stat(system_name, source_path) @@ -1545,14 +1689,23 @@ def download( transfer_info=transfer_info, file_path=target_path ) + + actual_transfer_method = transfer_info.get( + "transferDirectives", {} + ).get("transfer_method", "s3") + if blocking: self.log( logging.DEBUG, f"Blocking until ({source_path}) is transfered to the " f"filesystem." ) - download_obj.wait_for_transfer_job() - download_obj.download_file_from_stage() + if actual_transfer_method == "s3": + download_obj.wait_for_transfer_job() + download_obj.download_file_from_stage() + elif actual_transfer_method == "streamer": + download_obj.wait_for_streamer_job_to_listen() + download_obj.download_file_streamer() return download_obj diff --git a/pyproject.toml b/pyproject.toml index fc3a2e6..ef05901 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,13 +17,15 @@ maintainers = [ readme = "README.md" license = {file = "LICENSE"} classifiers = [ - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "License :: OSI Approved :: BSD License", "Operating System :: OS Independent", ] -requires-python = ">=3.7" +requires-python = ">=3.8" dependencies = [ "aiofiles>=23.2.1", "requests>=2.14.0", @@ -31,7 +33,8 @@ dependencies = [ "typer[all]>=0.7.0,<1.0.0", "packaging>=21.0", "httpx>=0.24.0", - "PyYAML>=5.1" + "PyYAML>=5.1", + "firecrest-streamer>=0.00.21" ] [project.urls]