-
-
Notifications
You must be signed in to change notification settings - Fork 1
feat(wistia): add subtitle download support #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
a4dca4a
f2cd39d
80129f8
7f4a58a
60f8273
e02aca2
72573ac
bd00862
c705cc3
08b0129
47192c4
98112af
462b3b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,19 +1,233 @@ | ||
| import json | ||
| import os | ||
| import re | ||
| import requests | ||
| import zlib | ||
| from typing import Optional, List | ||
| from pathlib import Path | ||
| import os | ||
| from typing import Any, Dict, List, Optional | ||
| from urllib.parse import urlparse | ||
|
|
||
| import requests | ||
|
|
||
| from .file_utils import filter_filename | ||
| from .download_manager import DownloadManager | ||
| # Local imports inside functions to avoid circular dependency during module import | ||
|
|
||
| # Handles video proxy and Wistia direct downloads | ||
|
|
||
# URL template for a media's embed JSON; ``{id}`` is a placeholder
# (presumably the Wistia media id -- confirm against the caller).
WISTIA_JSON_URL = "https://fast.wistia.com/embed/medias/{id}.json"

# Captures the word-character id found between "medias/" and ".jsonp".
VIDEO_PROXY_JSONP_ID_PATTERN = re.compile(r"medias/(\w+)\.jsonp")
# Extension used for subtitle files when a track does not provide one.
DEFAULT_SUBTITLE_EXTENSION = "vtt"
# Matches runs of characters other than ASCII letters, digits and hyphens;
# used to sanitize the language part of subtitle filenames.
_LANGUAGE_SANITIZE_PATTERN = re.compile(r'[^A-Za-z0-9\-]+')
|
|
||
|
|
||
| def _normalize_wistia_track_url(url: Optional[str]) -> Optional[str]: | ||
| """Normalize Wistia caption track URLs to absolute HTTPS URLs.""" | ||
| if not url or not isinstance(url, str): | ||
| return None | ||
|
|
||
| normalized = url.strip() | ||
| if not normalized: | ||
| return None | ||
|
|
||
| if normalized.startswith('//'): | ||
| normalized = f"https:{normalized}" | ||
| elif normalized.startswith('/'): | ||
| normalized = f"https://fast.wistia.com{normalized}" | ||
| elif not re.match(r'^https?://', normalized, re.IGNORECASE): | ||
| normalized = f"https://fast.wistia.com/{normalized.lstrip('/')}" | ||
|
|
||
| return normalized | ||
|
|
||
|
|
||
| def _build_caption_url(hashed_id: Optional[str], language: Optional[str], extension: Optional[str] = None) -> Optional[str]: | ||
| """Construct a Wistia caption URL when only hashedId and language are available.""" | ||
| if not hashed_id or not language: | ||
| return None | ||
|
|
||
| ext = (extension or DEFAULT_SUBTITLE_EXTENSION).lstrip('.') or DEFAULT_SUBTITLE_EXTENSION | ||
| return f"https://fast.wistia.com/embed/captions/{hashed_id}.{ext}?language={language}" | ||
|
|
||
|
|
||
def _infer_track_extension(url: str, fallback: str = DEFAULT_SUBTITLE_EXTENSION) -> str:
    """Infer the subtitle file extension from a track URL.

    Args:
        url: Track URL; only its path component is inspected, so query
            strings and fragments are ignored.
        fallback: Extension returned when the path has no suffix or the URL
            cannot be parsed.

    Returns:
        The lowercase extension without a leading dot, or ``fallback``.
    """
    try:
        suffix = Path(urlparse(url).path).suffix
    except (ValueError, TypeError, AttributeError):
        # urlparse raises ValueError for malformed URLs (e.g. unbalanced
        # IPv6 brackets); TypeError/AttributeError cover non-string input.
        # Anything else is unexpected and is allowed to propagate.
        return fallback

    return suffix.lstrip('.').lower() or fallback
|
|
||
|
|
||
def _first_text(mapping: Dict[str, Any], *keys: str) -> Optional[str]:
    """Return the first non-empty string stored under ``keys``, else ``None``."""
    for key in keys:
        value = mapping.get(key)
        if isinstance(value, str) and value:
            return value
    return None


def _dict_entries(value: Any) -> List[Dict[str, Any]]:
    """Return the dict members of ``value`` when it is a list, else an empty list."""
    if not isinstance(value, (list, tuple)):
        return []
    return [entry for entry in value if isinstance(entry, dict)]


