Skip to content

Commit b1480f2

Browse files
committed
Construct staging and output manifests from FileStager and ResultsCollector
At the moment, JSON staging and output manifests are constructed by tracking all actions mapped by the `FileActionMapper` using a list `FileActionMapper.actions`. This makes the `FileActionMapper` stateful, requires including `file_type` as a keyword argument for `BaseAction` and its children, and requires defining a `finalize()` method for `FileActionMapper` and for `BaseAction` and its children. Paying the small price of refactoring `JsonTransferAction`, generate the staging manifest from `FileStager.transfer_tracker.remote_staging_actions` and the output manifest as `ResultsCollector` collects the outputs.
1 parent 7c8371a commit b1480f2

File tree

5 files changed

+58
-52
lines changed

5 files changed

+58
-52
lines changed

pulsar/client/action_mapper.py

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
Any,
2222
Dict,
2323
List,
24-
Optional,
2524
Type,
2625
)
2726
from urllib.parse import urlencode
@@ -191,9 +190,6 @@ def __init__(self, client=None, config=None):
191190
self.ssh_port = config.get("ssh_port", None)
192191
self.mappers = mappers_from_dicts(config.get("paths", []))
193192
self.files_endpoint = config.get("files_endpoint", None)
194-
self.actions = []
195-
# Might want to make the working directory available here so that we know where to place archive
196-
# for archive action
197193

198194
def action(self, source, type, mapper=None):
199195
path = source.get("path", None)
@@ -204,14 +200,10 @@ def action(self, source, type, mapper=None):
204200
if mapper:
205201
file_lister = mapper.file_lister
206202
action_kwds = mapper.action_kwds
207-
action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds)
203+
action = action_class(source, file_lister=file_lister, **action_kwds)
208204
self.__process_action(action, type)
209-
self.actions.append(action)
210205
return action
211206

