|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +Match audio files to videos using transcription comparison. |
| 4 | +
|
| 5 | +Uses Whisper to transcribe both the video audio and external audio files, |
| 6 | +then matches them by comparing the transcribed text. |
| 7 | +
|
| 8 | +Usage: uv run --with openai-whisper match-audio-transcription.py --dry-run |
| 9 | +
|
| 10 | +Requires: openai-whisper, torch |
| 11 | +""" |
| 12 | + |
| 13 | +import argparse |
| 14 | +import json |
| 15 | +import os |
| 16 | +import subprocess |
| 17 | +import sys |
| 18 | +import tempfile |
| 19 | +from difflib import SequenceMatcher |
| 20 | +from pathlib import Path |
| 21 | + |
# Configuration — all paths are expanded from the user's home directory.
TAGS_FILE = os.path.expanduser("~/videos/quadrant-tags.json")  # JSON: video name -> data (with "path")
AUDIO_DIR = os.path.expanduser("~/downloads/Second Room Recordings/Audio")  # external recordings to match
PROCESSED_VIDEO_DIR = os.path.expanduser("~/videos/take-2")  # processed .mp4 videos live here
OUTPUT_DIR = os.path.expanduser("~/videos/synced")  # synced outputs are written here
TRANSCRIPTS_CACHE = os.path.expanduser("~/videos/transcripts.json")  # transcript cache (keyed "audio:..."/"video:...")

# How much audio to transcribe for matching (seconds)
TRANSCRIBE_DURATION = 120
| 31 | + |
| 32 | + |
def is_second_room_video(path: str) -> bool:
    """Return True when *path* refers to a Second Room recording."""
    marker = "Second Room"
    return path.find(marker) != -1
| 36 | + |
| 37 | + |
| 38 | +def extract_audio(input_file: str, output_file: str, duration: int = None) -> bool: |
| 39 | + """Extract audio from video/audio file to WAV.""" |
| 40 | + cmd = [ |
| 41 | + "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", |
| 42 | + "-i", input_file, |
| 43 | + ] |
| 44 | + if duration: |
| 45 | + cmd.extend(["-t", str(duration)]) |
| 46 | + cmd.extend([ |
| 47 | + "-ac", "1", |
| 48 | + "-ar", "16000", # Whisper expects 16kHz |
| 49 | + "-f", "wav", |
| 50 | + output_file |
| 51 | + ]) |
| 52 | + result = subprocess.run(cmd, capture_output=True) |
| 53 | + return result.returncode == 0 |
| 54 | + |
| 55 | + |
def transcribe_audio(audio_file: str, model) -> str:
    """Run Whisper on *audio_file* and return the stripped transcript text."""
    # fp16=False keeps inference in float32 (CPU-friendly).
    transcription = model.transcribe(audio_file, language="en", fp16=False)
    text = transcription["text"]
    return text.strip()
| 60 | + |
| 61 | + |
def text_similarity(text1: str, text2: str) -> float:
    """Return a fuzzy similarity ratio in [0, 1] between two transcripts.

    Comparison is case-insensitive and token-based (whitespace split),
    using difflib's SequenceMatcher over the word lists.
    """
    words_a = text1.lower().split()
    words_b = text2.lower().split()
    return SequenceMatcher(None, words_a, words_b).ratio()
| 71 | + |
| 72 | + |
def load_transcripts_cache() -> dict:
    """Return cached transcripts from TRANSCRIPTS_CACHE, or {} if absent."""
    cache_path = Path(TRANSCRIPTS_CACHE)
    if not cache_path.exists():
        return {}
    with open(TRANSCRIPTS_CACHE) as f:
        return json.load(f)
| 79 | + |
| 80 | + |
def save_transcripts_cache(cache: dict):
    """Persist *cache* to TRANSCRIPTS_CACHE as pretty-printed JSON."""
    with open(TRANSCRIPTS_CACHE, 'w') as handle:
        handle.write(json.dumps(cache, indent=2))
| 85 | + |
| 86 | + |
def find_audio_files(audio_dir: str) -> list[Path]:
    """Recursively collect audio files under *audio_dir*, sorted by path.

    Extension matching is case-insensitive (handles .WAV, .Mp3, .FLAC, ...)
    and results are deduplicated via a set: the previous per-pattern
    globbing with both `*.wav` and `*.WAV` returned the same file twice
    on case-insensitive filesystems (macOS, Windows).
    """
    extensions = {".wav", ".mp3", ".m4a", ".aac", ".flac", ".ogg"}
    audio_path = Path(audio_dir)
    found = {
        p for p in audio_path.rglob("*")
        if p.is_file() and p.suffix.lower() in extensions
    }
    return sorted(found)
| 94 | + |
| 95 | + |
def get_audio_duration(file_path: str) -> float:
    """Return the duration of an audio/video file in seconds via ffprobe.

    Returns 0.0 when ffprobe is unavailable, fails, or prints no parsable
    duration (the previous bare `except:` also swallowed KeyboardInterrupt
    and SystemExit; this catches only the expected failures).
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        file_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        return float(result.stdout.strip())
    except (FileNotFoundError, ValueError):
        # FileNotFoundError: ffprobe not on PATH.
        # ValueError: empty/garbage stdout (e.g. probe failed on the file).
        return 0.0
