Skip to content

Commit 83ed013

Browse files
authored
Implement stage-in for zip staging provider (#3395)
1 parent a9b7350 commit 83ed013

File tree

3 files changed

+118
-0
lines changed

3 files changed

+118
-0
lines changed

parsl/data_provider/zip.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ class ZipFileStaging(Staging):
4242
"""
4343

4444
def can_stage_out(self, file: File) -> bool:
45+
return self.is_zip_url(file)
46+
47+
def can_stage_in(self, file: File) -> bool:
48+
return self.is_zip_url(file)
49+
50+
def is_zip_url(self, file: File) -> bool:
4551
logger.debug("archive provider checking File {}".format(repr(file)))
4652

4753
# First check if this is the scheme we care about
@@ -76,6 +82,20 @@ def stage_out(self, dm, executor, file, parent_fut):
7682
app_fut = stage_out_app(zip_path, inside_path, working_dir, inputs=[file], _parsl_staging_inhibit=True, parent_fut=parent_fut)
7783
return app_fut
7884

85+
def stage_in(self, dm, executor, file, parent_fut):
86+
assert file.scheme == 'zip'
87+
88+
zip_path, inside_path = zip_path_split(file.path)
89+
90+
working_dir = dm.dfk.executors[executor].working_dir
91+
92+
if working_dir:
93+
file.local_path = os.path.join(working_dir, inside_path)
94+
95+
stage_in_app = _zip_stage_in_app(dm)
96+
app_fut = stage_in_app(zip_path, inside_path, working_dir, outputs=[file], _parsl_staging_inhibit=True, parent_fut=parent_fut)
97+
return app_fut._outputs[0]
98+
7999

80100
def _zip_stage_out(zip_file, inside_path, working_dir, parent_fut=None, inputs=[], _parsl_staging_inhibit=True):
81101
file = inputs[0]
@@ -93,6 +113,18 @@ def _zip_stage_out_app(dm):
93113
return parsl.python_app(executors=['_parsl_internal'], data_flow_kernel=dm.dfk)(_zip_stage_out)
94114

95115

116+
def _zip_stage_in(zip_file, inside_path, working_dir, *, parent_fut, outputs, _parsl_staging_inhibit=True):
117+
with filelock.FileLock(zip_file + ".lock"):
118+
with zipfile.ZipFile(zip_file, mode='r') as z:
119+
content = z.read(inside_path)
120+
with open(outputs[0], "wb") as of:
121+
of.write(content)
122+
123+
124+
def _zip_stage_in_app(dm):
125+
return parsl.python_app(executors=['_parsl_internal'], data_flow_kernel=dm.dfk)(_zip_stage_in)
126+
127+
96128
def zip_path_split(path: str) -> Tuple[str, str]:
97129
"""Split zip: path into a zipfile name and a contained-file name.
98130
"""
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import parsl
2+
import pytest
3+
import random
4+
import zipfile
5+
6+
from parsl.data_provider.files import File
7+
from parsl.data_provider.zip import ZipAuthorityError, ZipFileStaging
8+
9+
from parsl.providers import LocalProvider
10+
from parsl.channels import LocalChannel
11+
from parsl.launchers import SimpleLauncher
12+
13+
from parsl.config import Config
14+
from parsl.executors import HighThroughputExecutor
15+
16+
from parsl.tests.configs.htex_local import fresh_config as local_config
17+
18+
19+
@parsl.python_app
20+
def count_lines(file):
21+
with open(file, "r") as f:
22+
return len(f.readlines())
23+
24+
25+
@pytest.mark.local
26+
def test_zip_in(tmpd_cwd):
27+
# basic test of zip file stage-in
28+
zip_path = tmpd_cwd / "container.zip"
29+
file_base = "data.txt"
30+
zip_file = File(f"zip:{zip_path / file_base}")
31+
32+
# create a zip file containing one file with some abitrary number of lines
33+
n_lines = random.randint(0, 1000)
34+
35+
with zipfile.ZipFile(zip_path, mode='w') as z:
36+
with z.open(file_base, mode='w') as f:
37+
for _ in range(n_lines):
38+
f.write(b'someline\n')
39+
40+
app_future = count_lines(zip_file)
41+
42+
assert app_future.result() == n_lines
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import parsl
2+
import pytest
3+
import random
4+
import zipfile
5+
6+
from parsl.data_provider.files import File
7+
from parsl.data_provider.zip import ZipAuthorityError, ZipFileStaging
8+
9+
from parsl.providers import LocalProvider
10+
from parsl.channels import LocalChannel
11+
from parsl.launchers import SimpleLauncher
12+
13+
from parsl.config import Config
14+
from parsl.executors import HighThroughputExecutor
15+
16+
from parsl.tests.configs.htex_local import fresh_config as local_config
17+
18+
19+
@parsl.python_app
20+
def generate_lines(n: int, *, outputs):
21+
with open(outputs[0], "w") as f:
22+
for x in range(n):
23+
# write numbered lines
24+
f.write(str(x) + "\n")
25+
26+
27+
@parsl.python_app
28+
def count_lines(file):
29+
with open(file, "r") as f:
30+
return len(f.readlines())
31+
32+
33+
@pytest.mark.local
34+
def test_zip_pipeline(tmpd_cwd):
35+
# basic test of zip file stage-in
36+
zip_path = tmpd_cwd / "container.zip"
37+
file_base = "data.txt"
38+
zip_file = File(f"zip:{zip_path / file_base}")
39+
40+
n_lines = random.randint(0, 1000)
41+
generate_fut = generate_lines(n_lines, outputs=[zip_file])
42+
n_lines_out = count_lines(generate_fut.outputs[0]).result()
43+
44+
assert n_lines == n_lines_out

0 commit comments

Comments
 (0)