def extract_wistia_subtitle_tracks(media: Dict[str, Any]) -> List[Dict[str, Optional[str]]]:
    """Extract subtitle/caption track metadata from Wistia media JSON.

    The following keys of ``media`` are inspected, in this order:
    ``captions``, ``text_tracks``, ``textTracks``, caption-like entries of
    ``assets`` and ``availableTranscripts``. Malformed entries (wrong
    container type, non-string field values) are ignored instead of raising.

    Args:
        media: Decoded Wistia media JSON object.

    Returns:
        A list of dicts with the keys ``url``, ``language``, ``label`` and
        ``ext``, de-duplicated by URL in first-seen order. Returns an empty
        list when ``media`` is not a dict or no track is found.
    """
    if not isinstance(media, dict):
        return []

    hashed_id = media.get('hashedId') or media.get('hashed_id')
    tracks: List[Dict[str, Optional[str]]] = []

    def add_track(url: Optional[str], language: Optional[str], label: Optional[str], ext: Optional[str]) -> None:
        normalized = _normalize_wistia_track_url(url)
        if not normalized and hashed_id and language:
            # No explicit URL: derive one from the media id and language.
            normalized = _build_caption_url(hashed_id, language, ext)
        if not normalized:
            return
        tracks.append({
            'url': normalized,
            'language': language,
            'label': label,
            'ext': (ext or '').lstrip('.') or None,
        })

    for caption in _dict_entries(media.get('captions')):
        add_track(
            _first_text(caption, 'url', 'src'),
            _first_text(caption, 'language', 'lang'),
            _first_text(caption, 'languageName', 'label', 'name'),
            _first_text(caption, 'ext'),
        )

    # Both spellings share one shape; only the accepted label keys differ.
    text_track_layouts = (
        ('text_tracks', ('name', 'label')),
        ('textTracks', ('name', 'label', 'title')),
    )
    for list_key, label_keys in text_track_layouts:
        for track in _dict_entries(media.get(list_key)):
            language = _first_text(track, 'language', 'lang')
            label = _first_text(track, *label_keys)
            track_ext = _first_text(track, 'ext')
            sources = track.get('sources') or []
            if sources:
                # A track with sources is described by its sources only.
                for source in _dict_entries(sources):
                    add_track(
                        _first_text(source, 'url', 'src'),
                        language,
                        label,
                        _first_text(source, 'ext') or track_ext,
                    )
            else:
                add_track(_first_text(track, 'url', 'src'), language, label, track_ext)

    caption_kinds = ('caption', 'captions', 'subtitle', 'subtitles')
    for asset in _dict_entries(media.get('assets')):
        asset_type = (_first_text(asset, 'type') or '').lower()
        asset_kind = (_first_text(asset, 'kind') or '').lower()
        if asset_type in caption_kinds or asset_kind in caption_kinds:
            add_track(
                _first_text(asset, 'url', 'src'),
                _first_text(asset, 'language', 'lang'),
                _first_text(asset, 'display_name', 'name'),
                _first_text(asset, 'ext'),
            )

    if hashed_id:
        for transcript in _dict_entries(media.get('availableTranscripts')):
            if not transcript.get('hasCaptions'):
                continue
            language = _first_text(transcript, 'language', 'wistiaLanguageCode', 'bcp47LanguageTag')
            if not language:
                continue
            add_track(
                _build_caption_url(hashed_id, language, DEFAULT_SUBTITLE_EXTENSION),
                language,
                _first_text(transcript, 'name', 'familyName') or language,
                DEFAULT_SUBTITLE_EXTENSION,
            )

    # De-duplicate by URL, keeping the first occurrence and filling any of
    # its missing fields from later duplicates.
    unique_tracks: Dict[str, Dict[str, Optional[str]]] = {}
    for track in tracks:
        url = track['url']
        if not url:
            continue
        existing = unique_tracks.setdefault(url, track)
        if existing is track:
            continue
        for field in ('language', 'label', 'ext'):
            if not existing.get(field) and track.get(field):
                existing[field] = track[field]

    return list(unique_tracks.values())
