jellybench_py/constant.py: 1 change (1 addition, 0 deletions)
@@ -11,6 +11,7 @@ class CommandConfig:
class Constants:
DEFAULT_OUTPUT_JSON: str = "./output.json"
DEFAULT_SERVER_URL: str = "https://hwa.jellyfin.org"
DEFAULT_TIMEOUT: int = 120
NVENC_TEST_WINDOWS = CommandConfig(
BASE_CMD="{ffmpeg} -y -hwaccel cuda -hwaccel_output_format cuda -t 50 -hwaccel_device {gpu} -f lavfi -i testsrc ",
WORKER_CMD="-vf hwupload -fps_mode passthrough -c:a copy -c:v h264_nvenc -b:v {bitrate} -f null -",
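Reviewer note: a minimal sketch of how the new `DEFAULT_TIMEOUT` constant is presumably meant to be consumed in place of a hard-coded 120-second limit, using the same `subprocess` timeout mechanism as `worker.py` below. The command here is a placeholder, not one of the repository's test commands.

```python
import subprocess as sp

from jellybench_py.constant import Constants

ffmpeg_cmd = ["ffmpeg", "-version"]  # placeholder; real commands come from the CommandConfig templates

try:
    # Pass the shared constant instead of a literal 120.
    sp.run(ffmpeg_cmd, capture_output=True, text=True, timeout=Constants.DEFAULT_TIMEOUT)
except sp.TimeoutExpired:
    failure_reason = "failed_timeout"  # mirrors the timeout handling in worker.py
```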
jellybench_py/core.py: 2 changes (1 addition, 1 deletion)
@@ -275,7 +275,7 @@ def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar, limit=0) -> tuple:
total_workers *= floor(total_workers * output[1]["speed"])

if args.debug_flag:
print(f'completed with speed {output[1]["speed"]:.02f}')
print(f"completed with speed {output[1]['speed']:.02f}")

# make sure we don't go into already benchmarked region
if total_workers >= min_fail:
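The core.py change is only a quoting swap inside the f-string (outer double quotes, inner single quotes), presumably for formatter consistency; both forms render identically. A quick check with an illustrative stand-in value:

```python
output = (None, {"speed": 3.14159})  # illustrative stand-in; only output[1]["speed"] matters here

old_style = f'completed with speed {output[1]["speed"]:.02f}'
new_style = f"completed with speed {output[1]['speed']:.02f}"
assert old_style == new_style == "completed with speed 3.14"
```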
jellybench_py/worker.py: 81 changes (58 additions, 23 deletions)
@@ -18,34 +18,35 @@
#
##########################################################################################

import concurrent.futures
import re
import shlex
import subprocess
import subprocess as sp
import time

from jellybench_py import ffmpeg_log
from jellybench_py.constant import Constants


def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple: # Process ID,
# print(f"{pid} |> Running FFMPEG Process: {pid}")
print(f"{pid} |> Running FFMPEG Process: {pid}")
timeout = 120 # Stop any process that runs for more then 120sec
failure_reason = None
try:
process_output = subprocess.run(
process_output = sp.run(
ffmpeg_cmd,
stdin=subprocess.PIPE,
stdin=sp.PIPE,
capture_output=True,
universal_newlines=True,
timeout=timeout,
)
retcode = process_output.returncode
ffmpeg_stderr = process_output.stderr

except subprocess.TimeoutExpired:
except sp.TimeoutExpired:
ffmpeg_stderr = ""
retcode = 0
failure_reason = "failed_timeout"

print(f"{pid} |> Finished FFMPEG Process: {pid}")
if 0 < retcode < 255:
ffmpeg_log.set_test_error(ffmpeg_stderr)
failure_reason = "generic_ffmpeg_failure"
@@ -74,27 +75,60 @@ def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple: # Process ID,
re.search(r"^Error (.*)", ffmpeg_stderr).group(1).strip()
)
break

return ffmpeg_stderr, failure_reason


def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
print()
print()
print("Starting a run")
print(f"HERE IS FFMPEG CMD: {type(ffmpeg_cmd)} {ffmpeg_cmd}")
print(f"Here is worker count: {worker_count}")

ffmpeg_cmd_list = shlex.split(ffmpeg_cmd)
raw_worker_data = {}
failure_reason = None
# print(f"> Run with {worker_count} Processes")
with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = {
executor.submit(run_ffmpeg, nr, ffmpeg_cmd_list): nr
for nr in range(worker_count)
}
for future in concurrent.futures.as_completed(futures):
pid = futures[future]
raw_worker_data[pid] = future.result()
# print(f"> > > Finished Worker Process: {pid}")
if raw_worker_data[pid][1]:
failure_reason = raw_worker_data[pid][1]

if failure_reason:
procs, results = {}, {}

# Start processes
for i in range(worker_count):
procs[i] = sp.Popen(
ffmpeg_cmd_list, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, text=True
)
then = time.time()
keep_waiting = True
while keep_waiting is True and failure_reason is None:
now = time.time()
if now - then >= Constants.DEFAULT_TIMEOUT:
print("Timeout")
failure_reason = "failed_timeout"
else:
keep_waiting = False
for idx, copy_of_proc in list(
procs.items()
): # iterate over a copy to mutate original
if copy_of_proc.poll() is not None:
stdout, stderr = copy_of_proc.communicate()
results[idx] = stdout.strip()
if copy_of_proc.returncode == 0:
print(f"Process {idx} finished with output:\n{stdout}")
raw_worker_data[idx] = [stderr, failure_reason]
else:
print(
f"Process {idx} failed with return code {copy_of_proc.returncode}"
)
failure_reason = f"Worker {idx} failed with return code {copy_of_proc.returncode}"
break
del procs[idx]
else:
keep_waiting = True
time.sleep(1)

if failure_reason is not None:
for _, proc in procs.items():
proc.kill()
print("There was a failure so I killed all the procs")
raw_worker_data = None
# Deleting all the Raw Data, since run with failed Worker is not counted
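Reviewer note: the rewrite above replaces the ThreadPoolExecutor fan-out with direct subprocess.Popen handles polled against a shared deadline. A self-contained sketch of that poll-with-deadline pattern, independent of the jellybench helpers (command, worker count, and timeout are placeholders):

```python
import subprocess as sp
import time


def run_until_deadline(cmd: list, workers: int, timeout: float) -> dict:
    """Start `workers` copies of `cmd`, poll until all exit or the deadline passes."""
    procs = {i: sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) for i in range(workers)}
    results = {}
    deadline = time.time() + timeout

    while procs:
        if time.time() >= deadline:
            for proc in procs.values():
                proc.kill()  # give up on anything still running
            break
        for idx, proc in list(procs.items()):  # iterate over a copy so entries can be deleted
            if proc.poll() is not None:  # process has exited
                stdout, stderr = proc.communicate()
                results[idx] = (proc.returncode, stdout, stderr)
                del procs[idx]
        time.sleep(1)

    return results
```

Unlike the diff above, this sketch collects every worker's (returncode, stdout, stderr) instead of aborting on the first non-zero return code; it is a simplification of the pattern, not a behavioural match.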

@@ -148,9 +182,10 @@ def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
}

run_data_raw.append(worker_data)
return False, evaluateRunData(run_data_raw)
print("Evaluating data now!")
return False, evaluateRunData(run_data_raw), None
else:
return True, failure_reason
return True, None, failure_reason


def evaluateRunData(run_data_raw: list) -> dict:
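Reviewer note: workMan now returns a 3-tuple (failed, evaluated data, failure_reason) instead of the previous 2-tuple, so every call site has to unpack three values. A hedged sketch of what an updated call site might look like; the variable names and arguments are illustrative, not taken from core.py:

```python
from jellybench_py.worker import workMan

# Placeholder arguments; real values come from the benchmark loop.
failed, eval_data, failure_reason = workMan(worker_count=4, ffmpeg_cmd="ffmpeg -version")
if failed:
    print(f"Run aborted: {failure_reason}")
else:
    print(f"Evaluated run data: {eval_data}")
```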