Skip to content

Commit aa0e448

Browse files
authored
Add support streamer data transfers (#197)
* Support streamer external uploads * Add support for streamer upload * Add support for streamer download * Add sched_state_running function * Revert changes * Drop support for python 3.7 * Fix type error * Fixes in sync client * Set version of firecrest-streamer * Fix externalupload init * Add missing type * Fix PR comments
1 parent c03c2c4 commit aa0e448

File tree

6 files changed

+353
-44
lines changed

6 files changed

+353
-44
lines changed

.github/workflows/testing.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
runs-on: ubuntu-22.04
1212
strategy:
1313
matrix:
14-
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
14+
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
1515

1616
steps:
1717
- uses: actions/checkout@v2

.mypy.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
[mypy-unasync]
22
ignore_missing_imports = True
3+
4+
[mypy-streamer]
5+
ignore_missing_imports = True

firecrest/utilities.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ def sched_state_completed(state):
4141
return False
4242

4343

44+
def sched_state_running(state):
45+
running_states = {
46+
'RUNNING',
47+
'R', # PBS state 'R': job Running
48+
}
49+
return any(
50+
any(rs in s for rs in running_states)
51+
for s in state.split(',')
52+
)
53+
54+
4455
def parse_retry_after(retry_after_header, log_func):
4556
"""
4657
Parse the Retry-After header.

firecrest/v2/_async/Client.py

Lines changed: 161 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
import ssl
1717

1818
from packaging.version import Version, parse
19+
from streamer import streamer_client as cli
1920
from typing import Any, Optional, List
2021

2122
from firecrest.utilities import (
2223
parse_retry_after,
2324
part_checksum_xml,
2425
sched_state_completed,
26+
sched_state_running,
2527
time_block,
2628
)
2729
from firecrest.FirecrestException import (
@@ -61,7 +63,72 @@ def sleep_generator():
6163
value *= 2
6264

6365

64-
class AsyncExternalUpload:
66+
class AsyncExternalTransfer:
67+
async def wait_for_transfer_job(self, timeout=None):
68+
await self._client._wait_for_transfer_job(
69+
self._transfer_info,
70+
timeout=timeout
71+
)
72+
73+
async def wait_for_streamer_job_to_listen(self):
74+
job_id = self._transfer_info.get("transferJob", {}).get("jobId")
75+
if job_id is None:
76+
raise MultipartUploadException(
77+
self._transfer_info,
78+
"Could not find transfer job ID in the transfer info"
79+
)
80+
81+
system_name = self._transfer_info.get("transferJob", {}).get("system")
82+
if system_name is None:
83+
raise MultipartUploadException(
84+
self._transfer_info,
85+
"Could not find transfer job system name in the transfer info"
86+
)
87+
88+
for i in sleep_generator():
89+
try:
90+
job = await self._client.job_info(system_name, job_id)
91+
except FirecrestException as e:
92+
if (
93+
e.responses[-1].status_code == 404 and
94+
"Job not found" in e.responses[-1].json()['message']
95+
):
96+
self._client.log(
97+
logging.DEBUG,
98+
f"Job {job_id} information is not yet available, will "
99+
f"sleep for {i} seconds."
100+
)
101+
await asyncio.sleep(i)
102+
continue
103+
else:
104+
raise e
105+
106+
state = job[0]["status"]["state"]
107+
if isinstance(state, list):
108+
state = ",".join(state)
109+
110+
if sched_state_running(state):
111+
self._client.log(
112+
logging.DEBUG,
113+
f"Job {job_id} is running with state: {state}."
114+
)
115+
break
116+
117+
if sched_state_completed(state):
118+
raise MultipartUploadException(
119+
self._transfer_info,
120+
f"Job {job_id} completed before listening for "
121+
f"connections. Current state: {state}."
122+
)
123+
124+
self._client.log(
125+
logging.DEBUG,
126+
f"Job {job_id} state is {state}. Will sleep for {i} seconds."
127+
)
128+
await asyncio.sleep(i)
129+
130+
131+
class AsyncExternalUpload(AsyncExternalTransfer):
65132
def __init__(self, client, transfer_info, local_file):
66133
self._client = client
67134
self._local_file = local_file
@@ -104,12 +171,6 @@ async def upload_file_to_stage(self):
104171
checksum
105172
)
106173

107-
async def wait_for_transfer_job(self, timeout=None):
108-
await self._client._wait_for_transfer_job(
109-
self._transfer_info,
110-
timeout=timeout
111-
)
112-
113174
async def _upload_part(self, url, index):
114175
chunk_size = self._transfer_info.get("maxPartSize")
115176
if chunk_size is None:
@@ -200,8 +261,35 @@ async def _complete_upload(self, checksum):
200261
f"Failed to finish upload: {resp.status_code}: {resp.text}"
201262
)
202263

264+
async def upload_file_streamer(self):
265+
coordinates = self._transfer_info.get(
266+
"transferDirectives", {}
267+
).get("coordinates")
268+
269+
if coordinates is None:
270+
raise MultipartUploadException(
271+
self._transfer_info,
272+
"Could not find upload coordinates in the transfer info"
273+
)
274+
275+
self._client.log(
276+
logging.DEBUG,
277+
f"Uploading file {self._local_file} with `{coordinates}` "
278+
f"coordinates"
279+
)
280+
281+
config = cli.set_coordinates(coordinates)
282+
config.target = self._local_file
283+
await cli.client_send(config)
284+
285+
self._client.log(
286+
logging.DEBUG,
287+
f"Uploaded file {self._local_file} to {coordinates} "
288+
f"using Streamer client"
289+
)
290+
203291

204-
class AsyncExternalDownload:
292+
class AsyncExternalDownload(AsyncExternalTransfer):
205293
def __init__(self, client, transfer_info, file_path):
206294
self._client = client
207295
self._transfer_info = transfer_info
@@ -250,10 +338,32 @@ async def download_file_from_stage(self, file_path=None):
250338
f"Downloaded file from {download_url} to {file_name}"
251339
)
252340

253-
async def wait_for_transfer_job(self, timeout=None):
254-
await self._client._wait_for_transfer_job(
255-
self._transfer_info,
256-
timeout=timeout
341+
async def download_file_streamer(self, file_path=None):
342+
file_name = file_path or self._file_path
343+
coordinates = self._transfer_info.get(
344+
"transferDirectives", {}
345+
).get("coordinates")
346+
347+
if coordinates is None:
348+
raise MultipartUploadException(
349+
self._transfer_info,
350+
"Could not find download coordinates in the transfer info"
351+
)
352+
353+
self._client.log(
354+
logging.DEBUG,
355+
f"Downloading file {file_name} with `{coordinates}` "
356+
f"coordinates"
357+
)
358+
359+
config = cli.set_coordinates(coordinates)
360+
config.target = file_name
361+
await cli.client_receive(config)
362+
363+
self._client.log(
364+
logging.DEBUG,
365+
f"Downloaded file {file_name} from {coordinates} "
366+
f"using Streamer client"
257367
)
258368

259369

@@ -1366,13 +1476,13 @@ async def upload(
13661476
relevant when the file is larger than
13671477
`MAX_DIRECT_UPLOAD_SIZE`)
13681478
:param transfer_method: the method to be used for the upload of large
1369-
files. Currently only "s3" is supported.
1479+
files. Supported methods: "s3", "streamer".
13701480
:calls: POST `/filesystem/{system_name}/transfer/upload`
13711481
"""
1372-
if transfer_method != "s3":
1482+
if transfer_method not in ["s3", "streamer"]:
13731483
raise ValueError(
13741484
f"Unsupported transfer_method '{transfer_method}'. Only 's3' "
1375-
f"is currently supported."
1485+
f"and 'streamer' are currently supported."
13761486
)
13771487

13781488
if not isinstance(local_file, (str, pathlib.Path)):
@@ -1440,17 +1550,28 @@ async def upload(
14401550
local_file=local_file,
14411551
)
14421552

1553+
actual_transfer_method = transfer_info.get(
1554+
"transferDirectives", {}
1555+
).get("transfer_method", "s3")
1556+
14431557
if blocking:
14441558
self.log(
14451559
logging.DEBUG,
14461560
f"Blocking until ({local_file}) is transfered to the "
14471561
f"filesystem."
14481562
)
1449-
# Upload the file in parts
1450-
await ext_upload.upload_file_to_stage()
1563+
if actual_transfer_method == "s3":
1564+
# Upload the file in parts
1565+
await ext_upload.upload_file_to_stage()
1566+
1567+
# Wait for the file to be available in the target directory
1568+
await ext_upload.wait_for_transfer_job()
1569+
elif actual_transfer_method == "streamer":
1570+
# Wait for job to start listening
1571+
await ext_upload.wait_for_streamer_job_to_listen()
14511572

1452-
# Wait for the file to be available in the target directory
1453-
await ext_upload.wait_for_transfer_job()
1573+
# Directly upload the file via the streamer
1574+
await ext_upload.upload_file_streamer()
14541575

14551576
return ext_upload
14561577

@@ -1460,7 +1581,8 @@ async def download(
14601581
source_path: str,
14611582
target_path: str | pathlib.Path,
14621583
account: Optional[str] = None,
1463-
blocking: bool = True
1584+
blocking: bool = True,
1585+
transfer_method: str = "s3"
14641586
) -> Optional[AsyncExternalDownload]:
14651587
"""Download a file from the remote system.
14661588
@@ -1472,8 +1594,16 @@ async def download(
14721594
relevant when the file is larger than
14731595
`MAX_DIRECT_UPLOAD_SIZE`)
14741596
:param blocking: whether to wait for the job to complete
1597+
:param transfer_method: the method to be used for the download of large
1598+
files. Supported methods: "s3", "streamer".
14751599
:calls: POST `/filesystem/{system_name}/transfer/download`
14761600
"""
1601+
if transfer_method not in ["s3", "streamer"]:
1602+
raise ValueError(
1603+
f"Unsupported transfer_method '{transfer_method}'. Only 's3' "
1604+
f"and 'streamer' are currently supported."
1605+
)
1606+
14771607
if not isinstance(target_path, (str, pathlib.Path)):
14781608
raise TypeError(
14791609
f"`target_path` must be a string or pathlib.Path, got "
@@ -1546,14 +1676,23 @@ async def download(
15461676
transfer_info=transfer_info,
15471677
file_path=target_path
15481678
)
1679+
1680+
actual_transfer_method = transfer_info.get(
1681+
"transferDirectives", {}
1682+
).get("transfer_method", "s3")
1683+
15491684
if blocking:
15501685
self.log(
15511686
logging.DEBUG,
15521687
f"Blocking until ({source_path}) is transfered to the "
15531688
f"filesystem."
15541689
)
1555-
await download_obj.wait_for_transfer_job()
1556-
await download_obj.download_file_from_stage()
1690+
if actual_transfer_method == "s3":
1691+
await download_obj.wait_for_transfer_job()
1692+
await download_obj.download_file_from_stage()
1693+
elif actual_transfer_method == "streamer":
1694+
await download_obj.wait_for_streamer_job_to_listen()
1695+
await download_obj.download_file_streamer()
15571696

15581697
return download_obj
15591698

0 commit comments

Comments
 (0)