diff --git a/jellybench_py/constant.py b/jellybench_py/constant.py
index 95ec56b..092eed3 100644
--- a/jellybench_py/constant.py
+++ b/jellybench_py/constant.py
@@ -11,6 +11,7 @@ class CommandConfig:
 class Constants:
     DEFAULT_OUTPUT_JSON: str = "./output.json"
     DEFAULT_SERVER_URL: str = "https://hwa.jellyfin.org"
+    DEFAULT_TIMEOUT: int = 120
     NVENC_TEST_WINDOWS = CommandConfig(
         BASE_CMD="{ffmpeg} -y -hwaccel cuda -hwaccel_output_format cuda -t 50 -hwaccel_device {gpu} -f lavfi -i testsrc ",
         WORKER_CMD="-vf hwupload -fps_mode passthrough -c:a copy -c:v h264_nvenc -b:v {bitrate} -f null -",
diff --git a/jellybench_py/core.py b/jellybench_py/core.py
index 64efa5d..820ec67 100644
--- a/jellybench_py/core.py
+++ b/jellybench_py/core.py
@@ -275,7 +275,7 @@ def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar, limit=0) -> tuple:
                 total_workers *= floor(total_workers * output[1]["speed"])
 
             if args.debug_flag:
-                print(f'completed with speed {output[1]["speed"]:.02f}')
+                print(f"completed with speed {output[1]['speed']:.02f}")
 
             # make sure we don't go into already benchmarked region
             if total_workers >= min_fail:
diff --git a/jellybench_py/worker.py b/jellybench_py/worker.py
index 4ab6da4..3f0e374 100644
--- a/jellybench_py/worker.py
+++ b/jellybench_py/worker.py
@@ -18,22 +18,23 @@
 #
 ##########################################################################################
 
-import concurrent.futures
 import re
 import shlex
-import subprocess
+import subprocess as sp
+import time
 
 from jellybench_py import ffmpeg_log
+from jellybench_py.constant import Constants
 
 
 def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple:  # Process ID,
-    # print(f"{pid} |> Running FFMPEG Process: {pid}")
+    print(f"{pid} |> Running FFMPEG Process: {pid}")
     timeout = 120  # Stop any process that runs for more then 120sec
     failure_reason = None
    try:
-        process_output = subprocess.run(
+        process_output = sp.run(
             ffmpeg_cmd,
-            stdin=subprocess.PIPE,
+            stdin=sp.PIPE,
             capture_output=True,
             universal_newlines=True,
             timeout=timeout,
@@ -41,11 +42,11 @@ def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple:  # Process ID,
         retcode = process_output.returncode
         ffmpeg_stderr = process_output.stderr
 
-    except subprocess.TimeoutExpired:
+    except sp.TimeoutExpired:
         ffmpeg_stderr = ""
         retcode = 0
         failure_reason = "failed_timeout"
-
+    print(f"{pid} |> Finished FFMPEG Process: {pid}")
     if 0 < retcode < 255:
         ffmpeg_log.set_test_error(ffmpeg_stderr)
         failure_reason = "generic_ffmpeg_failure"
@@ -74,27 +75,60 @@ def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple:  # Process ID,
                     re.search(r"^Error (.*)", ffmpeg_stderr).group(1).strip()
                 )
                 break
+
     return ffmpeg_stderr, failure_reason
 
 
 def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
+    print()
+    print()
+    print("Starting a run")
+    print(f"HERE IS FFMPEG CMD: {type(ffmpeg_cmd)} {ffmpeg_cmd}")
+    print(f"Here is worker count: {worker_count}")
+
     ffmpeg_cmd_list = shlex.split(ffmpeg_cmd)
     raw_worker_data = {}
     failure_reason = None
-    # print(f"> Run with {worker_count} Processes")
-    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
-        futures = {
-            executor.submit(run_ffmpeg, nr, ffmpeg_cmd_list): nr
-            for nr in range(worker_count)
-        }
-        for future in concurrent.futures.as_completed(futures):
-            pid = futures[future]
-            raw_worker_data[pid] = future.result()
-            # print(f"> > > Finished Worker Process: {pid}")
-            if raw_worker_data[pid][1]:
-                failure_reason = raw_worker_data[pid][1]
-
-    if failure_reason:
+    procs, results = {}, {}
+
+    # Start processes
+    for i in range(worker_count):
+        procs[i] = sp.Popen(
+            ffmpeg_cmd_list, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, text=True
+        )
+    then = time.time()
+    keep_waiting = True
+    while keep_waiting is True and failure_reason is None:
+        now = time.time()
+        if now - then >= Constants.DEFAULT_TIMEOUT:
+            print("Timeout")
+            failure_reason = "failed_timeout"
+        else:
+            keep_waiting = False
+            for idx, copy_of_proc in list(
+                procs.items()
+            ):  # iterate over a copy to mutate original
+                if copy_of_proc.poll() is not None:
+                    stdout, stderr = copy_of_proc.communicate()
+                    results[idx] = stdout.strip()
+                    if copy_of_proc.returncode == 0:
+                        print(f"Process {idx} finished with output:\n{stdout}")
+                        raw_worker_data[idx] = [stderr, failure_reason]
+                    else:
+                        print(
+                            f"Process {idx} failed with return code {copy_of_proc.returncode}"
+                        )
+                        failure_reason = f"Worker {idx} failed with return code {copy_of_proc.returncode}"
+                        break
+                    del procs[idx]
+                else:
+                    keep_waiting = True
+            time.sleep(1)
+
+    if failure_reason is not None:
+        for _, proc in procs.items():
+            proc.kill()
+        print("There was a failure so I killed all the procs")
         raw_worker_data = None
         # Deleting all the Raw Data, since run with failed Worker is not counted
 
@@ -148,9 +182,10 @@ def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
             }
             run_data_raw.append(worker_data)
 
-        return False, evaluateRunData(run_data_raw)
+        print("Evaluating data now!")
+        return False, evaluateRunData(run_data_raw), None
     else:
-        return True, failure_reason
+        return True, None, failure_reason
 
 
 def evaluateRunData(run_data_raw: list) -> dict:
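For anyone who wants to try the new worker-management pattern outside the benchmark, the sketch below reproduces the idea this worker.py hunk switches to: plain subprocess.Popen handles polled in a loop under one shared wall-clock timeout, with any still-running processes killed once a failure or timeout is detected. It is illustrative only; run_workers, TIMEOUT_SECONDS, the 0.5 s poll interval, and the demo command are made-up names and values that do not appear in the patch.

import shlex
import subprocess as sp
import time

TIMEOUT_SECONDS = 120  # stand-in for Constants.DEFAULT_TIMEOUT


def run_workers(cmd: str, worker_count: int):
    """Start worker_count copies of cmd, poll them under one shared timeout,
    and return {index: stderr} on success or None on any failure."""
    cmd_list = shlex.split(cmd)
    procs = {
        i: sp.Popen(cmd_list, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, text=True)
        for i in range(worker_count)
    }
    collected = {}
    failure = None
    deadline = time.time() + TIMEOUT_SECONDS

    while procs and failure is None:
        if time.time() >= deadline:
            failure = "failed_timeout"
            break
        for idx, proc in list(procs.items()):  # copy: procs is mutated below
            if proc.poll() is None:
                continue  # still running
            _, stderr = proc.communicate()
            del procs[idx]
            if proc.returncode != 0:
                failure = f"worker {idx} exited with {proc.returncode}"
                break
            collected[idx] = stderr
        if procs:
            time.sleep(0.5)  # brief pause before the next poll pass

    if failure is not None:
        for proc in procs.values():  # reap anything still running
            proc.kill()
        return None
    return collected


if __name__ == "__main__":
    # Harmless POSIX demo command; swap in an ffmpeg invocation to mimic workMan.
    print(run_workers("sleep 1", worker_count=4))

One trade-off worth noting: the old ThreadPoolExecutor path applied the 120 s timeout to each subprocess.run call separately, whereas this polling approach, like the new workMan, enforces a single Constants.DEFAULT_TIMEOUT across the whole batch and lets the parent kill the remaining processes as soon as any worker fails.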