From a78dc95adf8d3d26fa28feb4d0be7513e2066aba Mon Sep 17 00:00:00 2001 From: Jin Zhou Date: Mon, 13 Oct 2025 16:18:57 -0400 Subject: [PATCH 1/2] specify utf-8 --- poncho/src/poncho/library_network_code.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/poncho/src/poncho/library_network_code.py b/poncho/src/poncho/library_network_code.py index db6127cc71..9e533ee53f 100755 --- a/poncho/src/poncho/library_network_code.py +++ b/poncho/src/poncho/library_network_code.py @@ -174,12 +174,7 @@ def start_function(in_pipe_fd, thread_limit=1): stdout_timed_message(f"TASK {function_id} {function_name} arrives, starting to run in process {os.getpid()}") - try: - exit_status = 1 - except Exception: - stdout_timed_message(f"TASK {function_id} error: can't load the arguments from infile") - exit_status = 2 - raise + exit_status = 1 try: # setup stdout/err for a function call so we can capture them. @@ -199,7 +194,7 @@ def start_function(in_pipe_fd, thread_limit=1): os.dup2(library_fd, sys.stdout.fileno()) except Exception: stdout_timed_message(f"TASK {function_id} error: can't execute this function") - exit_status = 3 + exit_status = 2 raise finally: if function_stdout_fd in locals(): @@ -210,14 +205,14 @@ def start_function(in_pipe_fd, thread_limit=1): cloudpickle.dump(result, f) except Exception: stdout_timed_message(f"TASK {function_id} error: can't load the result from outfile") - exit_status = 4 - if os.path.exits("outfile"): + exit_status = 3 + if os.path.exists("outfile"): os.remove("outfile") raise try: if not result["Success"]: - exit_status = 5 + exit_status = 4 except Exception: stdout_timed_message(f"TASK {function_id} error: the result is invalid") exit_status = 5 @@ -232,14 +227,12 @@ def start_function(in_pipe_fd, thread_limit=1): os._exit(exit_status) elif p < 0: stdout_timed_message(f"TASK {function_id} error: unable to fork to execute {function_name}") - return -1 + return -1, function_id # return pid and function id of child process to parent. else: return p, function_id - return -1 - # Send result of a function execution to worker. Wake worker up to do work with SIGCHLD. def send_result(out_pipe_fd, worker_pid, task_id, exit_code): From d6f2fea0cb83b27693b36bd4795224679b93d0e5 Mon Sep 17 00:00:00 2001 From: Jin Zhou Date: Mon, 13 Oct 2025 16:56:48 -0400 Subject: [PATCH 2/2] vine: some bug fixes for the library code --- poncho/src/poncho/library_network_code.py | 51 +++++++++++------------ 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/poncho/src/poncho/library_network_code.py b/poncho/src/poncho/library_network_code.py index 9e533ee53f..b696ee5e57 100755 --- a/poncho/src/poncho/library_network_code.py +++ b/poncho/src/poncho/library_network_code.py @@ -81,7 +81,10 @@ def remote_wrapper(event): # Handler to sigchld when child exits. def sigchld_handler(signum, frame): # write any byte to signal that there's at least 1 child - os.writev(w, [b"a"]) + try: + os.write(w, b"a") + except OSError: + pass # Read data from worker, start function, and dump result to `outfile`. @@ -150,10 +153,7 @@ def start_function(in_pipe_fd, thread_limit=1): raise except Exception as e: - stdout_timed_message( - f"Library code: Function call failed due to {e}", - file=sys.stderr, - ) + stdout_timed_message(f"Library code: Function call failed due to {e}") sys.exit(1) finally: os.chdir(library_sandbox) @@ -165,40 +165,31 @@ def start_function(in_pipe_fd, thread_limit=1): event = cloudpickle.load(f) except Exception: stdout_timed_message(f"TASK {function_id} error: can't load the arguments from {arg_infile}") - return + return -1, function_id p = os.fork() if p == 0: + exit_status = 1 + try: # change the working directory to the function's sandbox os.chdir(function_sandbox) stdout_timed_message(f"TASK {function_id} {function_name} arrives, starting to run in process {os.getpid()}") - exit_status = 1 - try: - # setup stdout/err for a function call so we can capture them. - function_stdout_fd = os.open( - function_stdout_filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC - ) - # store the library's stdout fd - library_fd = os.dup(sys.stdout.fileno()) + # each child process independently redirects its own stdout/stderr. + with open(function_stdout_filename, "wb", buffering=0) as f: + os.dup2(f.fileno(), 1) # redirect stdout + os.dup2(f.fileno(), 2) # redirect stderr - # only redirect the stdout of a specific FunctionCall task into its own stdout fd, - # otherwise use the library's stdout - os.dup2(function_stdout_fd, sys.stdout.fileno()) - os.dup2(function_stdout_fd, sys.stderr.fileno()) - result = globals()[function_name](event) + stdout_timed_message(f"TASK {function_id} {function_name} starts in PID {os.getpid()}") + result = globals()[function_name](event) + stdout_timed_message(f"TASK {function_id} {function_name} finished") - # restore to the library's stdout fd on completion - os.dup2(library_fd, sys.stdout.fileno()) except Exception: - stdout_timed_message(f"TASK {function_id} error: can't execute this function") + stdout_timed_message(f"TASK {function_id} error: can't execute {function_name} due to {traceback.format_exc()}") exit_status = 2 raise - finally: - if function_stdout_fd in locals(): - os.close(function_stdout_fd) try: with open("outfile", "wb") as f: @@ -424,7 +415,15 @@ def main(): ) else: pid, func_id = start_function(in_pipe_fd, thread_limit) - pid_to_func_id[pid] = func_id + if pid == -1: + send_result( + out_pipe_fd, + args.worker_pid, + func_id, + 1, + ) + else: + pid_to_func_id[pid] = func_id else: # at least 1 child exits, reap all. # read only once as os.read is blocking if there's nothing to read.