212-
def finalize(self):
213-
return [_ for _ in (action.finalize() for action in self.actions) if _]
214-
215207
def unstructured_mappers(self):
216208
""" Return mappers that will map 'unstructured' files (i.e. go beyond
217209
mapping inputs, outputs, and config files).
@@ -273,7 +265,6 @@ def __process_action(self, action, file_type):
273265
""" Extension point to populate extra action information after an
274266
action has been created.
275267
"""
276-
action.file_type = file_type
277268
if getattr(action, "inject_url", False):
278269
self.__inject_url(action, file_type)
279270
if getattr(action, "inject_ssh_properties", False):
@@ -309,12 +300,10 @@ class BaseAction:
309300
whole_directory_transfer_supported = False
310301
action_spec: Dict[str, Any] = {}
311302
action_type: str
312-
file_type: Optional[str] = None
313303

314-
def __init__(self, source, file_lister=None, file_type=None):
304+
def __init__(self, source, file_lister=None):
315305
self.source = source
316306
self.file_lister = file_lister or DEFAULT_FILE_LISTER
317-
self.file_type = file_type
318307

319308
@property
320309
def path(self):
@@ -353,9 +342,6 @@ def _extend_base_dict(self, **kwds):
353342
base_dict.update(**kwds)
354343
return base_dict
355344

356-
def finalize(self):
357-
pass
358-
359345
def to_dict(self):
360346
return self._extend_base_dict()
361347

@@ -404,8 +390,8 @@ class RewriteAction(BaseAction):
404390
action_type = "rewrite"
405391
staging = STAGING_ACTION_NONE
406392

407-
def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None):
408-
super().__init__(source, file_lister=file_lister, file_type=file_type)
393+
def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
394+
super().__init__(source, file_lister=file_lister)
409395
self.source_directory = source_directory
410396
self.destination_directory = destination_directory
411397

@@ -481,8 +467,8 @@ class RemoteTransferAction(BaseAction):
481467
action_type = "remote_transfer"
482468
staging = STAGING_ACTION_REMOTE
483469

484-
def __init__(self, source, file_lister=None, url=None, file_type=None):
485-
super().__init__(source, file_lister=file_lister, file_type=file_type)
470+
def __init__(self, source, file_lister=None, url=None):
471+
super().__init__(source, file_lister=file_lister)
486472
self.url = url
487473

488474
def to_dict(self):
@@ -509,8 +495,8 @@ class RemoteTransferTusAction(BaseAction):
509495
action_type = "remote_transfer_tus"
510496
staging = STAGING_ACTION_REMOTE
511497

512-
def __init__(self, source, file_lister=None, url=None, file_type=None):
513-
super().__init__(source, file_lister=file_lister, file_type=file_type)
498+
def __init__(self, source, file_lister=None, url=None):
499+
super().__init__(source, file_lister=file_lister)
514500
self.url = url
515501

516502
def to_dict(self):
@@ -537,30 +523,39 @@ class JsonTransferAction(BaseAction):
537523
action_type = "json_transfer"
538524
staging = STAGING_ACTION_REMOTE
539525

540-
def __init__(self, source, file_lister=None, url=None, file_type=None):
541-
super().__init__(source, file_lister, file_type)
526+
def __init__(self, source, file_lister=None, url=None, from_path=None, to_path=None):
527+
super().__init__(source, file_lister)
542528
self.url = url
543-
self._from_path = None
544-
self._to_path = None
529+
530+
self._from_path = from_path
531+
self._to_path = to_path
532+
# `from_path` and `to_path` are mutually exclusive, only one of them should be set
545533

546534
@classmethod
547535
def from_dict(cls, action_dict):
548-
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])
536+
return JsonTransferAction(
537+
source=action_dict["source"],
538+
url=action_dict["url"],
539+
from_path=action_dict.get("from_path"),
540+
to_path=action_dict.get("to_path")
541+
)
549542

550543
def to_dict(self):
551-
return self._extend_base_dict(url=self.url)
544+
return self._extend_base_dict(**self.to_staging_manifest_entry())
552545

553546
def write_to_path(self, path):
554-
self._to_path = path
547+
self._from_path, self._to_path = None, path
555548

556549
def write_from_path(self, pulsar_path: str):
557-
self._from_path = pulsar_path
550+
self._from_path, self._to_path = pulsar_path, None
558551

559-
def finalize(self):
552+
def to_staging_manifest_entry(self):
553+
staging_manifest_entry = dict(url=self.url)
554+
if self._from_path:
555+
staging_manifest_entry["from_path"] = self._from_path
560556
if self._to_path:
561-
return {"url": self.url, "to_path": self._to_path}
562-
else:
563-
return {"url": self.url, "from_path": self._from_path}
557+
staging_manifest_entry["to_path"] = self._to_path
558+
return staging_manifest_entry
564559

565560

566561
class RemoteObjectStoreCopyAction(BaseAction):
@@ -911,6 +906,7 @@ def unstructured_map(self, path):
911906

912907
__all__ = (
913908
'FileActionMapper',
909+
'JsonTransferAction',
914910
'path_type',
915911
'from_dict',
916912
'MessageAction',

pulsar/client/staging/down.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
from contextlib import contextmanager
44
from json import loads
55
from logging import getLogger
6+
from typing import Optional
67
from os.path import (
78
join,
89
relpath,
910
)
1011

11-
from ..action_mapper import FileActionMapper
12+
from ..action_mapper import FileActionMapper, JsonTransferAction
1213
from ..staging import COMMAND_VERSION_FILENAME
1314

1415
log = getLogger(__name__)
@@ -64,16 +65,17 @@ def __init__(self, output_collector, action_mapper, client_outputs, pulsar_outpu
6465
self.working_directory_contents = pulsar_outputs.working_directory_contents or []
6566
self.metadata_directory_contents = pulsar_outputs.metadata_directory_contents or []
6667
self.job_directory_contents = pulsar_outputs.job_directory_contents or []
68+
self.output_manifest: Optional[list] = None
6769

6870
def collect(self):
71+
self.output_manifest = []
72+
6973
self.__collect_working_directory_outputs()
7074
self.__collect_outputs()
7175
self.__collect_version_file()
7276
self.__collect_other_working_directory_files()
7377
self.__collect_metadata_directory_files()
7478
self.__collect_job_directory_files()
75-
# Give actions that require a final action, like those that write a manifest, to write out their content
76-
self.__finalize_action_mapper()
7779
# finalize collection here for executors that need this ?
7880
return self.exception_tracker.collection_failure_exceptions
7981

@@ -137,9 +139,6 @@ def __collect_job_directory_files(self):
137139
'output_jobdir',
138140
)
139141

140-
def __finalize_action_mapper(self):
141-
self.action_mapper.finalize()
142-
143142
def __realized_dynamic_file_source_references(self):
144143
references = {"filename": [], "extra_files": []}
145144

@@ -212,7 +211,10 @@ def _attempt_collect_output(self, output_type, path, name=None):
212211
def _collect_output(self, output_type, action, name):
213212
log.info("collecting output {} with action {}".format(name, action))
214213
try:
215-
return self.output_collector.collect_output(self, output_type, action, name)
214+
collect_result = self.output_collector.collect_output(self, output_type, action, name)
215+
if isinstance(action, JsonTransferAction):
216+
self.output_manifest.append(action.to_staging_manifest_entry())
217+
return collect_result
216218
except Exception as e:
217219
if _allow_collect_failure(output_type):
218220
log.warning(

pulsar/client/staging/up.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from ..action_mapper import (
1818
FileActionMapper,
19+
JsonTransferAction,
1920
MessageAction,
2021
path_type,
2122
)
@@ -72,15 +73,19 @@ def submit_job(client, client_job_description, job_config=None):
7273
launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
7374
launch_kwds["token_endpoint"] = client.token_endpoint
7475

75-
# populate `to_path`
76+
# generate staging manifest
7677
staging_manifest = []
77-
for action in file_stager.action_mapper.actions:
78-
if action.file_type not in ("output", "output_workdir"):
78+
for action_description in remote_staging_actions:
79+
action_dict = action_description["action"]
80+
is_json_transfer_action = action_dict.get("action_type") == JsonTransferAction.action_type
81+
is_not_output_action = action_description.get("type") not in ("output", "output_workdir")
82+
if is_json_transfer_action and is_not_output_action:
83+
file_type = action_description.get("type")
84+
action = JsonTransferAction.from_dict(action_dict)
7985
name = basename(action.path)
80-
path = file_stager.job_directory.calculate_path(name, action.file_type)
86+
path = file_stager.job_directory.calculate_path(name, file_type)
8187
action.write_to_path(path)
82-
staging_manifest.append(action.finalize())
83-
88+
staging_manifest.append(action.to_staging_manifest_entry())
8489
if staging_manifest:
8590
launch_kwds["staging_manifest"] = staging_manifest
8691

pulsar/managers/staging/post.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ def postprocess(job_directory, action_executor, was_cancelled):
2020
staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None)
2121
else:
2222
staging_config = None
23-
file_action_mapper, collected = _collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
23+
file_action_mapper, _, collected = _collect_outputs(
24+
job_directory, staging_config, action_executor, was_cancelled
25+
)
2426
return collected
2527
finally:
2628
job_directory.write_file("postprocessed", "")
@@ -29,6 +31,7 @@ def postprocess(job_directory, action_executor, was_cancelled):
2931

3032
def _collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
3133
collected = True
34+
output_manifest = None
3235
if "action_mapper" in staging_config:
3336
file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
3437
client_outputs = staging.ClientOutputs.from_dict(staging_config["client_outputs"])
@@ -39,7 +42,8 @@ def _collect_outputs(job_directory, staging_config, action_executor, was_cancell
3942
if collection_failure_exceptions:
4043
log.warn("Failures collecting results %s" % collection_failure_exceptions)
4144
collected = False
42-
return file_action_mapper, collected
45+
output_manifest = results_collector.output_manifest
46+
return file_action_mapper, output_manifest, collected
4347

4448

4549
def realized_dynamic_file_sources(job_directory):

pulsar/scripts/collect_output_manifest.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,14 @@ def collect_outputs(job_directory: str, staging_config_path: str, output_manifes
2323
with open(staging_config_path) as staging_fh:
2424
staging_config = json.load(staging_fh)
2525

26-
action_mapper, _ = _collect_outputs(
26+
action_mapper, output_manifest, _ = _collect_outputs(
2727
job_directory_,
2828
staging_config=staging_config,
2929
action_executor=RetryActionExecutor(),
3030
was_cancelled=lambda: False
3131
)
32-
new_manifest = action_mapper.finalize()
3332
with open(output_manifest_path, "w") as manifest_fh:
34-
json.dump(new_manifest, manifest_fh)
33+
json.dump(output_manifest, manifest_fh)
3534

3635

3736
def main():

0 commit comments

Comments
 (0)