| 109 | + |
| 110 | + |
def find_time_offset_by_words(
    video_transcript: str,
    audio_transcript: str,
    *,
    words_per_second: float = 2.5,
) -> float:
    """Estimate how many seconds into the audio the video's speech begins.

    Slides the first 50 words of the video transcript across the audio
    transcript's word list, picks the best fuzzy-match window, then converts
    that word index to seconds using *words_per_second*. This is a rough
    estimate — precise sync would need word-level timestamps.

    Args:
        video_transcript: Transcript of the video's own audio track.
        audio_transcript: Transcript of the external audio file.
        words_per_second: Assumed speech rate used to convert the matched
            word index into seconds (previously hard-coded to 2.5 while a
            comment claimed ~2; now an explicit, overridable parameter).

    Returns:
        Offset in seconds (positive = audio starts before the video);
        0.0 when either transcript is empty.
    """
    video_words = video_transcript.lower().split()[:50]  # first 50 words
    audio_words = audio_transcript.lower().split()

    if not video_words or not audio_words:
        return 0.0

    best_pos = 0
    best_score = 0.0
    # Slide a video-sized window across the audio word list; if the audio
    # is shorter than the window the loop never runs and the offset is 0.
    for start in range(len(audio_words) - len(video_words) + 1):
        window = audio_words[start:start + len(video_words)]
        score = SequenceMatcher(None, video_words, window).ratio()
        if score > best_score:
            best_score = score
            best_pos = start

    return best_pos / words_per_second
| 139 | + |
| 140 | + |
def replace_audio(video_file: str, audio_file: str, offset_seconds: float, output_file: str) -> bool:
    """Mux the external audio onto the video, shifted by *offset_seconds*.

    A non-negative offset trims the start of the external audio; a negative
    offset delays it instead. The video stream is stream-copied untouched.
    Returns True when ffmpeg exits successfully.
    """
    if offset_seconds < 0:
        # Delay both stereo channels by the (positive) millisecond amount.
        delay_ms = int(-offset_seconds * 1000)
        audio_filter = f"adelay={delay_ms}|{delay_ms}"
    else:
        # Drop the leading part of the external audio, then reset timestamps.
        audio_filter = f"atrim=start={offset_seconds},asetpts=PTS-STARTPTS"

    cmd = [
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", video_file,
        "-i", audio_file,
        "-c:v", "copy",         # keep the video stream as-is
        "-af", audio_filter,
        "-map", "0:v:0",        # video from the first input
        "-map", "1:a:0",        # audio from the external file
        "-shortest",
        output_file,
    ]

    return subprocess.run(cmd, capture_output=True, text=True).returncode == 0
