Skip to content

Commit 0736902

Browse files
committed
Added check data location exists
1 parent 69b4711 commit 0736902

File tree

3 files changed

+83
-11
lines changed

3 files changed

+83
-11
lines changed

streamflow/cwl/step.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -715,9 +715,9 @@ async def _update_file_token(
715715
path=location, dst_deployment=dst_connector.deployment_name
716716
)
717717
):
718+
filepath = dst_path or (dst_dir / selected_location.relpath)
718719
try:
719720
# Build unique destination path
720-
filepath = dst_path or (dst_dir / selected_location.relpath)
721721
if not self.prefix_path:
722722
filepath = cast(CWLWorkflow, self.workflow).get_unique_output_path(
723723
path=filepath, src_location=selected_location
@@ -732,13 +732,18 @@ async def _update_file_token(
732732
)
733733
except FileExistsError:
734734
# Error thrown in the `get_unique_output_path` method
735-
filepath = dst_path or (dst_dir / selected_location.relpath)
736-
for loc in self.workflow.context.data_manager.get_data_locations(
737-
filepath,
738-
selected_location.deployment,
739-
selected_location.location.name,
735+
while (
736+
len(
737+
self.workflow.context.data_manager.get_data_locations(
738+
str(filepath),
739+
selected_location.deployment,
740+
selected_location.location.name,
741+
)
742+
)
743+
== 0
740744
):
741-
await loc.available.wait()
745+
# The concurrent task did not created the `DataLocation` yet
746+
await asyncio.sleep(2)
742747
# Transform token value
743748
new_token_value = {
744749
"class": token_class,

streamflow/data/manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ async def transfer_data(
441441
data_location.path,
442442
context=self.context,
443443
location=data_location.location,
444+
is_available=True,
444445
)
445446
data_location.data_type = (
446447
DataType.SYMBOLIC_LINK

streamflow/data/remotepath.py

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,19 @@ def _file_checksum_local(path: str) -> str:
4747
return sha1_checksum.hexdigest()
4848

4949

50+
async def _get_data_location(
51+
context: StreamFlowContext, location: ExecutionLocation, path: str
52+
) -> MutableSequence[DataLocation]:
53+
"""Get data locations and wait on the data availability needed"""
54+
if locations := context.data_manager.get_data_locations(
55+
path=path, deployment=location.deployment, location_name=location.name
56+
):
57+
await asyncio.gather(
58+
*(asyncio.create_task(loc.available.wait()) for loc in locations)
59+
)
60+
return locations
61+
62+
5063
def _get_filename_from_response(response: ClientResponse, url: str):
5164
if cd_header := response.headers.get("Content-Disposition"):
5265
message = Message()
@@ -146,7 +159,12 @@ def get_inner_path(
146159

147160
class StreamFlowPath(PurePath, ABC):
148161
def __new__(
149-
cls, *args, context: StreamFlowContext, location: ExecutionLocation, **kwargs
162+
cls,
163+
*args,
164+
context: StreamFlowContext,
165+
location: ExecutionLocation,
166+
is_available: bool = False,
167+
**kwargs,
150168
):
151169
if cls is StreamFlowPath:
152170
cls = LocalStreamFlowPath if location.local else RemoteStreamFlowPath
@@ -304,15 +322,24 @@ def __init__(
304322
self,
305323
*args,
306324
context: StreamFlowContext,
325+
is_available: bool = False,
307326
location: ExecutionLocation | None = None,
308327
):
309328
if sys.version_info < (3, 12):
310329
super().__init__()
311330
else:
312331
super().__init__(*args)
313332
self.context: StreamFlowContext = context
333+
self.is_available: bool = is_available
314334
self.location: ExecutionLocation = location
315335

336+
async def _check_availability(self) -> None:
337+
if not self.is_available:
338+
await _get_data_location(
339+
context=self.context, location=self.location, path=self.__str__()
340+
)
341+
self.is_available = True
342+
316343
async def checksum(self) -> str | None:
317344
if await self.is_file():
318345
loop = asyncio.get_running_loop()
@@ -323,6 +350,7 @@ async def checksum(self) -> str | None:
323350
return None
324351

325352
async def exists(self) -> bool:
353+
await self._check_availability()
326354
return cast(Path, super()).exists()
327355

328356
async def glob(
@@ -332,12 +360,15 @@ async def glob(
332360
yield self.with_segments(path)
333361

334362
async def is_dir(self) -> bool:
363+
await self._check_availability()
335364
return cast(Path, super()).is_dir()
336365

337366
async def is_file(self) -> bool:
367+
await self._check_availability()
338368
return cast(Path, super()).is_file()
339369

340370
async def is_symlink(self) -> bool:
371+
await self._check_availability()
341372
return cast(Path, super()).is_symlink()
342373

343374
async def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None:
@@ -375,6 +406,7 @@ def __getitem__(self, idx):
375406
async def read_text(self, n=-1, encoding=None, errors=None) -> str:
376407
if sys.version_info >= (3, 10):
377408
encoding = io.text_encoding(encoding)
409+
await self._check_availability()
378410
with self.open(mode="r", encoding=encoding, errors=errors) as f:
379411
return f.read(n)
380412

@@ -433,7 +465,12 @@ async def walk(
433465
yield dirpath, dirnames, filenames
434466

435467
def with_segments(self, *pathsegments):
436-
return type(self)(*pathsegments, context=self.context, location=self.location)
468+
return type(self)(
469+
*pathsegments,
470+
context=self.context,
471+
is_available=self.is_available,
472+
location=self.location,
473+
)
437474

438475
async def write_text(self, data: str, **kwargs) -> int:
439476
return cast(Path, super()).write_text(data=data, **kwargs)
@@ -445,7 +482,13 @@ class RemoteStreamFlowPath(
445482
):
446483
__slots__ = ("context", "connector", "location")
447484

448-
def __init__(self, *args, context: StreamFlowContext, location: ExecutionLocation):
485+
def __init__(
486+
self,
487+
*args,
488+
context: StreamFlowContext,
489+
location: ExecutionLocation,
490+
is_available: bool = False,
491+
):
449492
if sys.version_info < (3, 12):
450493
super().__init__()
451494
else:
@@ -454,9 +497,17 @@ def __init__(self, *args, context: StreamFlowContext, location: ExecutionLocatio
454497
self.connector: Connector = self.context.deployment_manager.get_connector(
455498
location.deployment
456499
)
500+
self.is_available: bool = is_available
457501
self.location: ExecutionLocation = location
458502
self._inner_path: StreamFlowPath | None = None
459503

504+
async def _check_availability(self) -> None:
505+
if not self.is_available:
506+
await _get_data_location(
507+
context=self.context, location=self.location, path=self.__str__()
508+
)
509+
self.is_available = True
510+
460511
def _is_valid_inner_path(self, location: DataLocation) -> bool:
461512
return (
462513
# The data is valid in the location
@@ -539,6 +590,7 @@ async def checksum(self) -> str | None:
539590
if (inner_path := await self._get_inner_path()) != self:
540591
return await inner_path.checksum()
541592
else:
593+
await self._check_availability()
542594
command = [
543595
"test",
544596
"-f",
@@ -565,6 +617,7 @@ async def exists(self) -> bool:
565617
if (inner_path := await self._get_inner_path()) != self:
566618
return await inner_path.exists()
567619
else:
620+
await self._check_availability()
568621
return await self._test(command=(["-e", f"'{self.__str__()}'"]))
569622

570623
async def glob(
@@ -580,6 +633,7 @@ async def glob(
580633
else:
581634
if not pattern:
582635
raise ValueError(f"Unacceptable pattern: {pattern!r}")
636+
await self._check_availability()
583637
command = [
584638
"printf",
585639
'"%s\\0"',
@@ -605,15 +659,18 @@ async def is_dir(self) -> bool:
605659
if (inner_path := await self._get_inner_path()) != self:
606660
return await inner_path.is_dir()
607661
else:
662+
await self._check_availability()
608663
return await self._test(command=["-d", f"'{self.__str__()}'"])
609664

610665
async def is_file(self) -> bool:
611666
if (inner_path := await self._get_inner_path()) != self:
612667
return await inner_path.is_file()
613668
else:
669+
await self._check_availability()
614670
return await self._test(command=["-f", f"'{self.__str__()}'"])
615671

616672
async def is_symlink(self) -> bool:
673+
await self._check_availability()
617674
return await self._test(command=["-L", f"'{self.__str__()}'"])
618675

619676
async def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None:
@@ -633,6 +690,7 @@ async def read_text(self, n=-1, encoding=None, errors=None) -> str:
633690
if (inner_path := await self._get_inner_path()) != self:
634691
return await inner_path.read_text(n=n, encoding=encoding, errors=errors)
635692
else:
693+
await self._check_availability()
636694
command = ["head", "-c", str(n)] if n >= 0 else ["cat"]
637695
command.append(self.__str__())
638696
result, status = await self.connector.run(
@@ -652,6 +710,7 @@ async def resolve(self, strict=False) -> RemoteStreamFlowPath | None:
652710
if loc.data_type == DataType.PRIMARY:
653711
return self.with_segments(loc.path)
654712
# Otherwise, analyse the remote path
713+
await self._check_availability()
655714
command = [
656715
"test",
657716
"-e",
@@ -676,6 +735,7 @@ async def rmtree(self) -> None:
676735
if (inner_path := await self._get_inner_path()) != self:
677736
await inner_path.rmtree()
678737
else:
738+
await self._check_availability()
679739
command = ["rm", "-rf ", self.__str__()]
680740
result, status = await self.connector.run(
681741
location=self.location, command=command, capture_output=True
@@ -686,6 +746,7 @@ async def size(self) -> int:
686746
if (inner_path := await self._get_inner_path()) != self:
687747
return await inner_path.size()
688748
else:
749+
await self._check_availability()
689750
command = [
690751
"".join(
691752
[
@@ -783,7 +844,12 @@ async def walk(
783844
paths += [path._make_child_relpath(d) for d in reversed(dirnames)]
784845

785846
def with_segments(self, *pathsegments):
786-
return type(self)(*pathsegments, context=self.context, location=self.location)
847+
return type(self)(
848+
*pathsegments,
849+
context=self.context,
850+
is_available=self.is_available,
851+
location=self.location,
852+
)
787853

788854
async def write_text(self, data: str, **kwargs) -> int:
789855
if (inner_path := await self._get_inner_path()) != self:

0 commit comments

Comments
 (0)