Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v2
Expand Down
3 changes: 3 additions & 0 deletions .mypy.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
[mypy-unasync]
ignore_missing_imports = True

[mypy-streamer]
ignore_missing_imports = True
11 changes: 11 additions & 0 deletions firecrest/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ def sched_state_completed(state):
return False


def sched_state_running(state):
    """Return ``True`` if the scheduler state describes a running job.

    Handles both Slurm-style states (``"RUNNING"``, possibly inside a
    compound, comma-separated state string) and the PBS single-letter
    state ``"R"``.

    :param state: scheduler state string; may contain several
        comma-separated states (e.g. ``"COMPLETING,RUNNING"``)
    """
    for token in state.split(','):
        token = token.strip()
        # PBS state 'R': job Running. Require an exact match — a plain
        # substring test would also fire on the letter 'R' inside
        # non-running states such as 'PREEMPTED' or 'CONFIGURING'.
        if token == 'R':
            return True
        # Slurm-style state; substring match tolerates decorations
        # attached to the base state name.
        if 'RUNNING' in token:
            return True
    return False


def parse_retry_after(retry_after_header, log_func):
"""
Parse the Retry-After header.
Expand Down
183 changes: 161 additions & 22 deletions firecrest/v2/_async/Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import ssl

from packaging.version import Version, parse
from streamer import streamer_client as cli
from typing import Any, Optional, List

from firecrest.utilities import (
parse_retry_after,
part_checksum_xml,
sched_state_completed,
sched_state_running,
time_block,
)
from firecrest.FirecrestException import (
Expand Down Expand Up @@ -61,7 +63,72 @@ def sleep_generator():
value *= 2


class AsyncExternalUpload:
class AsyncExternalTransfer:
    """Shared base for external upload/download transfer objects.

    Subclasses are expected to set ``self._client`` (the async FirecREST
    client) and ``self._transfer_info`` (the dict returned by the
    transfer endpoint) before these methods are called.
    """

    async def wait_for_transfer_job(self, timeout=None):
        """Block until the transfer job has moved the file, by delegating
        to the client's internal ``_wait_for_transfer_job``.

        :param timeout: optional timeout, forwarded to the client helper
        """
        await self._client._wait_for_transfer_job(
            self._transfer_info,
            timeout=timeout
        )

    async def wait_for_streamer_job_to_listen(self):
        """Poll the scheduler until the streamer transfer job is running
        (i.e. presumably listening for connections — TODO confirm that
        RUNNING implies the streamer socket is already open).

        :raises MultipartUploadException: if the transfer info lacks the
            job ID or system name, or if the job completes before it was
            ever observed running
        """
        job_id = self._transfer_info.get("transferJob", {}).get("jobId")
        if job_id is None:
            raise MultipartUploadException(
                self._transfer_info,
                "Could not find transfer job ID in the transfer info"
            )

        system_name = self._transfer_info.get("transferJob", {}).get("system")
        if system_name is None:
            raise MultipartUploadException(
                self._transfer_info,
                "Could not find transfer job system name in the transfer info"
            )

        # `i` is the next sleep interval; sleep_generator() yields an
        # increasing back-off sequence.
        for i in sleep_generator():
            try:
                job = await self._client.job_info(system_name, job_id)
            except FirecrestException as e:
                # Right after submission the scheduler may not know the
                # job yet; a 404 "Job not found" is treated as transient
                # and retried. Anything else propagates.
                if (
                    e.responses[-1].status_code == 404 and
                    "Job not found" in e.responses[-1].json()['message']
                ):
                    self._client.log(
                        logging.DEBUG,
                        f"Job {job_id} information is not yet available, will "
                        f"sleep for {i} seconds."
                    )
                    await asyncio.sleep(i)
                    continue
                else:
                    raise e

            state = job[0]["status"]["state"]
            # Some backends report the state as a list; normalize to the
            # comma-separated string form the sched_state_* helpers expect.
            if isinstance(state, list):
                state = ",".join(state)

            if sched_state_running(state):
                self._client.log(
                    logging.DEBUG,
                    f"Job {job_id} is running with state: {state}."
                )
                break

            # A job that finished before we ever saw it running can no
            # longer accept a streamer connection — fail loudly.
            if sched_state_completed(state):
                raise MultipartUploadException(
                    self._transfer_info,
                    f"Job {job_id} completed before listening for "
                    f"connections. Current state: {state}."
                )

            self._client.log(
                logging.DEBUG,
                f"Job {job_id} state is {state}. Will sleep for {i} seconds."
            )
            await asyncio.sleep(i)


class AsyncExternalUpload(AsyncExternalTransfer):
def __init__(self, client, transfer_info, local_file):
self._client = client
self._local_file = local_file
Expand Down Expand Up @@ -104,12 +171,6 @@ async def upload_file_to_stage(self):
checksum
)

async def wait_for_transfer_job(self, timeout=None):
await self._client._wait_for_transfer_job(
self._transfer_info,
timeout=timeout
)

async def _upload_part(self, url, index):
chunk_size = self._transfer_info.get("maxPartSize")
if chunk_size is None:
Expand Down Expand Up @@ -200,8 +261,35 @@ async def _complete_upload(self, checksum):
f"Failed to finish upload: {resp.status_code}: {resp.text}"
)

async def upload_file_streamer(self):
    """Send the local file to the remote system through the Streamer
    client, using the coordinates supplied in the transfer info.

    :raises MultipartUploadException: when the transfer info carries no
        streamer coordinates
    """
    directives = self._transfer_info.get("transferDirectives", {})
    coordinates = directives.get("coordinates")
    if coordinates is None:
        raise MultipartUploadException(
            self._transfer_info,
            "Could not find upload coordinates in the transfer info"
        )

    self._client.log(
        logging.DEBUG,
        f"Uploading file {self._local_file} with `{coordinates}` "
        f"coordinates"
    )

    # Configure the streamer endpoint, point it at the local file and
    # push the data.
    stream_config = cli.set_coordinates(coordinates)
    stream_config.target = self._local_file
    await cli.client_send(stream_config)

    self._client.log(
        logging.DEBUG,
        f"Uploaded file {self._local_file} to {coordinates} "
        f"using Streamer client"
    )


class AsyncExternalDownload:
class AsyncExternalDownload(AsyncExternalTransfer):
def __init__(self, client, transfer_info, file_path):
self._client = client
self._transfer_info = transfer_info
Expand Down Expand Up @@ -250,10 +338,32 @@ async def download_file_from_stage(self, file_path=None):
f"Downloaded file from {download_url} to {file_name}"
)

async def wait_for_transfer_job(self, timeout=None):
await self._client._wait_for_transfer_job(
self._transfer_info,
timeout=timeout
async def download_file_streamer(self, file_path=None):
    """Receive the remote file directly through the Streamer client,
    using the coordinates supplied in the transfer info.

    :param file_path: destination path; defaults to the path given at
        construction time (``self._file_path``)
    :raises MultipartUploadException: when the transfer info carries no
        streamer coordinates
    """
    file_name = file_path or self._file_path
    coordinates = self._transfer_info.get(
        "transferDirectives", {}
    ).get("coordinates")

    if coordinates is None:
        # Message fixed from a copy-paste of the upload path: this is
        # the download direction.
        raise MultipartUploadException(
            self._transfer_info,
            "Could not find download coordinates in the transfer info"
        )

    self._client.log(
        logging.DEBUG,
        f"Downloading file {file_name} with `{coordinates}` "
        f"coordinates"
    )

    # Configure the streamer endpoint, point it at the destination file
    # and pull the data.
    config = cli.set_coordinates(coordinates)
    config.target = file_name
    await cli.client_receive(config)

    self._client.log(
        logging.DEBUG,
        f"Downloaded file {file_name} from {coordinates} "
        f"using Streamer client"
    )


Expand Down Expand Up @@ -1366,13 +1476,13 @@ async def upload(
relevant when the file is larger than
`MAX_DIRECT_UPLOAD_SIZE`)
:param transfer_method: the method to be used for the upload of large
files. Currently only "s3" is supported.
files. Supported methods: "s3", "streamer".
:calls: POST `/filesystem/{system_name}/transfer/upload`
"""
if transfer_method != "s3":
if transfer_method not in ["s3", "streamer"]:
raise ValueError(
f"Unsupported transfer_method '{transfer_method}'. Only 's3' "
f"is currently supported."
f"and 'streamer' are currently supported."
)

