diff --git a/poncho/src/poncho/library_network_code.py b/poncho/src/poncho/library_network_code.py index db6127cc71..b696ee5e57 100755 --- a/poncho/src/poncho/library_network_code.py +++ b/poncho/src/poncho/library_network_code.py @@ -81,7 +81,10 @@ def remote_wrapper(event): # Handler to sigchld when child exits. def sigchld_handler(signum, frame): # write any byte to signal that there's at least 1 child - os.writev(w, [b"a"]) + try: + os.write(w, b"a") + except OSError: + pass # Read data from worker, start function, and dump result to `outfile`. @@ -150,10 +153,7 @@ def start_function(in_pipe_fd, thread_limit=1): raise except Exception as e: - stdout_timed_message( - f"Library code: Function call failed due to {e}", - file=sys.stderr, - ) + stdout_timed_message(f"Library code: Function call failed due to {e}") sys.exit(1) finally: os.chdir(library_sandbox) @@ -165,9 +165,11 @@ def start_function(in_pipe_fd, thread_limit=1): event = cloudpickle.load(f) except Exception: stdout_timed_message(f"TASK {function_id} error: can't load the arguments from {arg_infile}") - return + return -1, function_id p = os.fork() if p == 0: + exit_status = 1 + try: # change the working directory to the function's sandbox os.chdir(function_sandbox) @@ -175,49 +177,33 @@ def start_function(in_pipe_fd, thread_limit=1): stdout_timed_message(f"TASK {function_id} {function_name} arrives, starting to run in process {os.getpid()}") try: - exit_status = 1 - except Exception: - stdout_timed_message(f"TASK {function_id} error: can't load the arguments from infile") - exit_status = 2 - raise + # each child process independently redirects its own stdout/stderr. + with open(function_stdout_filename, "wb", buffering=0) as f: + os.dup2(f.fileno(), 1) # redirect stdout + os.dup2(f.fileno(), 2) # redirect stderr - try: - # setup stdout/err for a function call so we can capture them. - function_stdout_fd = os.open( - function_stdout_filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC - ) - # store the library's stdout fd - library_fd = os.dup(sys.stdout.fileno()) + stdout_timed_message(f"TASK {function_id} {function_name} starts in PID {os.getpid()}") + result = globals()[function_name](event) + stdout_timed_message(f"TASK {function_id} {function_name} finished") - # only redirect the stdout of a specific FunctionCall task into its own stdout fd, - # otherwise use the library's stdout - os.dup2(function_stdout_fd, sys.stdout.fileno()) - os.dup2(function_stdout_fd, sys.stderr.fileno()) - result = globals()[function_name](event) - - # restore to the library's stdout fd on completion - os.dup2(library_fd, sys.stdout.fileno()) except Exception: - stdout_timed_message(f"TASK {function_id} error: can't execute this function") - exit_status = 3 + stdout_timed_message(f"TASK {function_id} error: can't execute {function_name} due to {traceback.format_exc()}") + exit_status = 2 raise - finally: - if function_stdout_fd in locals(): - os.close(function_stdout_fd) try: with open("outfile", "wb") as f: cloudpickle.dump(result, f) except Exception: stdout_timed_message(f"TASK {function_id} error: can't load the result from outfile") - exit_status = 4 - if os.path.exits("outfile"): + exit_status = 3 + if os.path.exists("outfile"): os.remove("outfile") raise try: if not result["Success"]: - exit_status = 5 + exit_status = 4 except Exception: stdout_timed_message(f"TASK {function_id} error: the result is invalid") exit_status = 5 @@ -232,14 +218,12 @@ def start_function(in_pipe_fd, thread_limit=1): os._exit(exit_status) elif p < 0: stdout_timed_message(f"TASK {function_id} error: unable to fork to execute {function_name}") - return -1 + return -1, function_id # return pid and function id of child process to parent. else: return p, function_id - return -1 - # Send result of a function execution to worker. Wake worker up to do work with SIGCHLD. def send_result(out_pipe_fd, worker_pid, task_id, exit_code): @@ -431,7 +415,15 @@ def main(): ) else: pid, func_id = start_function(in_pipe_fd, thread_limit) - pid_to_func_id[pid] = func_id + if pid == -1: + send_result( + out_pipe_fd, + args.worker_pid, + func_id, + 1, + ) + else: + pid_to_func_id[pid] = func_id else: # at least 1 child exits, reap all. # read only once as os.read is blocking if there's nothing to read.