|
| 1 | +import subprocess |
| 2 | +import signal |
| 3 | +import shlex |
| 4 | +import os |
| 5 | +from typing import Union, List |
| 6 | + |
| 7 | + |
| 8 | +def run_terminal_command(command: Union[str, List[str]]) -> str: |
| 9 | + """ |
| 10 | + Executes a terminal command and returns the combined stdout and stderr |
| 11 | + output as a single string. |
| 12 | +
|
| 13 | + This function handles both string commands (which are split safely) |
| 14 | + and list-of-string commands (the preferred secure method). |
| 15 | +
|
| 16 | + Args: |
| 17 | + command: The command to execute, either as a space-separated string |
| 18 | + (e.g., "ls -l /tmp") or a list of arguments (e.g., ["ls", "-l", "/tmp"]). |
| 19 | +
|
| 20 | + Returns: |
| 21 | + A string containing the output of the command, or an error message |
| 22 | + if the command fails to execute. |
| 23 | + """ |
| 24 | + |
| 25 | + # 1. Safely parse the command if it's provided as a string |
| 26 | + if isinstance(command, str): |
| 27 | + # shlex.split safely handles quotes and spaces in arguments |
| 28 | + command_list = shlex.split(command) |
| 29 | + elif isinstance(command, list): |
| 30 | + command_list = command |
| 31 | + else: |
| 32 | + return f"Error: Invalid command format. Expected string or list, got {type(command).__name__}." |
| 33 | + |
| 34 | + if not command_list: |
| 35 | + return "Error: Command list is empty." |
| 36 | + |
| 37 | + def set_sigint_ignore(): |
| 38 | + """ |
| 39 | + Sets the signal handler for SIGINT to ignore in the child process, |
| 40 | + preventing external interrupts from killing the running command. |
| 41 | + """ |
| 42 | + # Only set if running on POSIX systems (e.g., Linux, macOS) |
| 43 | + if os.name == 'posix': |
| 44 | + signal.signal(signal.SIGINT, signal.SIG_IGN) |
| 45 | + |
| 46 | + try: |
| 47 | + # 2. Execute the command |
| 48 | + # capture_output=True collects stdout and stderr |
| 49 | + # text=True decodes the output as text (using the system default encoding) |
| 50 | + # check=False ensures that we don't raise an exception just because the command |
| 51 | + # returns a non-zero exit code (failure); we'll check the exit code manually. |
| 52 | + result = subprocess.run( |
| 53 | + command_list, |
| 54 | + capture_output=True, |
| 55 | + text=True, |
| 56 | + check=False, # Allows us to handle the return code ourselves |
| 57 | + preexec_fn=set_sigint_ignore |
| 58 | + ) |
| 59 | + |
| 60 | + # 3. Check the return code and format the output |
| 61 | + if result.returncode != 0: |
| 62 | + # If the command failed, return a specific error message including stderr |
| 63 | + return ( |
| 64 | + f"Command failed with exit code {result.returncode}:\n" |
| 65 | + f"Command: {' '.join(command_list)}\n" |
| 66 | + f"--- Standard Error ---\n{result.stderr.strip() |
| 67 | + or 'No error output'}\n" |
| 68 | + f"--- Standard Output ---\n{result.stdout.strip() |
| 69 | + or 'No standard output'}" |
| 70 | + ) |
| 71 | + else: |
| 72 | + # If successful, return the combined standard output |
| 73 | + # We prioritize stdout but include stderr if it exists (e.g., for warnings) |
| 74 | + output = result.stdout.strip() |
| 75 | + if result.stderr: |
| 76 | + output += f"\n\n[Warning: The command also produced the following standard error (stderr) output]:\n{ |
| 77 | + result.stderr.strip()}" |
| 78 | + |
| 79 | + return output if output else "Command executed successfully, but produced no output." |
| 80 | + |
| 81 | + except FileNotFoundError: |
| 82 | + # This occurs if the executable itself (the first item in the list) is not found |
| 83 | + return f"Error: Command executable not found: '{command_list[0]}'. Check your PATH." |
| 84 | + except Exception as e: |
| 85 | + # Catch any unexpected errors during execution setup |
| 86 | + return f"Unexpected error during command execution: {e}" |
| 87 | + |
| 88 | + |
| 89 | +def run_ffmpeg_encode( |
| 90 | + uid, |
| 91 | + data, |
| 92 | + data_lock, |
| 93 | + quality, |
| 94 | + audio_map, |
| 95 | + subtitle_map |
| 96 | +): |
| 97 | + with data_lock: |
| 98 | + cmd = f""" |
| 99 | +ffmpeg -hide_banner -loglevel info -stats -progress pipe:1 |
| 100 | +-i {data[uid]["input_file"]} |
| 101 | +-map 0:v:0 |
| 102 | +{audio_map} |
| 103 | +{subtitle_map} |
| 104 | +-c:v libx265 |
| 105 | +-preset {quality["preset"]} |
| 106 | +-crf {quality["crf"]} |
| 107 | +-x265-params "aq-mode={quality['aq_mode']}" |
| 108 | +-c:a aac -b:a {quality['bitrate']} |
| 109 | +-c:s copy |
| 110 | +{data[uid]["encoded_file"]} |
| 111 | +""" |
| 112 | + shlex.split(cmd) |
| 113 | + process = subprocess.Popen( |
| 114 | + cmd, |
| 115 | + stdout=subprocess.PIPE, |
| 116 | + stderr=subprocess.PIPE, |
| 117 | + universal_newlines=True, |
| 118 | + bufsize=1 |
| 119 | + ) |
| 120 | + |
| 121 | + while True: |
| 122 | + line = process.stderr.readline() |
| 123 | + if not line and process.poll() is not None: |
| 124 | + break |
| 125 | + |
| 126 | + if line and line.startswith("frame="): |
| 127 | + try: |
| 128 | + curframe = int(line.split("=")[1]) |
| 129 | + with data_lock: |
| 130 | + data[uid]["current_frame"] = curframe |
| 131 | + |
| 132 | + except ValueError: |
| 133 | + pass |
| 134 | + |
| 135 | + # Wait for the process to fully finish and get the return code |
| 136 | + return process.wait() |
| 137 | + |
| 138 | + |
| 139 | +if __name__ == "__main__": |
| 140 | + print("--- Example 1 (Successful Command) ---") |
| 141 | + print(run_terminal_command("echo Hello World")) |
| 142 | + print("--- Example 2 (Command with arguments) ---") |
| 143 | + print(run_terminal_command("ls -a /")) |
| 144 | + print("--- Example 3 (Failing Command) ---") |
| 145 | + print(run_terminal_command("cat non_existent_file.txt")) |
| 146 | + print("--- Example 4 (Command without output)") |
| 147 | + print(run_terminal_command("touch nothing")) |
0 commit comments