if not isinstance(local_file, (str, pathlib.Path)):
Expand Down Expand Up @@ -1440,17 +1550,28 @@ async def upload(
local_file=local_file,
)

actual_transfer_method = transfer_info.get(
"transferDirectives", {}
).get("transfer_method", "s3")

if blocking:
self.log(
logging.DEBUG,
f"Blocking until ({local_file}) is transfered to the "
f"filesystem."
)
# Upload the file in parts
await ext_upload.upload_file_to_stage()
if actual_transfer_method == "s3":
# Upload the file in parts
await ext_upload.upload_file_to_stage()

# Wait for the file to be available in the target directory
await ext_upload.wait_for_transfer_job()
elif actual_transfer_method == "streamer":
# Wait for job to start listening
await ext_upload.wait_for_streamer_job_to_listen()

# Wait for the file to be available in the target directory
await ext_upload.wait_for_transfer_job()
# Directly upload the file via the streamer
await ext_upload.upload_file_streamer()

return ext_upload

Expand All @@ -1460,7 +1581,8 @@ async def download(
source_path: str,
target_path: str | pathlib.Path,
account: Optional[str] = None,
blocking: bool = True
blocking: bool = True,
transfer_method: str = "s3"
) -> Optional[AsyncExternalDownload]:
"""Download a file from the remote system.

Expand All @@ -1472,8 +1594,16 @@ async def download(
relevant when the file is larger than
`MAX_DIRECT_UPLOAD_SIZE`)
:param blocking: whether to wait for the job to complete
:param transfer_method: the method to be used for the download of large
files. Supported methods: "s3", "streamer".
:calls: POST `/filesystem/{system_name}/transfer/download`
"""
if transfer_method not in ["s3", "streamer"]:
raise ValueError(
f"Unsupported transfer_method '{transfer_method}'. Only 's3' "
f"and 'streamer' are currently supported."
)

if not isinstance(target_path, (str, pathlib.Path)):
raise TypeError(
f"`target_path` must be a string or pathlib.Path, got "
Expand Down Expand Up @@ -1546,14 +1676,23 @@ async def download(
transfer_info=transfer_info,
file_path=target_path
)

actual_transfer_method = transfer_info.get(
"transferDirectives", {}
).get("transfer_method", "s3")

if blocking:
self.log(
logging.DEBUG,
f"Blocking until ({source_path}) is transfered to the "
f"filesystem."
)
await download_obj.wait_for_transfer_job()
await download_obj.download_file_from_stage()
if actual_transfer_method == "s3":
await download_obj.wait_for_transfer_job()
await download_obj.download_file_from_stage()
elif actual_transfer_method == "streamer":
await download_obj.wait_for_streamer_job_to_listen()
await download_obj.download_file_streamer()

return download_obj

Expand Down
Loading