Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions poncho/src/poncho/library_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
r, w = os.pipe()
exec_method = None

# Infile load mode for function tasks inside this library.
# load_function_infile() accepts "cloudpickle" or "json"; any other value,
# including this initial None, makes it raise ValueError.
# main() overwrites this with library_info['function_infile_load_mode'].
function_infile_load_mode = None


# This class captures how results from FunctionCalls are conveyed from
# the library to the manager.
Expand Down Expand Up @@ -84,6 +87,18 @@ def sigchld_handler(signum, frame):
os.writev(w, [b"a"])


# Load the infile (serialized arguments) for a function task inside this library.
# @param in_file_path  Path to the infile to deserialize.
# @param load_mode     Optional explicit mode, "cloudpickle" or "json". When None
#                      (the default) the library-wide function_infile_load_mode
#                      global is used, so existing one-argument callers are unaffected.
# @return The deserialized object.
# Raises ValueError when the effective mode is neither "cloudpickle" nor "json".
def load_function_infile(in_file_path, load_mode=None):
    # Only consult the module-level setting when the caller gave no override.
    mode = function_infile_load_mode if load_mode is None else load_mode

    if mode == "cloudpickle":
        # NOTE(review): unpickling can execute arbitrary code; this presumably
        # relies on the infile coming from a trusted source - confirm.
        with open(in_file_path, "rb") as f:
            return cloudpickle.load(f)

    if mode == "json":
        with open(in_file_path, "r", encoding="utf-8") as f:
            return json.load(f)

    raise ValueError(f"invalid infile load mode: {mode}")


# Read data from worker, start function, and dump result to `outfile`.
def start_function(in_pipe_fd, thread_limit=1):
# read length of buffer to read
Expand Down Expand Up @@ -130,8 +145,7 @@ def start_function(in_pipe_fd, thread_limit=1):
os.chdir(function_sandbox)

# parameters are represented as infile.
with open("infile", "rb") as f:
event = cloudpickle.load(f)
event = load_function_infile("infile")

# output of execution should be dumped to outfile.
result = globals()[function_name](event)
Expand Down Expand Up @@ -160,11 +174,10 @@ def start_function(in_pipe_fd, thread_limit=1):
return -1, function_id
else:
try:
arg_infile = os.path.join(function_sandbox, "infile")
with open(arg_infile, "rb") as f:
event = cloudpickle.load(f)
infile_path = os.path.join(function_sandbox, "infile")
event = load_function_infile(infile_path)
except Exception:
stdout_timed_message(f"TASK {function_id} error: can't load the arguments from {arg_infile}")
stdout_timed_message(f"TASK {function_id} error: can't load the arguments from {infile_path}")
return
p = os.fork()
if p == 0:
Expand Down Expand Up @@ -382,11 +395,16 @@ def main():
global exec_method
exec_method = library_info['exec_mode']

# set infile load mode of functions in this library
global function_infile_load_mode
function_infile_load_mode = library_info['function_infile_load_mode']

# send configuration of library, just its name for now
config = {
"name": library_info['library_name'],
"taskid": args.task_id,
"exec_mode": exec_method,
"function_infile_load_mode": function_infile_load_mode,
}
send_configuration(config, out_pipe_fd, args.worker_pid)

Expand Down
11 changes: 9 additions & 2 deletions poncho/src/poncho/package_serverize.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def pack_library_code(path, envpath):
# @param exec_mode The execution mode of functions in this library.
# @param hoisting_modules A list of modules imported at the preamble of library, including packages, functions and classes.
# @param library_context_info A list containing [library_context_func, library_context_args, library_context_kwargs]. Used to create the library context on remote nodes.
# @param function_infile_load_mode How infiles of function tasks inside this library are deserialized ('cloudpickle' or 'json'). Included in the hash.
# @return A hash value.
def generate_library_hash(library_name,
function_list,
Expand All @@ -186,7 +187,8 @@ def generate_library_hash(library_name,
add_env,
exec_mode,
hoisting_modules,
library_context_info):
library_context_info,
function_infile_load_mode):
library_info = [library_name]
function_list = list(function_list)
function_names = set()
Expand Down Expand Up @@ -234,6 +236,8 @@ def generate_library_hash(library_name,
for kwarg in library_context_info[2]:
library_info.append(str(kwarg))
library_info.append(str(library_context_info[2][kwarg]))

library_info.append(str(function_infile_load_mode))

library_info = ''.join(library_info) # linear time complexity
msg = hashlib.sha1()
Expand Down Expand Up @@ -293,6 +297,7 @@ def generate_taskvine_library_code(library_path, hoisting_modules=None):
# @param exec_mode execution mode of functions in this library
# @param hoisting_modules a list of modules to be imported at the preamble of library
# @param library_context_info a list containing a library's context to be created remotely
# @param function_infile_load_mode How infiles of function tasks inside this library are deserialized: 'cloudpickle' (default) or 'json'.
# @return name of the file containing serialized information about the library
def generate_library(library_cache_path,
library_code_path,
Expand All @@ -303,7 +308,8 @@ def generate_library(library_cache_path,
need_pack=True,
exec_mode='fork',
hoisting_modules=None,
library_context_info=None
library_context_info=None,
function_infile_load_mode='cloudpickle'
):
# create library_info.clpk
library_info = {}
Expand All @@ -313,6 +319,7 @@ def generate_library(library_cache_path,
library_info['library_name'] = library_name
library_info['exec_mode'] = exec_mode
library_info['context_info'] = cloudpickle.dumps(library_context_info)
library_info['function_infile_load_mode'] = function_infile_load_mode
with open(library_info_path, 'wb') as f:
cloudpickle.dump(library_info, f)

Expand Down
9 changes: 6 additions & 3 deletions taskvine/src/bindings/python3/ndcctools/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,8 +936,9 @@ def check_library_exists(self, library_name):
# @param hoisting_modules A list of modules imported at the preamble of library, including packages, functions and classes.
# @param exec_mode Execution mode that the library should use to run function calls. Either 'direct' or 'fork'
# @param library_context_info A list containing [library_context_func, library_context_args, library_context_kwargs]. Used to create the library context on remote nodes.
# @param function_infile_load_mode How infiles of function tasks inside this library are deserialized: 'cloudpickle' (default) or 'json'.
# @returns A task to be used with @ref ndcctools.taskvine.manager.Manager.install_library.
def create_library_from_functions(self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None, exec_mode='fork', library_context_info=None):
def create_library_from_functions(self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None, exec_mode='fork', library_context_info=None, function_infile_load_mode='cloudpickle'):
# Delay loading of poncho until here, to avoid bringing in poncho dependencies unless needed.
# Ensure poncho python library is available.
from ndcctools.poncho import package_serverize
Expand All @@ -959,7 +960,8 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env
add_env=add_env,
exec_mode=exec_mode,
hoisting_modules=hoisting_modules,
library_context_info=library_context_info)
library_context_info=library_context_info,
function_infile_load_mode=function_infile_load_mode)

# Create path for caching library code and environment based on function hash.
library_cache_dir_name = "vine-library-cache"
Expand Down Expand Up @@ -1007,7 +1009,8 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env
need_pack=need_pack,
exec_mode=exec_mode,
hoisting_modules=hoisting_modules,
library_context_info=library_context_info)
library_context_info=library_context_info,
function_infile_load_mode=function_infile_load_mode)

# enable correct permissions for library code
os.chmod(library_code_path, 0o775)
Expand Down