diff --git a/poncho/src/poncho/library_network_code.py b/poncho/src/poncho/library_network_code.py index db6127cc71..31dc5b2927 100755 --- a/poncho/src/poncho/library_network_code.py +++ b/poncho/src/poncho/library_network_code.py @@ -28,6 +28,9 @@ r, w = os.pipe() exec_method = None +# infile load mode for function tasks inside this library +function_infile_load_mode = None + # This class captures how results from FunctionCalls are conveyed from # the library to the manager. @@ -84,6 +87,18 @@ def sigchld_handler(signum, frame): os.writev(w, [b"a"]) +# Load the infile for a function task inside this library +def load_function_infile(in_file_path): + if function_infile_load_mode == "cloudpickle": + with open(in_file_path, "rb") as f: + return cloudpickle.load(f) + elif function_infile_load_mode == "json": + with open(in_file_path, "r", encoding="utf-8") as f: + return json.load(f) + else: + raise ValueError(f"invalid infile load mode: {function_infile_load_mode}") + + # Read data from worker, start function, and dump result to `outfile`. def start_function(in_pipe_fd, thread_limit=1): # read length of buffer to read @@ -130,8 +145,7 @@ def start_function(in_pipe_fd, thread_limit=1): os.chdir(function_sandbox) # parameters are represented as infile. - with open("infile", "rb") as f: - event = cloudpickle.load(f) + event = load_function_infile("infile") # output of execution should be dumped to outfile. result = globals()[function_name](event) @@ -160,11 +174,10 @@ def start_function(in_pipe_fd, thread_limit=1): return -1, function_id else: try: - arg_infile = os.path.join(function_sandbox, "infile") - with open(arg_infile, "rb") as f: - event = cloudpickle.load(f) + infile_path = os.path.join(function_sandbox, "infile") + event = load_function_infile(infile_path) except Exception: - stdout_timed_message(f"TASK {function_id} error: can't load the arguments from {arg_infile}") + stdout_timed_message(f"TASK {function_id} error: can't load the arguments from {infile_path}") return p = os.fork() if p == 0: @@ -382,11 +395,16 @@ def main(): global exec_method exec_method = library_info['exec_mode'] + # set infile load mode of functions in this library + global function_infile_load_mode + function_infile_load_mode = library_info['function_infile_load_mode'] + # send configuration of library, just its name for now config = { "name": library_info['library_name'], "taskid": args.task_id, "exec_mode": exec_method, + "function_infile_load_mode": function_infile_load_mode, } send_configuration(config, out_pipe_fd, args.worker_pid) diff --git a/poncho/src/poncho/package_serverize.py b/poncho/src/poncho/package_serverize.py index 4a6e5e7a29..cfc789a11b 100755 --- a/poncho/src/poncho/package_serverize.py +++ b/poncho/src/poncho/package_serverize.py @@ -178,6 +178,7 @@ def pack_library_code(path, envpath): # @param exec_mode The execution mode of functions in this library. # @param hoisting_modules A list of modules imported at the preamble of library, including packages, functions and classes. # @param library_context_info A list containing [library_context_func, library_context_args, library_context_kwargs]. Used to create the library context on remote nodes. +# @param function_infile_load_mode The mode to load infile for function tasks inside this library. # @return A hash value. def generate_library_hash(library_name, function_list, @@ -186,7 +187,8 @@ def generate_library_hash(library_name, add_env, exec_mode, hoisting_modules, - library_context_info): + library_context_info, + function_infile_load_mode): library_info = [library_name] function_list = list(function_list) function_names = set() @@ -234,6 +236,8 @@ def generate_library_hash(library_name, for kwarg in library_context_info[2]: library_info.append(str(kwarg)) library_info.append(str(library_context_info[2][kwarg])) + + library_info.append(str(function_infile_load_mode)) library_info = ''.join(library_info) # linear time complexity msg = hashlib.sha1() @@ -293,6 +297,7 @@ def generate_taskvine_library_code(library_path, hoisting_modules=None): # @param exec_mode execution mode of functions in this library # @param hoisting_modules a list of modules to be imported at the preamble of library # @param library_context_info a list containing a library's context to be created remotely +# @param function_infile_load_mode The mode to load infile for function tasks inside this library. # @return name of the file containing serialized information about the library def generate_library(library_cache_path, library_code_path, @@ -303,7 +308,8 @@ def generate_library(library_cache_path, need_pack=True, exec_mode='fork', hoisting_modules=None, - library_context_info=None + library_context_info=None, + function_infile_load_mode='cloudpickle' ): # create library_info.clpk library_info = {} @@ -313,6 +319,7 @@ def generate_library(library_cache_path, library_info['library_name'] = library_name library_info['exec_mode'] = exec_mode library_info['context_info'] = cloudpickle.dumps(library_context_info) + library_info['function_infile_load_mode'] = function_infile_load_mode with open(library_info_path, 'wb') as f: cloudpickle.dump(library_info, f) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py b/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py index b6767a6b56..55be0e835d 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py @@ -936,8 +936,9 @@ def check_library_exists(self, library_name): # @param hoisting_modules A list of modules imported at the preamble of library, including packages, functions and classes. # @param exec_mode Execution mode that the library should use to run function calls. Either 'direct' or 'fork' # @param library_context_info A list containing [library_context_func, library_context_args, library_context_kwargs]. Used to create the library context on remote nodes. + # @param function_infile_load_mode The mode to load infile for function tasks inside this library. # @returns A task to be used with @ref ndcctools.taskvine.manager.Manager.install_library. - def create_library_from_functions(self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None, exec_mode='fork', library_context_info=None): + def create_library_from_functions(self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None, exec_mode='fork', library_context_info=None, function_infile_load_mode='cloudpickle'): # Delay loading of poncho until here, to avoid bringing in poncho dependencies unless needed. # Ensure poncho python library is available. from ndcctools.poncho import package_serverize @@ -959,7 +960,8 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env add_env=add_env, exec_mode=exec_mode, hoisting_modules=hoisting_modules, - library_context_info=library_context_info) + library_context_info=library_context_info, + function_infile_load_mode=function_infile_load_mode) # Create path for caching library code and environment based on function hash. library_cache_dir_name = "vine-library-cache" @@ -1007,7 +1009,8 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env need_pack=need_pack, exec_mode=exec_mode, hoisting_modules=hoisting_modules, - library_context_info=library_context_info) + library_context_info=library_context_info, + function_infile_load_mode=function_infile_load_mode) # enable correct permissions for library code os.chmod(library_code_path, 0o775)