55# file, You can obtain one at http://mozilla.org/MPL/2.0/.
66from __future__ import annotations
77
8- from typing import Iterable
8+ from typing import Any , Iterable
99from pathlib import Path
1010import asyncio
11+ import json
12+ import logging
1113import mimetypes
1214import os
1315import shutil
@@ -34,7 +36,89 @@ def __init__(self) -> None:
3436
3537
3638ffmpeg_path = _abswhich ("ffmpeg" )
37- ffmpeg_default_params = ("-hide_banner" , "-loglevel" , "warning" )
39+ ffmpeg_default_params = ("-hide_banner" , "-loglevel" , "warning" , "-y" )
40+
41+ ffprobe_path = _abswhich ("ffprobe" )
42+ ffprobe_default_params = (
43+ "-loglevel" ,
44+ "quiet" ,
45+ "-print_format" ,
46+ "json" ,
47+ "-show_optional_fields" ,
48+ "1" ,
49+ "-show_format" ,
50+ "-show_streams" ,
51+ )
52+
53+
54+ async def probe_path (
55+ input_file : os .PathLike [str ] | str ,
56+ logger : logging .Logger | None = None ,
57+ ) -> Any :
58+ """
59+ Probes a media file on the disk using ffprobe.
60+
61+ Args:
62+ input_file: The full path to the file.
63+
64+ Returns:
65+ A Python object containing the parsed JSON response from ffprobe
66+
67+ Raises:
68+ ConverterError: if ffprobe returns a non-zero exit code.
69+ """
70+ if ffprobe_path is None :
71+ raise NotInstalledError ()
72+
73+ input_file = Path (input_file )
74+ proc = await asyncio .create_subprocess_exec (
75+ ffprobe_path ,
76+ * ffprobe_default_params ,
77+ str (input_file ),
78+ stdout = asyncio .subprocess .PIPE ,
79+ stderr = asyncio .subprocess .PIPE ,
80+ stdin = asyncio .subprocess .PIPE ,
81+ )
82+ stdout , stderr = await proc .communicate ()
83+ if proc .returncode != 0 :
84+ err_text = stderr .decode ("utf-8" ) if stderr else f"unknown ({ proc .returncode } )"
85+ raise ConverterError (f"ffprobe error: { err_text } " )
86+ elif stderr and logger :
87+ logger .warn (f"ffprobe warning: { stderr .decode ('utf-8' )} " )
88+ return json .loads (stdout )
89+
90+
91+ async def probe_bytes (
92+ data : bytes ,
93+ input_mime : str | None = None ,
94+ logger : logging .Logger | None = None ,
95+ ) -> Any :
96+ """
97+ Probe media file data using ffprobe.
98+
99+ Args:
100+ data: The bytes of the file to probe.
101+ input_mime: The mime type of the input data. If not specified, will be guessed using magic.
102+
103+ Returns:
104+ A Python object containing the parsed JSON response from ffprobe
105+
106+ Raises:
107+ ConverterError: if ffprobe returns a non-zero exit code.
108+ """
109+ if ffprobe_path is None :
110+ raise NotInstalledError ()
111+
112+ if input_mime is None :
113+ if magic is None :
114+ raise ValueError ("input_mime was not specified and magic is not installed" )
115+ input_mime = magic .mimetype (data )
116+ input_extension = mimetypes .guess_extension (input_mime )
117+ with tempfile .TemporaryDirectory (prefix = "mautrix_ffmpeg_" ) as tmpdir :
118+ input_file = Path (tmpdir ) / f"data{ input_extension } "
119+ with open (input_file , "wb" ) as file :
120+ file .write (data )
121+ return await probe_path (input_file = input_file , logger = logger )
38122
39123
40124async def convert_path (
@@ -44,6 +128,7 @@ async def convert_path(
44128 output_args : Iterable [str ] | None = None ,
45129 remove_input : bool = False ,
46130 output_path_override : os .PathLike [str ] | str | None = None ,
131+ logger : logging .Logger | None = None ,
47132) -> Path | bytes :
48133 """
49134 Convert a media file on the disk using ffmpeg.
@@ -76,6 +161,10 @@ async def convert_path(
76161 else :
77162 input_file = Path (input_file )
78163 output_file = input_file .parent / f"{ input_file .stem } { output_extension } "
164+ if input_file == output_file :
165+ output_file = Path (output_file )
166+ output_file = output_file .parent / f"{ output_file .stem } -new{ output_extension } "
167+
79168 proc = await asyncio .create_subprocess_exec (
80169 ffmpeg_path ,
81170 * ffmpeg_default_params ,
@@ -92,9 +181,8 @@ async def convert_path(
92181 if proc .returncode != 0 :
93182 err_text = stderr .decode ("utf-8" ) if stderr else f"unknown ({ proc .returncode } )"
94183 raise ConverterError (f"ffmpeg error: { err_text } " )
95- elif stderr :
96- # TODO log warnings?
97- pass
184+ elif stderr and logger :
185+ logger .warn (f"ffmpeg warning: { stderr .decode ('utf-8' )} " )
98186 if remove_input and isinstance (input_file , Path ):
99187 input_file .unlink (missing_ok = True )
100188 return stdout if output_file == "-" else output_file
@@ -106,6 +194,7 @@ async def convert_bytes(
106194 input_args : Iterable [str ] | None = None ,
107195 output_args : Iterable [str ] | None = None ,
108196 input_mime : str | None = None ,
197+ logger : logging .Logger | None = None ,
109198) -> bytes :
110199 """
111200 Convert media file data using ffmpeg.
@@ -140,6 +229,7 @@ async def convert_bytes(
140229 output_extension = output_extension ,
141230 input_args = input_args ,
142231 output_args = output_args ,
232+ logger = logger ,
143233 )
144234 with open (output_file , "rb" ) as file :
145235 return file .read ()
@@ -152,4 +242,6 @@ async def convert_bytes(
152242 "NotInstalledError" ,
153243 "convert_bytes" ,
154244 "convert_path" ,
245+ "probe_bytes" ,
246+ "probe_path" ,
155247]
0 commit comments