|
| 1 | +import shutil |
| 2 | +import subprocess |
| 3 | +from typing import Iterator, Union |
| 4 | + |
| 5 | + |
| 6 | +def is_installed(lib_name: str) -> bool: |
| 7 | + lib = shutil.which(lib_name) |
| 8 | + if lib is None: |
| 9 | + return False |
| 10 | + return True |
| 11 | + |
| 12 | + |
| 13 | +def play( |
| 14 | + audio: Union[bytes, Iterator[bytes]], |
| 15 | + notebook: bool = False, |
| 16 | + use_ffmpeg: bool = True |
| 17 | +) -> None: |
| 18 | + if isinstance(audio, Iterator): |
| 19 | + audio = b"".join(audio) |
| 20 | + if notebook: |
| 21 | + try: |
| 22 | + from IPython.display import Audio, display # type: ignore |
| 23 | + except ModuleNotFoundError: |
| 24 | + message = ( |
| 25 | + "`pip install ipython` required when `notebook=False` " |
| 26 | + ) |
| 27 | + raise ValueError(message) |
| 28 | + |
| 29 | + display(Audio(audio, rate=44100, autoplay=True)) |
| 30 | + elif use_ffmpeg: |
| 31 | + if not is_installed("ffplay"): |
| 32 | + message = ( |
| 33 | + "ffplay from ffmpeg not found, necessary to play audio. " |
| 34 | + "On mac you can install it with 'brew install ffmpeg'. " |
| 35 | + "On linux and windows you can install it from https://ffmpeg.org/" |
| 36 | + ) |
| 37 | + raise ValueError(message) |
| 38 | + args = ["ffplay", "-autoexit", "-", "-nodisp"] |
| 39 | + proc = subprocess.Popen( |
| 40 | + args=args, |
| 41 | + stdout=subprocess.PIPE, |
| 42 | + stdin=subprocess.PIPE, |
| 43 | + stderr=subprocess.PIPE, |
| 44 | + ) |
| 45 | + out, err = proc.communicate(input=audio) |
| 46 | + proc.poll() |
| 47 | + else: |
| 48 | + try: |
| 49 | + import io |
| 50 | + |
| 51 | + import sounddevice as sd # type: ignore |
| 52 | + import soundfile as sf # type: ignore |
| 53 | + except ModuleNotFoundError: |
| 54 | + message = ( |
| 55 | + "`pip install sounddevice soundfile` required when `use_ffmpeg=False` " |
| 56 | + ) |
| 57 | + raise ValueError(message) |
| 58 | + sd.play(*sf.read(io.BytesIO(audio))) |
| 59 | + sd.wait() |
| 60 | + |
| 61 | + |
| 62 | +def save(audio: Union[bytes, Iterator[bytes]], filename: str) -> None: |
| 63 | + if isinstance(audio, Iterator): |
| 64 | + audio = b"".join(audio) |
| 65 | + with open(filename, "wb") as f: |
| 66 | + f.write(audio) |
| 67 | + |
| 68 | + |
| 69 | +def stream(audio_stream: Iterator[bytes]) -> bytes: |
| 70 | + if not is_installed("mpv"): |
| 71 | + message = ( |
| 72 | + "mpv not found, necessary to stream audio. " |
| 73 | + "On mac you can install it with 'brew install mpv'. " |
| 74 | + "On linux and windows you can install it from https://mpv.io/" |
| 75 | + ) |
| 76 | + raise ValueError(message) |
| 77 | + |
| 78 | + mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"] |
| 79 | + mpv_process = subprocess.Popen( |
| 80 | + mpv_command, |
| 81 | + stdin=subprocess.PIPE, |
| 82 | + stdout=subprocess.DEVNULL, |
| 83 | + stderr=subprocess.DEVNULL, |
| 84 | + ) |
| 85 | + |
| 86 | + audio = b"" |
| 87 | + |
| 88 | + for chunk in audio_stream: |
| 89 | + if chunk is not None: |
| 90 | + mpv_process.stdin.write(chunk) # type: ignore |
| 91 | + mpv_process.stdin.flush() # type: ignore |
| 92 | + audio += chunk |
| 93 | + if mpv_process.stdin: |
| 94 | + mpv_process.stdin.close() |
| 95 | + mpv_process.wait() |
| 96 | + |
| 97 | + return audio |
0 commit comments