Skip to content

Commit cabcb39

Browse files
committed
Support handling referenced tool directories
1 parent 7ce99d8 commit cabcb39

File tree

1 file changed

+32
-6
lines changed
  • pulsar/client/staging

1 file changed

+32
-6
lines changed

pulsar/client/staging/up.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
basename,
77
dirname,
88
exists,
9+
isdir,
910
join,
1011
relpath,
1112
)
1213
from re import (
1314
escape,
1415
findall,
1516
)
17+
from typing import Optional
1618

1719
from ..action_mapper import (
1820
FileActionMapper,
@@ -235,8 +237,18 @@ def __initialize_referenced_arbitrary_files(self):
235237
self.arbitrary_files.update(unstructured_map)
236238

237239
def __upload_tool_files(self):
238-
for (referenced_tool_file, name) in self.referenced_tool_files:
239-
self.transfer_tracker.handle_transfer_path(referenced_tool_file, path_type.TOOL, name=name)
240+
for referenced_tool_file, name in self.referenced_tool_files:
241+
if isdir(referenced_tool_file):
242+
self.transfer_tracker.handle_transfer_directory(
243+
path_type.TOOL,
244+
directory=referenced_tool_file,
245+
mode=StageDirectoryType.WHOLE_DIRECTORY,
246+
rel_path_to=self.tool_dir,
247+
)
248+
else:
249+
self.transfer_tracker.handle_transfer_path(
250+
referenced_tool_file, path_type.TOOL, name=name
251+
)
240252

241253
def __upload_job_directory_files(self):
242254
for job_directory_file in self.job_directory_files:
@@ -466,7 +478,14 @@ def handle_transfer_path(self, path, type, name=None, contents=None):
466478
source = {"path": path}
467479
return self.handle_transfer_source(source, type, name=name, contents=contents)
468480

469-
def handle_transfer_directory(self, type, directory=None, action_source=None, mode: StageDirectoryType = StageDirectoryType.CONTENTS):
481+
def handle_transfer_directory(
482+
self,
483+
type,
484+
directory=None,
485+
action_source=None,
486+
mode: StageDirectoryType = StageDirectoryType.CONTENTS,
487+
rel_path_to: Optional[str] = None,
488+
):
470489
# TODO: needs to happen else where if using remote object store staging
471490
# but we don't have the action type yet.
472491
if directory is None:
@@ -481,14 +500,21 @@ def handle_transfer_directory(self, type, directory=None, action_source=None, mo
481500
self.__add_remote_staging_input(action, None, type)
482501
return
483502

484-
directory = action_source['path']
503+
directory = action_source["path"]
485504
else:
486505
assert action_source is None
487506

488507
for directory_file_name in directory_files(directory):
489508
directory_file_path = join(directory, directory_file_name)
490-
rel_path_to = directory if mode == StageDirectoryType.CONTENTS else dirname(directory)
491-
remote_name = self.path_helper.remote_name(relpath(directory_file_path, rel_path_to))
509+
if not rel_path_to:
510+
rel_path_to = (
511+
directory
512+
if mode == StageDirectoryType.CONTENTS
513+
else dirname(directory)
514+
)
515+
remote_name = self.path_helper.remote_name(
516+
relpath(directory_file_path, rel_path_to)
517+
)
492518
self.handle_transfer_path(directory_file_path, type, name=remote_name)
493519

494520
def handle_transfer_source(self, source, type, name=None, contents=None):

0 commit comments

Comments
 (0)