kovyrin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
def queue_wistia_subtitle_downloads(media: Dict[str, Any], dest_dir: Path, video_base_name: str):
    """Queue subtitle download tasks for a Wistia media object.

    Each subtitle file is named ``<base>.<language>.<ext>`` and queued via
    ``add_download_task`` with the task type ``"subtitle"``.

    Args:
        media: Decoded Wistia media JSON object.
        dest_dir: Directory the subtitle files are written to; a plain
            string is converted to ``Path``.
        video_base_name: Name of the accompanying video, with or without a
            media file extension. A recognised media extension is dropped;
            any other dotted tail is kept as part of the name.

    Returns:
        None. Does nothing when subtitle downloads are disabled in the
        settings or when ``media`` exposes no subtitle tracks.
    """
    # Imported lazily to avoid a circular dependency during module import.
    from . import downloader

    if not isinstance(dest_dir, Path):
        dest_dir = Path(dest_dir)

    downloader.init_settings()
    # Read SETTINGS through the module *after* init_settings().
    # NOTE(review): a ``from .downloader import SETTINGS`` binding taken
    # before the call would be stale if init_settings() rebinds the module
    # global -- confirm against downloader.py.
    settings = downloader.SETTINGS
    if settings and not getattr(settings, 'subtitle_download_enabled', True):
        return

    tracks = extract_wistia_subtitle_tracks(media)
    if not tracks:
        return

    # Only strip a real media extension. Path.stem alone would truncate
    # extension-less titles such as "1. Introduction" down to "1".
    media_suffixes = (
        '.mp4', '.m4v', '.mov', '.webm', '.mkv', '.avi', '.flv', '.wmv',
        '.ts', '.m3u8', '.mp3', '.m4a', '.aac', '.ogg', '.ogv', '.wav',
    )
    name_path = Path(str(video_base_name or ''))
    if name_path.suffix.lower() in media_suffixes:
        base_name = name_path.stem
    else:
        base_name = name_path.name

    if not base_name:
        fallback_name = media.get('name') or media.get('hashedId') or 'captions'
        base_name = filter_filename(str(fallback_name))
    else:
        base_name = filter_filename(base_name)

    if not base_name:
        base_name = "captions"

    # Lowercased filenames already queued; tracks are de-duplicated by URL
    # only, so two sources for the same language would otherwise collide.
    used_names = set()
    counter = 1
    for track in tracks:
        url = track.get('url')
        if not url:
            continue

        ext = (track.get('ext') or _infer_track_extension(url)).lstrip('.').lower() or DEFAULT_SUBTITLE_EXTENSION
        language_part = track.get('language') or track.get('label') or ''
        if isinstance(language_part, (list, dict)):
            language_part = ''
        language_part = str(language_part or '')

        language_part = _LANGUAGE_SANITIZE_PATTERN.sub('-', language_part).strip('-')

        if not language_part:
            language_part = 'captions' if counter == 1 else f"captions-{counter}"

        subtitle_filename = filter_filename(f"{base_name}.{language_part}.{ext}")
        if not subtitle_filename:
            subtitle_filename = filter_filename(f"{base_name}.captions-{counter}.{ext}")

        # Disambiguate clashes as "<lang>-2", "<lang>-3", ... The attempt
        # count is bounded so an unexpected filter_filename result cannot
        # loop forever.
        attempt = 2
        while (
            subtitle_filename
            and subtitle_filename.lower() in used_names
            and attempt <= len(tracks) + 1
        ):
            subtitle_filename = filter_filename(f"{base_name}.{language_part}-{attempt}.{ext}")
            attempt += 1
        if subtitle_filename:
            used_names.add(subtitle_filename.lower())

        print(f"    [Subs] Queued subtitles: {subtitle_filename}")
        downloader.add_download_task(url, dest_dir / subtitle_filename, "subtitle")
        counter += 1
|
||
|
|
||
|
|
||
| def video_downloader_videoproxy(video_url: str, file_name: str, quality: str = "720p"): | ||
|
|
@@ -143,6 +357,7 @@ def infer_ext(asset: dict) -> str: | |
| return '.mp4' | ||
|
|
||
| resolved_base = filter_filename(file_name if file_name else media.get('name') or wistia_id) | ||
| current_dir = Path.cwd() | ||
|
|
||
| if all_formats_flag: | ||
| print(f"Downloading all available Wistia assets for {resolved_base}") | ||
|
|
@@ -172,6 +387,7 @@ def infer_ext(asset: dict) -> str: | |
| DOWNLOAD_MANAGER.download_file(a_url, Path(filter_filename(out_name))) | ||
| else: | ||
| print("Download manager not initialized") | ||
| queue_wistia_subtitle_downloads(media, current_dir, resolved_base) | ||
| return | ||
|
|
||
| # Single quality path | ||
|
|
@@ -200,6 +416,6 @@ def infer_ext(asset: dict) -> str: | |
|
|
||
| # Queue video for parallel download with absolute path to current directory | ||
| from .downloader import add_download_task | ||
| current_dir = Path.cwd() # Capture current working directory | ||
| full_path = current_dir / resolved_name # Create absolute path | ||
| add_download_task(video_url, full_path, "video") | ||
| queue_wistia_subtitle_downloads(media, current_dir, resolved_name) | ||
Uh oh!
There was an error while loading. Please reload this page.