| 165 | + |
| 166 | + |
def main():
    """Entry point: transcribe audio files and videos, match them by text
    similarity, and (unless --dry-run) mux the best audio onto each video."""
    parser = argparse.ArgumentParser(description="Match audio using transcription")
    parser.add_argument("--dry-run", action="store_true", help="Only show matches, don't process")
    parser.add_argument("--model", default="base", help="Whisper model (tiny/base/small/medium/large)")
    parser.add_argument("--audio-dir", default=AUDIO_DIR, help="Audio files directory")
    parser.add_argument("--output-dir", default=OUTPUT_DIR, help="Output directory")

    args = parser.parse_args()

    # Import whisper here so we fail fast if not installed
    try:
        import whisper
    except ImportError:
        print("Whisper not installed. Run:")
        print(" uv run --with openai-whisper --with torch <script>")
        sys.exit(1)

    # Load tags (maps video name -> data dict; we read data["path"])
    if not Path(TAGS_FILE).exists():
        print(f"Tags file not found: {TAGS_FILE}")
        sys.exit(1)

    with open(TAGS_FILE) as f:
        tags = json.load(f)

    # Filter to Second Room videos
    second_room_videos = {
        name: data for name, data in tags.items()
        if is_second_room_video(data.get("path", ""))
    }

    if not second_room_videos:
        print("No Second Room videos found")
        sys.exit(1)

    print(f"Found {len(second_room_videos)} Second Room video(s)")

    # Find audio files
    audio_files = find_audio_files(args.audio_dir)
    if not audio_files:
        print(f"No audio files found in {args.audio_dir}")
        sys.exit(1)

    print(f"Found {len(audio_files)} audio file(s)")

    # Load Whisper model
    print(f"\nLoading Whisper model '{args.model}'...")
    model = whisper.load_model(args.model)
    print("Model loaded!\n")

    # Load transcript cache (avoids re-transcribing on repeat runs)
    cache = load_transcripts_cache()

    # Create output directory
    Path(args.output_dir).mkdir(parents=True, exist_ok=True)

    # Phase 1: transcribe all external audio files first so matching below
    # can compare every video against every audio transcript.
    print("=" * 60)
    print("TRANSCRIBING AUDIO FILES")
    print("=" * 60)

    audio_transcripts = {}
    with tempfile.TemporaryDirectory() as tmpdir:
        for i, audio_file in enumerate(audio_files, 1):
            # Cache key is namespaced so audio and video entries can't collide.
            cache_key = f"audio:{audio_file}"

            if cache_key in cache:
                print(f"[{i}/{len(audio_files)}] {audio_file.name} (cached)")
                audio_transcripts[str(audio_file)] = cache[cache_key]
                continue

            print(f"[{i}/{len(audio_files)}] Transcribing {audio_file.name}...")

            # Only the first TRANSCRIBE_DURATION seconds are transcribed —
            # enough for matching, much cheaper than the whole file.
            wav_file = f"{tmpdir}/audio.wav"
            if not extract_audio(str(audio_file), wav_file, TRANSCRIBE_DURATION):
                print(" Failed to extract audio")
                continue

            transcript = transcribe_audio(wav_file, model)
            audio_transcripts[str(audio_file)] = transcript
            cache[cache_key] = transcript

            # Show preview
            preview = transcript[:100] + "..." if len(transcript) > 100 else transcript
            print(f" \"{preview}\"")

    save_transcripts_cache(cache)

    # Phase 2: match each video against all audio transcripts
    print("\n" + "=" * 60)
    print("MATCHING VIDEOS TO AUDIO")
    print("=" * 60 + "\n")

    # results rows: (video name, matched audio name or None, score, offset)
    results = []

    with tempfile.TemporaryDirectory() as tmpdir:
        for i, (name, data) in enumerate(second_room_videos.items(), 1):
            # Processed videos are named <original stem>.mp4 in PROCESSED_VIDEO_DIR.
            processed_name = Path(name).stem + ".mp4"
            video_path = os.path.join(PROCESSED_VIDEO_DIR, processed_name)

            print(f"[{i}/{len(second_room_videos)}] {name}")

            if not Path(video_path).exists():
                print(f" Video not found: {video_path}")
                results.append((name, None, 0, 0))
                continue

            # Get video transcript (from cache or transcribe)
            cache_key = f"video:{video_path}"
            if cache_key in cache:
                video_transcript = cache[cache_key]
                print(" Using cached transcript")
            else:
                print(" Transcribing video audio...")
                wav_file = f"{tmpdir}/video.wav"
                if not extract_audio(video_path, wav_file, TRANSCRIBE_DURATION):
                    print(" Failed to extract audio")
                    results.append((name, None, 0, 0))
                    continue

                video_transcript = transcribe_audio(wav_file, model)
                cache[cache_key] = video_transcript
                # Save after each video so an interrupted run keeps progress.
                save_transcripts_cache(cache)

            # Find best matching audio by transcript similarity
            best_match = None
            best_score = 0
            best_offset = 0

            for audio_file, audio_transcript in audio_transcripts.items():
                score = text_similarity(video_transcript, audio_transcript)

                if score > best_score:
                    best_score = score
                    best_match = audio_file
                    # Estimate offset
                    best_offset = find_time_offset_by_words(video_transcript, audio_transcript)

            if best_match:
                match_name = Path(best_match).name
                print(f" Match: {match_name}")
                print(f" Score: {best_score:.2%}, Offset: ~{best_offset:.1f}s")
                results.append((name, match_name, best_score, best_offset))

                # 0.3 similarity threshold avoids syncing on weak matches.
                if not args.dry_run and best_score > 0.3:
                    output_file = os.path.join(args.output_dir, processed_name.replace('.mp4', '_synced.mp4'))
                    print(f" Syncing audio...")
                    if replace_audio(video_path, best_match, best_offset, output_file):
                        print(f" Output: {output_file}")
                    else:
                        print(" FAILED to sync")
            else:
                print(" No match found!")
                results.append((name, None, 0, 0))

            print()

    # Summary table of all match results
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"{'Video':<30} {'Audio Match':<25} {'Score':<8}")
    print("-" * 60)
    for name, match, score, offset in results:
        name_short = name[:28] if len(name) > 28 else name
        match_short = (match[:22] + "...") if match and len(match) > 25 else (match or "NO MATCH")
        print(f"{name_short:<30} {match_short:<25} {score:.0%}")
| 334 | + |
| 335 | + |
# Run the matcher only when executed as a script, not on import.
if __name__ == "__main__":
    main()
0 commit comments