diff --git a/oci-subtitle-translation/README.md b/oci-subtitle-translation/README.md index dbaf267..4ff8909 100644 --- a/oci-subtitle-translation/README.md +++ b/oci-subtitle-translation/README.md @@ -1,150 +1,281 @@ # OCI Subtitle Translation -## Introduction +Automatically transcribe audio files and translate subtitles into multiple languages using Oracle Cloud Infrastructure (OCI) AI services. -In today's global digital landscape, making audio and video content accessible across different languages is crucial. This solution leverages OCI's AI services to automatically generate and translate subtitles for audio content into multiple languages. +## Overview -The solution combines two powerful OCI services: -- **OCI Speech** to transcribe audio into text and generate SRT subtitle files -- **OCI Language** to translate the generated subtitles into multiple target languages +This solution combines two powerful OCI AI services to create multilingual subtitles: +- **OCI Speech**: Transcribes audio files to SRT subtitle format +- **OCI Language**: Translates subtitles into 30+ target languages -This automated approach significantly reduces the time and effort required to create multilingual subtitles, making content more accessible to a global audience. +Perfect for making video content accessible to global audiences with minimal manual effort. -## 0. Prerequisites and setup +## Features -### Prerequisites - -- Python 3.8 or higher -- OCI Account with Speech and Language services enabled -- Required IAM Policies and Permissions -- Object Storage bucket for input/output files -- OCI CLI configured with proper credentials - -### Setup +- 🎧 **Flexible Input**: Local audio files or files in OCI Object Storage +- πŸ“„ **Multiple Formats**: Generates industry-standard SRT subtitle files +- 🌍 **30+ Languages**: Translate to major world languages +- ⚑ **Batch Processing**: Efficient translation for multiple languages +- πŸ”§ **Configurable**: Customize storage, languages, and processing methods +- πŸ“¦ **Complete Workflow**: Single command for transcription + translation -1. Create an OCI account if you don't have one -2. Enable OCI Speech and Language services in your tenancy -3. Set up OCI CLI and create API keys: - ```bash - # Install OCI CLI - bash -c "$(curl -L https://raw.githubusercontent.com/oracle/oci-cli/master/scripts/install/install.sh)" - - # Configure OCI CLI (this will create ~/.oci/config) - oci setup config - ``` -4. Set up the appropriate IAM policies to use both OCI Speech and Language services -5. Create a bucket in OCI Object Storage for your audio files and generated subtitles -6. Take note of your Object Storage namespace (visible in the OCI Console under Object Storage) +## Quick Start -### Docs +### Prerequisites -- [OCI Speech Service Documentation](https://docs.oracle.com/en-us/iaas/api/#/en/speech/20220101) -- [OCI Language Translation Documentation](https://docs.oracle.com/en-us/iaas/language) -- [OCI SDK Documentation](https://docs.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm) +- Python 3.8+ +- OCI account with Speech and Language services enabled +- OCI CLI configured (`oci setup config`) +- Object Storage bucket for audio/subtitle files -## 1. Getting Started +### Installation -1. Clone this repository: +1. **Clone and install dependencies:** ```bash git clone https://github.com/oracle-devrel/devrel-labs.git cd oci-subtitle-translation + pip install -r requirements.txt ``` -2. Install required dependencies: +2. **Configure your settings:** ```bash - pip install -r requirements.txt + cp config_example.yaml config.yaml + # Edit config.yaml with your OCI details (see Configuration section) ``` -3. Update `config.yaml` with your settings: - ```yaml - # Speech Service Configuration - speech: - compartment_id: "ocid1.compartment.oc1..your-compartment-id" - bucket_name: "your-bucket-name" - namespace: "your-namespace" - - # Language Translation Configuration - language: - compartment_id: "ocid1.compartment.oc1..your-compartment-id" +3. **Run the workflow:** + ```bash + # Transcribe local file and translate to Spanish + python workflow.py --audio-source audio.mp3 --target-language es + + # Transcribe Object Storage file and translate to multiple languages + python workflow.py --audio-source "audio/recording.mp3" --target-languages es fr de ``` -## 2. Usage +## Audio Input Methods -> Before running the script, make sure your input `.mp3` file has already been uploaded to the OCI Object Storage **input bucket** defined in your `config.yaml`. -> The script does **not** accept local files it looks for the file in the cloud bucket only. +### Method 1: Local Audio Files -This solution works in two steps: +For audio files on your local machine: -1. First, we generate SRT from audio: +```bash +# Single language translation +python workflow.py --audio-source /path/to/audio.mp3 --target-language es - ```bash - python generate_srt_from_audio.py --input-file your_audio.mp3 - ``` +# Multiple languages +python workflow.py --audio-source audio.wav --target-languages es fr de pt -2. Then, we translate the generated SRT file to multiple languages: +# Transcription only (no translation) +python workflow.py --transcribe-only --audio-source audio.mp3 +``` - ```bash - python translate_srt.py --input-file input.srt - ``` +**How it works:** +- Script uploads your local file to Object Storage +- Transcribes using OCI Speech +- Downloads and translates the generated SRT files + +### Method 2: Object Storage Audio Files + +For audio files already in your configured Object Storage bucket: + +```bash +# File in bucket root +python workflow.py --audio-source "myfile.mp3" --target-language es + +# File in subfolder +python workflow.py --audio-source "audio/recordings/interview.mp3" --target-languages es fr + +# Complex path (from OCI Speech job output) +python workflow.py --audio-source "transcriptions/audio.mp3/job-abc123/audio.mp3" --target-language es +``` + +**Important:** +- Use the **object path within your bucket** (don't include bucket name) +- Bucket name and namespace come from your `config.yaml` +- If path doesn't exist locally, it's treated as an Object Storage path + +## Configuration + +Edit `config.yaml` with your OCI details: + +```yaml +# OCI Profile (from ~/.oci/config) +profile: "DEFAULT" + +# Speech Service Settings +speech: + compartment_id: "ocid1.compartment.oc1..your-compartment-id" + bucket_name: "your-bucket-name" + namespace: "your-namespace" + language_code: "en-US" # Default transcription language + +# Output Settings +output: + storage_type: "both" # "local", "object_storage", or "both" + local_directory: "./output" + +# Translation Settings +translation: + target_languages: ["es", "fr", "de"] # Default languages + method: "batch" # "batch" or "sync" +``` + +### Finding Your OCI Details + +- **Compartment ID**: OCI Console β†’ Identity β†’ Compartments +- **Namespace**: OCI Console β†’ Object Storage β†’ Bucket Details +- **Bucket Name**: The bucket you created for audio/subtitle files +- **Profile**: Your OCI CLI profile name (usually "DEFAULT") + +## Usage Examples + +### Complete Workflow + +```bash +# Local file β†’ transcribe β†’ translate to Spanish and French +python workflow.py --audio-source interview.mp3 --target-languages es fr + +# Object Storage file β†’ transcribe β†’ translate to German +python workflow.py --audio-source "recordings/meeting.wav" --target-language de + +# Custom output location +python workflow.py --audio-source audio.mp3 --target-language es --output-type local +``` + +### Individual Operations + +**Transcription only:** +```bash +# Local file +python generate_srt_from_audio.py --input-file audio.mp3 + +# Object Storage file +python generate_srt_from_audio.py --input-file "audio/recording.mp3" + +# Specify language and output +python generate_srt_from_audio.py --input-file audio.mp3 --language es-ES --output-type local +``` + +**Translation only:** +```bash +# Translate existing SRT file +python translate_srt.py --input-file subtitles.srt --target-languages es fr de + +# Translate SRT file in Object Storage +python translate_srt.py --input-file "srt/subtitles.srt" --target-language es --method sync +``` -## Annex: Supported Languages - -The solution supports translation to the following languages: - -| Language | Language Code | -|----------|------| -| Arabic | ar | -| Croatian | hr | -| Czech | cs | -| Danish | da | -| Dutch | nl | -| English | en | -| Finnish | fi | -| French | fr | -| French Canadian | fr-CA | -| German | de | -| Greek | el | -| Hebrew | he | -| Hungarian | hu | -| Italian | it | -| Japanese | ja | -| Korean | ko | -| Norwegian | no | -| Polish | pl | -| Portuguese | pt | -| Portuguese Brazilian | pt-BR | -| Romanian | ro | -| Russian | ru | -| Simplified Chinese | zh-CN | -| Slovak | sk | -| Slovenian | sl | -| Spanish | es | -| Swedish | sv | -| Thai | th | -| Traditional Chinese | zh-TW | -| Turkish | tr | -| Vietnamese | vi | - -For an updated list of supported languages, refer to [the OCI Documentation](https://docs.oracle.com/en-us/iaas/language/using/translate.htm#supported-langs). - -## Supported Language Codes - -For the Speech-to-Text transcription service with GENERIC domain, the following language codes are supported: - -| Language | Code | -|----------|------| -| US English | en-US | -| British English | en-GB | -| Australian English | en-AU | -| Indian English | en-IN | -| Spanish (Spain) | es-ES | -| Brazilian Portuguese | pt-BR | -| Hindi (India) | hi-IN | -| French (France) | fr-FR | -| German (Germany) | de-DE | -| Italian (Italy) | it-IT | - -Note: When using the service, make sure to use the exact language code format as shown above. Simple codes like 'en' or 'es' will not work. +## Supported Languages + +### Audio Transcription +| Language | Code | | Language | Code | +|----------|------|---|----------|------| +| English (US) | en-US | | Portuguese (Brazil) | pt-BR | +| English (UK) | en-GB | | Hindi (India) | hi-IN | +| English (Australia) | en-AU | | French (France) | fr-FR | +| English (India) | en-IN | | German (Germany) | de-DE | +| Spanish (Spain) | es-ES | | Italian (Italy) | it-IT | + +### Translation (30+ languages) +| Language | Code | | Language | Code | | Language | Code | +|----------|------|---|----------|------|---|----------|------| +| Spanish | es | | French | fr | | German | de | +| Portuguese | pt | | Italian | it | | Dutch | nl | +| Russian | ru | | Japanese | ja | | Korean | ko | +| Chinese (Simplified) | zh-CN | | Chinese (Traditional) | zh-TW | | Arabic | ar | +| Hebrew | he | | Hindi | hi | | Thai | th | + +[View complete list](https://docs.oracle.com/en-us/iaas/language/using/translate.htm#supported-langs) + +## Command Reference + +### workflow.py + +| Option | Description | Example | +|--------|-------------|---------| +| `--audio-source` | Audio file (local or Object Storage path) | `--audio-source "audio/file.mp3"` | +| `--target-language` | Single target language | `--target-language es` | +| `--target-languages` | Multiple target languages | `--target-languages es fr de` | +| `--transcribe-only` | Only transcribe (no translation) | `--transcribe-only` | +| `--translate-only` | Only translate existing SRT | `--translate-only --srt-file file.srt` | +| `--speech-language` | Override transcription language | `--speech-language es-ES` | +| `--output-type` | Where to store output | `--output-type local` | +| `--config` | Custom config file | `--config my-config.yaml` | + +### Output Storage Options + +| Value | Description | Files Saved To | +|-------|-------------|----------------| +| `local` | Local filesystem only | `./output/` directory | +| `object_storage` | Object Storage only | Your configured bucket | +| `both` | Both locations (default) | Local directory + Object Storage | + +## Translation Methods + +### Batch Translation (Recommended) +- **Best for**: Multiple languages, larger files +- **Limit**: 20MB per file +- **Speed**: Faster for multiple languages +- **Usage**: `--method batch` (default) + +### Synchronous Translation +- **Best for**: Single language, smaller files +- **Limit**: No file size limit +- **Speed**: Slower for multiple languages +- **Usage**: `--method sync` + +## Troubleshooting + +### Common Issues + +**"BucketNotFound" Error:** +- Verify bucket name and namespace in `config.yaml` +- Ensure bucket exists in the correct region +- Check IAM permissions for Object Storage + +**"ObjectNotFound" Error:** +- Verify the object path in your bucket +- Check if file was uploaded successfully +- Ensure correct spelling and case + +**Authentication Issues:** +```bash +# Test OCI CLI configuration +oci iam user get --user-id $(oci iam user list --query 'data[0].id' --raw-output) + +# Reconfigure if needed +oci setup config +``` + +**Large File Handling:** +- Audio files: No limit for OCI Speech +- SRT files: 20MB limit for batch translation +- Large files automatically use sync translation + +### Debug Mode + +Add verbose logging: +```bash +# Set environment variable for detailed logs +export OCI_CLI_PROFILE=your-profile +python workflow.py --audio-source audio.mp3 --target-language es +``` + +## Architecture + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Audio File β”‚ ──▢│ OCI Speech β”‚ ──▢│ SRT File β”‚ +β”‚ (Local/Storage) β”‚ β”‚ Transcription β”‚ β”‚ Generated β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Translated SRT β”‚ ◀──│ OCI Language β”‚ ◀──│ SRT File β”‚ +β”‚ Files (es, β”‚ β”‚ Translation β”‚ β”‚ Original β”‚ +β”‚ fr, de, etc.) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` ## Contributing @@ -152,7 +283,7 @@ This project is open source. Please submit your contributions by forking this re ## License -Copyright (c) 2024 Oracle and/or its affiliates. +Copyright (c) 2025 Oracle and/or its affiliates. Licensed under the Universal Permissive License (UPL), Version 1.0. diff --git a/oci-subtitle-translation/config_example.yaml b/oci-subtitle-translation/config_example.yaml index 5fa0178..ddbd2e6 100644 --- a/oci-subtitle-translation/config_example.yaml +++ b/oci-subtitle-translation/config_example.yaml @@ -1,11 +1,32 @@ -# Speech Service Configuration -profile: "your-profile" +# OCI Subtitle Translation Configuration +# Copy this file to config.yaml and update with your specific settings + +# OCI Profile Configuration +profile: "DEFAULT" # OCI CLI profile name from ~/.oci/config +# Speech Service Configuration speech: compartment_id: "ocid1.compartment.oc1..your-compartment-id" bucket_name: "your-bucket-name" namespace: "your-namespace" + language_code: "en-US" -# Language Translation Configuration +# Language Translation Configuration language: - compartment_id: "ocid1.compartment.oc1..your-compartment-id" \ No newline at end of file + compartment_id: "ocid1.compartment.oc1..your-compartment-id" + # Optional: Leave empty to use same bucket/namespace as speech service + bucket_name: "" + namespace: "" + +# Output Configuration +output: + storage_type: "both" # "local", "object_storage", or "both" + local_directory: "./output" + +# Translation Settings +translation: + target_languages: + - "es" # Spanish + - "fr" # French + - "de" # German + method: "batch" # "batch" or "sync" diff --git a/oci-subtitle-translation/generate_srt_from_audio.py b/oci-subtitle-translation/generate_srt_from_audio.py index 7b7459f..5271e38 100644 --- a/oci-subtitle-translation/generate_srt_from_audio.py +++ b/oci-subtitle-translation/generate_srt_from_audio.py @@ -1,4 +1,10 @@ -# https://docs.oracle.com/en-us/iaas/api/#/en/speech/20220101/TranscriptionJob/CreateTranscriptionJob +#!/usr/bin/env python3 +""" +OCI Speech Audio Transcription + +Transcribe audio files to SRT subtitles using OCI Speech service. +Supports both local files and Object Storage inputs. +""" import oci import yaml @@ -6,8 +12,9 @@ import sys import time import os -import json from datetime import datetime +from pathlib import Path + def log_step(message, is_error=False): """Print a formatted log message with timestamp""" @@ -15,129 +22,318 @@ def log_step(message, is_error=False): prefix = "ERROR" if is_error else "INFO" print(f"[{timestamp}] {prefix}: {message}") -def wait_for_job_completion(ai_speech_client, job_id, check_interval=15): - """Wait for the transcription job to complete and return the output file name""" + +def load_config(config_file='config.yaml'): + """Load configuration from YAML file""" + try: + with open(config_file, 'r') as f: + config = yaml.safe_load(f) + log_step(f"Successfully loaded configuration from {config_file}") + return config + except FileNotFoundError: + log_step(f"Configuration file {config_file} not found", True) + log_step("Please copy config_example.yaml to config.yaml and update with your settings", True) + sys.exit(1) + except Exception as e: + log_step(f"Failed to load configuration: {str(e)}", True) + sys.exit(1) + + +def upload_audio_file(object_storage_client, config, local_file_path): + """Upload local audio file to Object Storage""" + if not os.path.exists(local_file_path): + raise FileNotFoundError(f"Audio file not found: {local_file_path}") + + file_name = os.path.basename(local_file_path) + namespace = config['speech']['namespace'] + bucket_name = config['speech']['bucket_name'] + object_name = f"audio/{file_name}" + + log_step(f"Uploading {local_file_path} to Object Storage...") + + try: + with open(local_file_path, 'rb') as f: + object_storage_client.put_object( + namespace_name=namespace, + bucket_name=bucket_name, + object_name=object_name, + put_object_body=f + ) + + log_step(f"Successfully uploaded to: {object_name}") + return object_name + + except Exception as e: + log_step(f"Failed to upload file: {str(e)}", True) + raise + + +def wait_for_transcription_job(ai_speech_client, job_id, check_interval=30): + """Wait for transcription job to complete and return job information""" + log_step(f"Waiting for transcription job {job_id} to complete...") + while True: try: - job_response = ai_speech_client.get_transcription_job(job_id) + job_response = ai_speech_client.get_transcription_job(transcription_job_id=job_id) status = job_response.data.lifecycle_state if status == "SUCCEEDED": log_step("Transcription job completed successfully") - # Get the output file name from the job details - input_file = job_response.data.input_location.object_locations[0].object_names[0] - input_file_name = input_file.split("/")[-1] # Get the filename after last slash - output_prefix = job_response.data.output_location.prefix - # Extract just the job ID part (before the first slash) - job_id_part = job_id.split("/")[0] - output_file = f"{output_prefix}/{job_id_part}/{input_file_name}.srt" - return output_file + + if hasattr(job_response.data, 'output_location') and hasattr(job_response.data.output_location, 'prefix'): + output_prefix = job_response.data.output_location.prefix + input_file = job_response.data.input_location.object_locations[0].object_names[0] + input_file_name = input_file.split("/")[-1] + + return { + 'job_id': job_id, + 'output_prefix': output_prefix, + 'input_file_name': input_file_name, + 'namespace': job_response.data.output_location.namespace_name, + 'bucket': job_response.data.output_location.bucket_name if hasattr(job_response.data.output_location, 'bucket_name') else None + } + else: + log_step("Could not get output location from job response", True) + raise Exception("Could not get output location from job response") elif status == "FAILED": log_step("Transcription job failed", True) - sys.exit(1) + raise Exception("Transcription job failed") elif status in ["CANCELED", "DELETED"]: log_step(f"Transcription job was {status.lower()}", True) - sys.exit(1) + raise Exception(f"Transcription job was {status.lower()}") else: log_step(f"Job status: {status}. Waiting {check_interval} seconds...") time.sleep(check_interval) except Exception as e: + if "Transcription job" in str(e) or "Could not get output location" in str(e): + raise log_step(f"Error checking job status: {str(e)}", True) - sys.exit(1) + raise + -# Parse command line arguments -parser = argparse.ArgumentParser(description='Generate SRT file from audio using OCI Speech service') -parser.add_argument('--input-file', required=True, help='Input audio file name in the configured bucket') -args = parser.parse_args() +def find_srt_file_in_bucket(object_storage_client, namespace, bucket_name, output_prefix, job_id, input_file_name): + """Find the actual SRT file in the bucket by listing objects""" + try: + log_step(f"Searching for SRT file in bucket with prefix: {output_prefix}") + + # List objects with the output prefix + list_response = object_storage_client.list_objects( + namespace_name=namespace, + bucket_name=bucket_name, + prefix=output_prefix + ) + + # Look for SRT files + srt_files = [] + for obj in list_response.data.objects: + if obj.name.endswith('.srt') and input_file_name.replace('.mp3', '') in obj.name: + srt_files.append(obj.name) + log_step(f"Found SRT file: {obj.name}") + + if srt_files: + # Return the first matching SRT file (there should only be one) + return srt_files[0] + else: + log_step(f"No SRT files found with prefix {output_prefix}", True) + return None + + except Exception as e: + log_step(f"Error searching for SRT file: {str(e)}", True) + return None -log_step(f"Starting transcription process for file: {args.input_file}") -# Load config from yaml file -def load_config(): - """Load configuration from config.yaml""" +def download_srt_file(object_storage_client, config, job_info, local_path=None): + """Download SRT file from Object Storage to local filesystem""" + + # First, find the actual SRT file in the bucket + srt_object_name = find_srt_file_in_bucket( + object_storage_client, + job_info['namespace'] or config['speech']['namespace'], + job_info['bucket'] or config['speech']['bucket_name'], + job_info['output_prefix'], + job_info['job_id'], + job_info['input_file_name'] + ) + + if not srt_object_name: + raise Exception(f"Could not find SRT file in bucket for job {job_info['job_id']}") + + if local_path is None: + filename = srt_object_name.split("/")[-1] + output_dir = config.get('output', {}).get('local_directory', './output') + # Use a simpler filename for local storage + simple_filename = f"{os.path.splitext(job_info['input_file_name'])[0]}.srt" + local_path = os.path.join(output_dir, simple_filename) + + # Create output directory if it doesn't exist + os.makedirs(os.path.dirname(local_path), exist_ok=True) + try: - with open('config.yaml', 'r') as f: - config = yaml.safe_load(f) - log_step("Successfully loaded config.yaml") - log_step(f"Using bucket: {config['speech']['bucket_name']}") - log_step(f"Using namespace: {config['speech']['namespace']}") - return config + log_step(f"Downloading SRT file to: {local_path}") + + get_response = object_storage_client.get_object( + namespace_name=job_info['namespace'] or config['speech']['namespace'], + bucket_name=job_info['bucket'] or config['speech']['bucket_name'], + object_name=srt_object_name + ) + + with open(local_path, 'wb') as f: + for chunk in get_response.data.raw.stream(1024 * 1024, decode_content=False): + f.write(chunk) + + log_step(f"Successfully downloaded SRT file: {local_path}") + return local_path + except Exception as e: - log_step(f"Failed to load config.yaml: {str(e)}", True) - sys.exit(1) + log_step(f"Failed to download SRT file: {str(e)}", True) + raise + +def main(): + # Parse command line arguments + parser = argparse.ArgumentParser( + description='Generate SRT file from audio using OCI Speech service', + epilog=""" +Examples: + # Transcribe local audio file + python generate_srt_from_audio.py --input-file /path/to/audio.mp3 + + # Transcribe audio file already in Object Storage + python generate_srt_from_audio.py --input-file "audio/myfile.mp3" + + # Specify language and output options + python generate_srt_from_audio.py --input-file audio.mp3 --language es-ES --output-type local + """ + ) + + parser.add_argument('--input-file', required=True, + help='Audio file path (local file or Object Storage object name)') + parser.add_argument('--language', type=str, default=None, + help='Language code for transcription (default: from config)') + parser.add_argument('--output-type', choices=['local', 'object_storage', 'both'], default=None, + help='Where to store output (default: from config)') + parser.add_argument('--config', type=str, default='config.yaml', + help='Configuration file path (default: config.yaml)') + + args = parser.parse_args() + + log_step(f"Starting transcription process for: {args.input_file}") -config_yaml = load_config() - -# Load config based on the profile specificied in the YAML file -try: - config = oci.config.from_file(profile_name=config_yaml.get("profile", "DEFAULT")) - log_step("Successfully loaded OCI configuration") -except Exception as e: - log_step(f"Failed to load OCI configuration: {str(e)}", True) - sys.exit(1) - -# Initialize service client with default config file -try: - ai_speech_client = oci.ai_speech.AIServiceSpeechClient(config) - log_step("Successfully initialized AI Speech client") -except Exception as e: - log_step(f"Failed to initialize AI Speech client: {str(e)}", True) - sys.exit(1) - -# Send the request to service -log_step("Creating transcription job with following settings:") -log_step(f" β€’ Input file: {args.input_file}") -log_step(f" β€’ Output format: SRT") -log_step(f" β€’ Language: en-US") -log_step(f" β€’ Diarization: Enabled (2 speakers)") -log_step(f" β€’ Profanity filter: Enabled (TAG mode)") - -file_name = args.input_file.split("/")[-1] - -try: - create_transcription_job_response = ai_speech_client.create_transcription_job( - create_transcription_job_details=oci.ai_speech.models.CreateTranscriptionJobDetails( - compartment_id=config_yaml['speech']['compartment_id'], - input_location=oci.ai_speech.models.ObjectListInlineInputLocation( - location_type="OBJECT_LIST_INLINE_INPUT_LOCATION", - object_locations=[oci.ai_speech.models.ObjectLocation( - namespace_name=config_yaml['speech']['namespace'], - bucket_name=config_yaml['speech']['bucket_name'], - object_names=[args.input_file])]), - output_location=oci.ai_speech.models.OutputLocation( - namespace_name=config_yaml['speech']['namespace'], - bucket_name=config_yaml['speech']['bucket_name'], - prefix=f"transcriptions/{file_name}"), - additional_transcription_formats=["SRT"], - model_details=oci.ai_speech.models.TranscriptionModelDetails( - domain="GENERIC", - language_code="en-US", - transcription_settings=oci.ai_speech.models.TranscriptionSettings( - diarization=oci.ai_speech.models.Diarization( - is_diarization_enabled=True, - number_of_speakers=2))), - normalization=oci.ai_speech.models.TranscriptionNormalization( - is_punctuation_enabled=True, - filters=[ - oci.ai_speech.models.ProfanityTranscriptionFilter( - type="PROFANITY", - mode="TAG")]), - freeform_tags={}, - defined_tags={})) + # Load configuration + config = load_config(args.config) - log_step("Successfully created transcription job") - log_step("Job details:") - log_step(f" β€’ Job ID: {create_transcription_job_response.data.id}") - log_step(f" β€’ Output location: {create_transcription_job_response.data.output_location}") - log_step(f" β€’ Status: {create_transcription_job_response.data.lifecycle_state}") - log_step(f" β€’ Output will be saved to: {create_transcription_job_response.data.output_location.prefix}{config_yaml['speech']['namespace']}_{config_yaml['speech']['bucket_name']}_{file_name}.srt") + # Override config with command line arguments + if args.language: + config['speech']['language_code'] = args.language + if args.output_type: + config.setdefault('output', {})['storage_type'] = args.output_type - # Wait for job completion and get output file name - output_file = wait_for_job_completion(ai_speech_client, create_transcription_job_response.data.id) - log_step(f"Generated SRT file: {output_file}") + # Set defaults + language_code = config['speech'].get('language_code', 'en-US') + storage_type = config.get('output', {}).get('storage_type', 'both') -except Exception as e: - log_step(f"Failed to create transcription job: {str(e)}", True) - sys.exit(1) \ No newline at end of file + # Load OCI config + profile_name = config.get("profile", "DEFAULT") + try: + oci_config = oci.config.from_file(profile_name=profile_name) + log_step(f"Successfully loaded OCI configuration for profile: {profile_name}") + except Exception as e: + log_step(f"Failed to load OCI configuration: {str(e)}", True) + sys.exit(1) + + # Initialize service clients + try: + ai_speech_client = oci.ai_speech.AIServiceSpeechClient(oci_config) + object_storage_client = oci.object_storage.ObjectStorageClient(oci_config) + log_step("Successfully initialized OCI clients") + except Exception as e: + log_step(f"Failed to initialize OCI clients: {str(e)}", True) + sys.exit(1) + + # Determine if input_file is local file or object storage path + if os.path.exists(args.input_file): + # Local file - upload to object storage first + try: + object_name = upload_audio_file(object_storage_client, config, args.input_file) + file_name = os.path.basename(args.input_file) + except Exception as e: + log_step(f"Failed to upload audio file: {str(e)}", True) + sys.exit(1) + else: + # Assume it's already in object storage + object_name = args.input_file + file_name = object_name.split("/")[-1] + log_step(f"Using audio file from Object Storage: {object_name}") + + # Create output directory if needed + if storage_type in ['local', 'both']: + output_dir = config.get('output', {}).get('local_directory', './output') + os.makedirs(output_dir, exist_ok=True) + + # Log transcription settings + log_step("Creating transcription job with settings:") + log_step(f" β€’ Input file: {object_name}") + log_step(f" β€’ Language: {language_code}") + log_step(f" β€’ Output format: SRT") + log_step(f" β€’ Diarization: Enabled (2 speakers)") + log_step(f" β€’ Profanity filter: Enabled (TAG mode)") + log_step(f" β€’ Storage type: {storage_type}") + + try: + create_transcription_job_response = ai_speech_client.create_transcription_job( + create_transcription_job_details=oci.ai_speech.models.CreateTranscriptionJobDetails( + compartment_id=config['speech']['compartment_id'], + input_location=oci.ai_speech.models.ObjectListInlineInputLocation( + location_type="OBJECT_LIST_INLINE_INPUT_LOCATION", + object_locations=[oci.ai_speech.models.ObjectLocation( + namespace_name=config['speech']['namespace'], + bucket_name=config['speech']['bucket_name'], + object_names=[object_name])]), + output_location=oci.ai_speech.models.OutputLocation( + namespace_name=config['speech']['namespace'], + bucket_name=config['speech']['bucket_name'], + prefix=f"transcriptions/{file_name}"), + additional_transcription_formats=["SRT"], + model_details=oci.ai_speech.models.TranscriptionModelDetails( + domain="GENERIC", + language_code=language_code, + transcription_settings=oci.ai_speech.models.TranscriptionSettings( + diarization=oci.ai_speech.models.Diarization( + is_diarization_enabled=True, + number_of_speakers=2))), + normalization=oci.ai_speech.models.TranscriptionNormalization( + is_punctuation_enabled=True, + filters=[ + oci.ai_speech.models.ProfanityTranscriptionFilter( + type="PROFANITY", + mode="TAG")]), + freeform_tags={}, + defined_tags={})) + + job_id = create_transcription_job_response.data.id + log_step(f"Successfully created transcription job with ID: {job_id}") + + # Wait for job completion and get job info + job_info = wait_for_transcription_job(ai_speech_client, job_id) + + log_step("Transcription completed successfully!") + log_step(f"Job output prefix: {job_info['output_prefix']}") + + result = {'job_info': job_info} + + # Download to local if configured + if storage_type in ['local', 'both']: + local_srt_path = download_srt_file(object_storage_client, config, job_info) + result['local_srt_path'] = local_srt_path + log_step(f"Local SRT file: {local_srt_path}") + + log_step("Transcription workflow completed successfully!") + + except Exception as e: + log_step(f"Transcription failed: {str(e)}", True) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/oci-subtitle-translation/requirements.txt b/oci-subtitle-translation/requirements.txt index 74e0290..b78550c 100644 --- a/oci-subtitle-translation/requirements.txt +++ b/oci-subtitle-translation/requirements.txt @@ -1,2 +1,2 @@ oci>=2.141.1 -pyyaml>=6.0.1 \ No newline at end of file +pyyaml>=6.0.1 diff --git a/oci-subtitle-translation/translate_json.py b/oci-subtitle-translation/translate_json.py deleted file mode 100644 index d4cff03..0000000 --- a/oci-subtitle-translation/translate_json.py +++ /dev/null @@ -1,120 +0,0 @@ -import oci -import yaml -import argparse -import sys -import json -import os -from datetime import datetime - -def log_step(message, is_error=False): - """Print a formatted log message with timestamp""" - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - prefix = "ERROR" if is_error else "INFO" - print(f"[{timestamp}] {prefix}: {message}") - -# Parse command line arguments -parser = argparse.ArgumentParser(description='Translate a JSON subtitle file using OCI AI Translation') -parser.add_argument('--input-file', required=True, help='Input JSON file in the configured bucket') -parser.add_argument('--target-language', required=True, help='Target language code (e.g., fr, es, de)') -args = parser.parse_args() - -# Generate output filename -input_filename = os.path.splitext(args.input_file)[0] # Remove extension -output_file = f"{input_filename}_{args.target_language}.json" - -log_step(f"Starting translation of {args.input_file} to {args.target_language}") - -# Create a default config using DEFAULT profile in default location -try: - config = oci.config.from_file(profile_name="DEVRELCOMM") - log_step("Successfully loaded OCI configuration") -except Exception as e: - log_step(f"Failed to load OCI configuration: {str(e)}", True) - sys.exit(1) - -# Initialize service client with default config file -try: - ai_language_client = oci.ai_language.AIServiceLanguageClient(config) - log_step("Successfully initialized AI Translation client") -except Exception as e: - log_step(f"Failed to initialize AI Translation client: {str(e)}", True) - sys.exit(1) - -# Load config from yaml file -def load_config(): - """Load configuration from config.yaml""" - try: - with open('config.yaml', 'r') as f: - config = yaml.safe_load(f) - log_step("Successfully loaded config.yaml") - log_step(f"Using bucket: {config['speech']['bucket_name']}") - log_step(f"Using namespace: {config['speech']['namespace']}") - return config - except Exception as e: - log_step(f"Failed to load config.yaml: {str(e)}", True) - sys.exit(1) - -config_yaml = load_config() -object_storage_client = oci.object_storage.ObjectStorageClient(config) - -# Reads the JSON file -try: - namespace = config_yaml['speech']['namespace'] - bucket_name = config_yaml['speech']['bucket_name'] - object_name = args.input_file - - get_object_response = object_storage_client.get_object(namespace, bucket_name, object_name) - json_data = json.loads(get_object_response.data.text) # Read and parse JSON data - log_step(f"Loaded JSON file from OCI with {len(json_data.get('transcriptions', []))} transcriptions.") - - - log_step(f"Loaded {len(json_data)} subtitles from {args.input_file}") -except Exception as e: - log_step(f"Failed to read JSON file from OCI Object Storage: {str(e)}", True) - sys.exit(1) - -translated_data = [] -for item in json_data["transcriptions"]: - if "transcription" in item: - try: - document = oci.ai_language.models.Document( - language="en", - text=item["transcription"] - ) - - request_details = oci.ai_language.models.BatchTranslateTextDetails( - documents=[document], - target_language=args.target_language - ) - - response = ai_language_client.batch_translate_text(request_details) - translated_text = response.data[0].translated_text - - except Exception as e: - print(f"Error during translation: {str(e)}", file=sys.stderr) - translated_text = item['transcription'] - - # Update item with translated text - translated_item = item.copy() - translated_item['transcription'] = translated_text - translated_data.append(translated_item) - else: - print(f"Skipping invalid item: {item}") - -log_step(f"Translation completed successfully with {len(translated_data)} items translated") - -translated_json = json.dumps(translated_data, ensure_ascii=False, indent=4) - -try: - # Convert translated data back to JSON format - translated_json = json.dumps(translated_data, ensure_ascii=False, indent=4) - - # Use a temporary file to upload - with open(output_file, 'w', encoding='utf-8') as f: - f.write(translated_json) - - object_storage_client.put_object(namespace, bucket_name, output_file, translated_json.encode('utf-8')) - log_step(f"Translated JSON uploaded to OCI Object Storage as {output_file}") -except Exception as e: - log_step(f"Failed to upload translated JSON to OCI: {str(e)}", True) - sys.exit(1) diff --git a/oci-subtitle-translation/translate_srt.py b/oci-subtitle-translation/translate_srt.py index 53f823a..df2f998 100644 --- a/oci-subtitle-translation/translate_srt.py +++ b/oci-subtitle-translation/translate_srt.py @@ -1,157 +1,617 @@ +#!/usr/bin/env python3 +""" +OCI Language Translation for SRT Files + +Translate SRT subtitle files to multiple languages using OCI Language service. +Supports both local files and Object Storage inputs/outputs. +""" + import oci import yaml import argparse +import sys import os import time -from pathlib import Path - -def load_config(): - """Load configuration from config.yaml""" - with open('config.yaml', 'r') as f: - return yaml.safe_load(f) - -def get_language_client(): - """Initialize and return the OCI Language client""" - config = oci.config.from_file() - return oci.ai_language.AIServiceLanguageClient(config) - -def upload_to_object_storage(object_storage_client, namespace, bucket_name, file_path): - """Upload file to OCI Object Storage""" - file_name = os.path.basename(file_path) - - with open(file_path, 'rb') as f: - object_storage_client.put_object( - namespace, - bucket_name, - file_name, - f - ) - return file_name +import tempfile +from datetime import datetime + + +# Supported languages mapping +SUPPORTED_LANGUAGES = { + 'ar': 'Arabic', 'hr': 'Croatian', 'cs': 'Czech', 'da': 'Danish', + 'nl': 'Dutch', 'en': 'English', 'fi': 'Finnish', 'fr': 'French', + 'fr-CA': 'French Canadian', 'de': 'German', 'el': 'Greek', + 'he': 'Hebrew', 'hu': 'Hungarian', 'it': 'Italian', 'ja': 'Japanese', + 'ko': 'Korean', 'no': 'Norwegian', 'pl': 'Polish', 'pt': 'Portuguese', + 'pt-BR': 'Portuguese Brazilian', 'ro': 'Romanian', 'ru': 'Russian', + 'zh-CN': 'Simplified Chinese', 'sk': 'Slovak', 'sl': 'Slovenian', + 'es': 'Spanish', 'sv': 'Swedish', 'th': 'Thai', 'zh-TW': 'Traditional Chinese', + 'tr': 'Turkish', 'vi': 'Vietnamese' +} + + +def log_step(message, is_error=False): + """Print a formatted log message with timestamp""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + prefix = "ERROR" if is_error else "INFO" + print(f"[{timestamp}] {prefix}: {message}") + + +def load_config(config_file='config.yaml'): + """Load configuration from YAML file""" + try: + with open(config_file, 'r') as f: + config = yaml.safe_load(f) + log_step(f"Successfully loaded configuration from {config_file}") + return config + except FileNotFoundError: + log_step(f"Configuration file {config_file} not found", True) + log_step("Please copy config_example.yaml to config.yaml and update with your settings", True) + return None + except Exception as e: + log_step(f"Failed to load configuration: {str(e)}", True) + return None + + +def get_translation_namespace_bucket(config): + """Get namespace and bucket for translations""" + namespace = config.get('language', {}).get('namespace') or config['speech']['namespace'] + bucket_name = config.get('language', {}).get('bucket_name') or config['speech']['bucket_name'] + return namespace, bucket_name + + +def upload_srt_file(object_storage_client, config, local_file_path): + """Upload local SRT file to Object Storage""" + if not os.path.exists(local_file_path): + raise FileNotFoundError(f"SRT file not found: {local_file_path}") + + file_name = os.path.basename(local_file_path) + namespace, bucket_name = get_translation_namespace_bucket(config) + object_name = f"srt_files/{file_name}" + + log_step(f"Uploading {local_file_path} to Object Storage...") + + try: + with open(local_file_path, 'rb') as f: + object_storage_client.put_object( + namespace_name=namespace, + bucket_name=bucket_name, + object_name=object_name, + put_object_body=f + ) + + log_step(f"Successfully uploaded to: {object_name}") + return object_name + + except Exception as e: + log_step(f"Failed to upload SRT file: {str(e)}", True) + raise + -def wait_for_job_completion(client, job_id, compartment_id, max_wait_seconds=1800, wait_interval_seconds=30): +def wait_for_translation_job(language_client, job_id, compartment_id, max_wait_seconds=1800, wait_interval_seconds=30): """Wait for the translation job to complete""" for _ in range(0, max_wait_seconds, wait_interval_seconds): - get_job_response = client.get_job( - job_id=job_id, - compartment_id=compartment_id - ) - - status = get_job_response.data.lifecycle_state - if status == "SUCCEEDED": - return True - elif status in ["FAILED", "CANCELED"]: - print(f"Job failed with status: {status}") - return False + try: + get_job_response = language_client.get_job( + job_id=job_id, + compartment_id=compartment_id + ) + + status = get_job_response.data.lifecycle_state + if status == "SUCCEEDED": + log_step("Translation job completed successfully") + return True + elif status in ["FAILED", "CANCELED"]: + log_step(f"Translation job failed with status: {status}", True) + return False + else: + log_step(f"Translation job status: {status}. Waiting {wait_interval_seconds} seconds...") + + time.sleep(wait_interval_seconds) - time.sleep(wait_interval_seconds) + except Exception as e: + log_step(f"Error checking translation job status: {str(e)}", True) + return False + log_step("Translation job timed out", True) return False -def translate_srt(client, object_storage_client, config, input_file, source_lang='en', target_lang='es'): - """Translate an SRT file using OCI Language Async Document Translation""" + +def parse_srt_file(file_path): + """Parse SRT file and return list of subtitle entries""" + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + + entries = [] + blocks = content.strip().split('\n\n') + + for block in blocks: + lines = block.strip().split('\n') + if len(lines) >= 3: + entry = { + 'number': lines[0], + 'timestamp': lines[1], + 'text': '\n'.join(lines[2:]) + } + entries.append(entry) + + return entries + + +def search_for_actual_srt_file(object_storage_client, config, expected_object_name): + """Search for the actual SRT file in Object Storage when the expected path doesn't exist""" + namespace, bucket_name = get_translation_namespace_bucket(config) + + path_parts = expected_object_name.split('/') + if len(path_parts) >= 3 and path_parts[0] == 'transcriptions': + audio_filename = path_parts[1] + base_name = os.path.splitext(path_parts[-1])[0] + + search_prefix = f"transcriptions/{audio_filename}" + + try: + log_step(f"Searching for SRT file with prefix: {search_prefix}") + + list_response = object_storage_client.list_objects( + namespace_name=namespace, + bucket_name=bucket_name, + prefix=search_prefix, + limit=1000 + ) + + srt_files = [] + for obj in list_response.data.objects: + if obj.name.endswith('.srt') and base_name in obj.name: + srt_files.append(obj.name) + + if srt_files: + srt_files.sort() + found_file = srt_files[-1] + log_step(f"Found actual SRT file: {found_file}") + return found_file + else: + log_step(f"No SRT file found with prefix {search_prefix}") + return None + + except Exception as e: + log_step(f"Error searching for SRT file: {str(e)}", True) + return None + + return None + + +def download_srt_from_object_storage(object_storage_client, config, object_name): + """Download SRT file from Object Storage to a temporary local file""" + import tempfile + + namespace, bucket_name = get_translation_namespace_bucket(config) + actual_object_name = object_name + + try: + log_step(f"Downloading SRT file from Object Storage: {actual_object_name}") + + get_response = object_storage_client.get_object( + namespace_name=namespace, + bucket_name=bucket_name, + object_name=actual_object_name + ) + + with tempfile.NamedTemporaryFile(mode='w+', suffix='.srt', delete=False, encoding='utf-8') as tmp_f: + for chunk in get_response.data.raw.stream(1024 * 1024, decode_content=False): + tmp_f.write(chunk.decode('utf-8')) + temp_path = tmp_f.name + + log_step(f"Downloaded SRT file to temporary location: {temp_path}") + return temp_path + + except Exception as e: + if "ObjectNotFound" in str(e) or "404" in str(e): + log_step(f"SRT file not found at expected path, searching...") + actual_object_name = search_for_actual_srt_file(object_storage_client, config, object_name) + + if actual_object_name: + try: + log_step(f"Downloading found SRT file: {actual_object_name}") + + get_response = object_storage_client.get_object( + namespace_name=namespace, + bucket_name=bucket_name, + object_name=actual_object_name + ) + + with tempfile.NamedTemporaryFile(mode='w+', suffix='.srt', delete=False, encoding='utf-8') as tmp_f: + for chunk in get_response.data.raw.stream(1024 * 1024, decode_content=False): + tmp_f.write(chunk.decode('utf-8')) + temp_path = tmp_f.name + + log_step(f"Downloaded SRT file to temporary location: {temp_path}") + return temp_path + + except Exception as retry_e: + log_step(f"Failed to download found SRT file: {str(retry_e)}", True) + raise + else: + log_step(f"Could not find SRT file in Object Storage", True) + raise + else: + log_step(f"Failed to download SRT file from Object Storage: {str(e)}", True) + raise + + +def get_srt_file_for_parsing(object_storage_client, config, srt_file_path): + """Get SRT file ready for parsing - download from Object Storage if needed""" + if os.path.exists(srt_file_path): + # Local file, return as-is + return srt_file_path, False # (file_path, is_temporary) + else: + # Object Storage path, download to temporary file + temp_path = download_srt_from_object_storage(object_storage_client, config, srt_file_path) + return temp_path, True # (file_path, is_temporary) + + +def translate_text_sync(language_client, text, source_lang, target_lang, compartment_id): + """Translate text using synchronous API""" try: - # Validate file size (20MB limit) - file_size = os.path.getsize(input_file) - if file_size > 20 * 1024 * 1024: # 20MB in bytes - raise ValueError("Input file exceeds 20MB limit") - - # Upload file to Object Storage - input_object_name = upload_to_object_storage( - object_storage_client, - config['speech']['namespace'], - config['speech']['bucket_name'], - input_file + documents = [oci.ai_language.models.TextDocument( + key="1", + text=text, + language_code=source_lang + )] + + batch_details = oci.ai_language.models.BatchLanguageTranslationDetails( + documents=documents, + target_language_code=target_lang, + compartment_id=compartment_id ) - # Create document details - document_details = oci.ai_language.models.ObjectLocation( - namespace_name=config['speech']['namespace'], - bucket_name=config['speech']['bucket_name'], + response = language_client.batch_language_translation( + batch_language_translation_details=batch_details + ) + + if response.status == 200 and response.data.documents: + return response.data.documents[0].translated_text + else: + log_step(f"Sync translation failed for {target_lang}", True) + return None + + except Exception as e: + log_step(f"Error in sync translation to {target_lang}: {str(e)}", True) + return None + + +def save_translated_srt(entries, output_path): + """Save translated SRT entries to file""" + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + with open(output_path, 'w', encoding='utf-8') as f: + for entry in entries: + f.write(f"{entry['number']}\n") + f.write(f"{entry['timestamp']}\n") + f.write(f"{entry['text']}\n\n") + + +def translate_srt_sync(language_client, object_storage_client, config, srt_file_path, source_lang, target_lang): + """Translate SRT file using synchronous translation (subtitle by subtitle)""" + log_step(f"Translating {srt_file_path} to {target_lang} using synchronous method...") + + # Get the SRT file for parsing (download from Object Storage if needed) + local_srt_path, is_temporary = get_srt_file_for_parsing(object_storage_client, config, srt_file_path) + + try: + entries = parse_srt_file(local_srt_path) + translated_entries = [] + compartment_id = config['language']['compartment_id'] + + for i, entry in enumerate(entries): + log_step(f"Translating subtitle {i+1}/{len(entries)}") + translated_text = translate_text_sync(language_client, entry['text'], source_lang, target_lang, compartment_id) + + if translated_text: + translated_entry = entry.copy() + translated_entry['text'] = translated_text + translated_entries.append(translated_entry) + else: + log_step(f"Failed to translate subtitle {i+1}, keeping original", True) + translated_entries.append(entry) + + # Generate output filename + base_name = os.path.splitext(os.path.basename(srt_file_path))[0] + output_filename = f"{base_name}_{target_lang}.srt" + + # Save locally if configured + storage_type = config.get('output', {}).get('storage_type', 'both') + result = {'target_language': target_lang} + + if storage_type in ['local', 'both']: + output_dir = config.get('output', {}).get('local_directory', './output') + local_output_path = os.path.join(output_dir, output_filename) + save_translated_srt(translated_entries, local_output_path) + result['local_file_path'] = local_output_path + log_step(f"Saved translated SRT locally: {local_output_path}") + + # Upload to object storage if configured + if storage_type in ['object_storage', 'both']: + namespace, bucket_name = get_translation_namespace_bucket(config) + prefix = config.get('output', {}).get('object_storage_prefix', 'translations') + object_name = f"{prefix}/{output_filename}" + + # Create temporary file for upload + with tempfile.NamedTemporaryFile(mode='w', suffix='.srt', delete=False, encoding='utf-8') as tmp_f: + for entry in translated_entries: + tmp_f.write(f"{entry['number']}\n") + tmp_f.write(f"{entry['timestamp']}\n") + tmp_f.write(f"{entry['text']}\n\n") + temp_path = tmp_f.name + + try: + with open(temp_path, 'rb') as f: + object_storage_client.put_object( + namespace_name=namespace, + bucket_name=bucket_name, + object_name=object_name, + put_object_body=f + ) + + result['object_storage_path'] = object_name + log_step(f"Uploaded translated SRT to object storage: {object_name}") + + finally: + # Clean up temporary file + os.unlink(temp_path) + + return result + + finally: + # Clean up temporary SRT file if we downloaded it + if is_temporary and os.path.exists(local_srt_path): + os.unlink(local_srt_path) + + +def translate_srt_batch(language_client, object_storage_client, config, srt_file_path, source_lang, target_lang): + """Translate SRT file using batch/async translation""" + log_step(f"Translating {srt_file_path} to {target_lang} using batch method...") + + # Get the actual SRT file for processing (handles both local and Object Storage) + local_srt_path, is_temporary = get_srt_file_for_parsing(object_storage_client, config, srt_file_path) + + # Validate file size (20MB limit for batch translation) + file_size = os.path.getsize(local_srt_path) + if file_size > 20 * 1024 * 1024: # 20MB in bytes + log_step("File exceeds 20MB limit, falling back to synchronous translation") + # Clean up temporary file if needed + if is_temporary and os.path.exists(local_srt_path): + os.unlink(local_srt_path) + return translate_srt_sync(language_client, object_storage_client, config, srt_file_path, source_lang, target_lang) + + # Determine object storage path for batch processing + if os.path.exists(srt_file_path): + # Local file - upload to object storage first + input_object_name = upload_srt_file(object_storage_client, config, srt_file_path) + base_name = os.path.splitext(os.path.basename(srt_file_path))[0] + else: + # Already in object storage + input_object_name = srt_file_path + base_name = os.path.splitext(os.path.basename(srt_file_path))[0] + + namespace, bucket_name = get_translation_namespace_bucket(config) + + try: + # Create document details for input and output locations + input_location_details = oci.ai_language.models.ObjectStorageFileNameLocation( + namespace_name=namespace, + bucket_name=bucket_name, object_names=[input_object_name] ) + # Output prefix for the translated file + output_prefix = config.get('output', {}).get('object_storage_prefix', 'translations') + output_location_details = oci.ai_language.models.ObjectPrefixOutputLocation( + namespace_name=namespace, + bucket_name=bucket_name, + prefix=f"{output_prefix}/{base_name}_{target_lang}" + ) + + # Create translation task details + translation_task_details = oci.ai_language.models.BatchLanguageTranslationDetails( + target_language_code=target_lang + ) + # Create job details - create_job_details = oci.ai_language.models.CreateBatchLanguageTranslationJobDetails( + create_job_details = oci.ai_language.models.CreateJobDetails( compartment_id=config['language']['compartment_id'], - display_name=f"Translate_{os.path.basename(input_file)}_{target_lang}", - source_language_code=source_lang, - target_language_code=target_lang, - input_location=document_details, - output_location=document_details, - model_id="PRETRAINED_LANGUAGE_TRANSLATION" + display_name=f"Translate_{base_name}_{target_lang}", + input_location=input_location_details, + output_location=output_location_details, + job_details=translation_task_details ) # Create translation job - response = client.create_job( + response = language_client.create_job( create_job_details=create_job_details ) job_id = response.data.id - print(f"Translation job created with ID: {job_id}") + log_step(f"Translation job created with ID: {job_id}") # Wait for job completion - if wait_for_job_completion(client, job_id, config['language']['compartment_id']): - print(f"Successfully translated to {target_lang}") - return True + if wait_for_translation_job(language_client, job_id, config['language']['compartment_id']): + # Construct expected output file name + output_filename = f"{base_name}_{target_lang}.srt" + output_object_name = f"{output_prefix}/{base_name}_{target_lang}/{output_filename}" + + result = { + 'target_language': target_lang, + 'object_storage_path': output_object_name + } + + # Download locally if configured + storage_type = config.get('output', {}).get('storage_type', 'both') + if storage_type in ['local', 'both']: + output_dir = config.get('output', {}).get('local_directory', './output') + local_path = os.path.join(output_dir, output_filename) + + try: + get_response = object_storage_client.get_object( + namespace_name=namespace, + bucket_name=bucket_name, + object_name=output_object_name + ) + + os.makedirs(output_dir, exist_ok=True) + with open(local_path, 'wb') as f: + for chunk in get_response.data.raw.stream(1024 * 1024, decode_content=False): + f.write(chunk) + + result['local_file_path'] = local_path + log_step(f"Downloaded translated file locally: {local_path}") + + except Exception as e: + log_step(f"Failed to download translated file: {str(e)}", True) + + log_step(f"Successfully translated to {target_lang}") + return result else: - print("Translation job failed or timed out") - return False + log_step(f"Translation job failed for {target_lang}", True) + return None except Exception as e: - print(f"Error translating to {target_lang}: {str(e)}") - return False + log_step(f"Error in batch translation to {target_lang}: {str(e)}", True) + # Fallback to synchronous translation for smaller files + if os.path.exists(srt_file_path): + log_step("Falling back to synchronous translation...") + return translate_srt_sync(language_client, object_storage_client, config, srt_file_path, source_lang, target_lang) + return None + + finally: + # Clean up temporary SRT file if we downloaded it + if is_temporary and os.path.exists(local_srt_path): + os.unlink(local_srt_path) def main(): - # Define supported languages - SUPPORTED_LANGUAGES = { - 'ar': 'Arabic', 'hr': 'Croatian', 'cs': 'Czech', 'da': 'Danish', - 'nl': 'Dutch', 'en': 'English', 'fi': 'Finnish', 'fr': 'French', - 'fr-CA': 'French Canadian', 'de': 'German', 'el': 'Greek', - 'he': 'Hebrew', 'hu': 'Hungarian', 'it': 'Italian', 'ja': 'Japanese', - 'ko': 'Korean', 'no': 'Norwegian', 'pl': 'Polish', 'pt': 'Portuguese', - 'pt-BR': 'Portuguese Brazilian', 'ro': 'Romanian', 'ru': 'Russian', - 'zh-CN': 'Simplified Chinese', 'sk': 'Slovak', 'sl': 'Slovenian', - 'es': 'Spanish', 'sv': 'Swedish', 'th': 'Thai', 'zh-TW': 'Traditional Chinese', - 'tr': 'Turkish', 'vi': 'Vietnamese' - } - - parser = argparse.ArgumentParser(description='Translate SRT files using OCI Language') - parser.add_argument('--input-file', required=True, help='Input SRT file path') - parser.add_argument('--source-lang', default='en', help='Source language code') - parser.add_argument('--target-langs', nargs='+', help='Target language codes (space-separated)') + parser = argparse.ArgumentParser( + description='Translate SRT files using OCI Language service', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Translate local SRT file to multiple languages + python translate_srt.py --input-file subtitles.srt --target-languages es fr de + + # Translate with specific source language and method + python translate_srt.py --input-file subtitles.srt --source-language en --target-languages es --method sync + + # Translate SRT file in Object Storage + python translate_srt.py --input-file "srt_files/subtitles.srt" --target-languages es fr + +Supported languages: """ + ", ".join([f"{code} ({name})" for code, name in sorted(SUPPORTED_LANGUAGES.items())]) + ) + + parser.add_argument('--input-file', required=True, + help='SRT file path (local file or Object Storage object name)') + parser.add_argument('--source-language', type=str, default='en', + help='Source language code (default: en)') + parser.add_argument('--target-languages', nargs='+', type=str, + help='Target language codes (space-separated)') + parser.add_argument('--method', choices=['sync', 'batch'], default=None, + help='Translation method (default: from config or batch)') + parser.add_argument('--output-type', choices=['local', 'object_storage', 'both'], default=None, + help='Where to store output (default: from config)') + parser.add_argument('--config', type=str, default='config.yaml', + help='Configuration file path (default: config.yaml)') + args = parser.parse_args() - # Validate input file - if not os.path.exists(args.input_file): - print(f"Error: Input file {args.input_file} not found") + # Load configuration + config = load_config(args.config) + if not config: return - # Load configuration - config = load_config() + # Override config with command line arguments + if args.method: + config.setdefault('translation', {})['method'] = args.method + if args.output_type: + config.setdefault('output', {})['storage_type'] = args.output_type + + # Set defaults + method = config.get('translation', {}).get('method', 'batch') + storage_type = config.get('output', {}).get('storage_type', 'both') + target_languages = args.target_languages + if not target_languages: + target_languages = config.get('translation', {}).get('target_languages', ['es', 'fr', 'de']) + + # Validate input file + if os.path.exists(args.input_file): + log_step(f"Using local SRT file: {args.input_file}") + else: + log_step(f"Using SRT file from Object Storage: {args.input_file}") + + # Load OCI configuration + profile_name = config.get("profile", "DEFAULT") + try: + oci_config = oci.config.from_file(profile_name=profile_name) + region = oci_config.get("region", "unknown") + log_step(f"Loaded OCI profile '{profile_name}' (region: {region})") + except Exception as e: + log_step(f"Failed to load OCI configuration: {e}", True) + return # Initialize clients - language_client = get_language_client() - object_storage_client = oci.object_storage.ObjectStorageClient(oci.config.from_file()) + try: + language_client = oci.ai_language.AIServiceLanguageClient(oci_config) + object_storage_client = oci.object_storage.ObjectStorageClient(oci_config) + log_step("Successfully initialized OCI clients") + except Exception as e: + log_step(f"Failed to initialize OCI clients: {str(e)}", True) + return + + # Create output directory if needed + if storage_type in ['local', 'both']: + output_dir = config.get('output', {}).get('local_directory', './output') + os.makedirs(output_dir, exist_ok=True) + log_step(f"Local output directory: {output_dir}") + + # Validate target languages + valid_languages = [] + for lang in target_languages: + if lang in SUPPORTED_LANGUAGES: + if lang != args.source_language: # Don't translate to same language + valid_languages.append(lang) + else: + log_step(f"Unsupported language code '{lang}', skipping...", True) - # If no target languages specified, translate to all supported languages - target_langs = args.target_langs if args.target_langs else SUPPORTED_LANGUAGES.keys() + if not valid_languages: + log_step("No valid target languages specified", True) + return + + log_step(f"Translation settings:") + log_step(f" β€’ SRT file: {args.input_file}") + log_step(f" β€’ Source language: {args.source_language}") + log_step(f" β€’ Target languages: {', '.join(valid_languages)}") + log_step(f" β€’ Method: {method}") + log_step(f" β€’ Storage type: {storage_type}") # Translate to each target language - for lang in target_langs: - if lang not in SUPPORTED_LANGUAGES: - print(f"Warning: Unsupported language code '{lang}', skipping...") - continue - - if lang != args.source_lang: - print(f"Translating to {SUPPORTED_LANGUAGES[lang]} ({lang})...") - translate_srt( - language_client, - object_storage_client, - config, - args.input_file, - args.source_lang, - lang - ) + successful_translations = 0 + for lang in valid_languages: + lang_name = SUPPORTED_LANGUAGES[lang] + log_step(f"\nTranslating to {lang_name} ({lang})...") + + if method == 'sync': + result = translate_srt_sync(language_client, object_storage_client, config, args.input_file, args.source_language, lang) + else: # batch + result = translate_srt_batch(language_client, object_storage_client, config, args.input_file, args.source_language, lang) + + if result: + successful_translations += 1 + log_step(f"βœ“ Successfully translated to {lang_name} ({lang})") + if 'local_file_path' in result: + log_step(f" Local file: {result['local_file_path']}") + if 'object_storage_path' in result: + log_step(f" Object Storage: {result['object_storage_path']}") + else: + log_step(f"βœ— Failed to translate to {lang_name} ({lang})", True) + + log_step(f"\nTranslation completed: {successful_translations}/{len(valid_languages)} successful") + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/oci-subtitle-translation/workflow.py b/oci-subtitle-translation/workflow.py new file mode 100644 index 0000000..f6d94d6 --- /dev/null +++ b/oci-subtitle-translation/workflow.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +""" +OCI Subtitle Translation Workflow + +Complete workflow for transcribing audio files and translating subtitles +using OCI Speech and Language services. +""" + +import argparse +import os +import sys +import yaml +import subprocess +from datetime import datetime + + +def log_step(message, is_error=False): + """Print a formatted log message with timestamp""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + prefix = "ERROR" if is_error else "INFO" + print(f"[{timestamp}] {prefix}: {message}") + + +def load_config(config_file='config.yaml'): + """Load configuration from YAML file""" + try: + with open(config_file, 'r') as f: + config = yaml.safe_load(f) + return config + except FileNotFoundError: + log_step(f"Configuration file {config_file} not found", True) + log_step("Please copy config_example.yaml to config.yaml and update with your settings", True) + return None + except Exception as e: + log_step(f"Failed to load configuration: {str(e)}", True) + return None + + +def run_transcription(args, config): + """Run the transcription workflow""" + log_step("Starting transcription workflow...") + + cmd = [ + "python", "generate_srt_from_audio.py", + "--input-file", args.audio_source + ] + + if args.speech_language: + cmd.extend(["--language", args.speech_language]) + + if args.output_type: + cmd.extend(["--output-type", args.output_type]) + + if args.config != 'config.yaml': + cmd.extend(["--config", args.config]) + + try: + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + log_step("Transcription completed successfully") + return True + except subprocess.CalledProcessError as e: + log_step(f"Transcription failed: {e}", True) + if e.stdout: + print("STDOUT:", e.stdout) + if e.stderr: + print("STDERR:", e.stderr) + return False + + +def run_translation(args, config): + """Run the translation workflow""" + log_step("Starting translation workflow...") + + cmd = [ + "python", "translate_srt.py", + "--input-file", args.srt_file, + "--source-language", args.source_language + ] + + if args.target_languages: + cmd.extend(["--target-languages"] + args.target_languages) + + if args.translation_method: + cmd.extend(["--method", args.translation_method]) + + if args.output_type: + cmd.extend(["--output-type", args.output_type]) + + if args.config != 'config.yaml': + cmd.extend(["--config", args.config]) + + try: + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + log_step("Translation completed successfully") + return True + except subprocess.CalledProcessError as e: + log_step(f"Translation failed: {e}", True) + if e.stdout: + print("STDOUT:", e.stdout) + if e.stderr: + print("STDERR:", e.stderr) + return False + + +def find_generated_srt(config, audio_file): + """Find the SRT file generated from audio transcription""" + audio_filename = os.path.basename(audio_file) + base_name = os.path.splitext(audio_filename)[0] + + output_dir = config.get('output', {}).get('local_directory', './output') + expected_local_path = os.path.join(output_dir, f"{base_name}.srt") + + if os.path.exists(expected_local_path): + log_step(f"Found generated SRT file locally: {expected_local_path}") + return expected_local_path + + storage_type = config.get('output', {}).get('storage_type', 'both') + if storage_type in ['object_storage', 'both']: + object_storage_path = f"transcriptions/{audio_filename}/{base_name}.srt" + log_step(f"Using Object Storage SRT path: {object_storage_path}") + return object_storage_path + + log_step(f"Using fallback local SRT path: {expected_local_path}") + return expected_local_path + + +def main(): + parser = argparse.ArgumentParser( + description='OCI Subtitle Translation Workflow', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Complete workflow: transcribe and translate + python workflow.py --audio-source audio.mp3 --target-languages es fr de + + # Transcription only + python workflow.py --transcribe-only --audio-source audio.mp3 + + # Translation only + python workflow.py --translate-only --srt-file subtitles.srt --target-languages es fr + + # Object Storage audio file + python workflow.py --audio-source "audio/recording.mp3" --target-language es + """ + ) + + # Workflow control + workflow_group = parser.add_mutually_exclusive_group() + workflow_group.add_argument('--transcribe-only', action='store_true', + help='Only perform transcription (no translation)') + workflow_group.add_argument('--translate-only', action='store_true', + help='Only perform translation (no transcription)') + + # Transcription options + parser.add_argument('--audio-source', type=str, + help='Audio file path (local file or Object Storage object name)') + parser.add_argument('--speech-language', type=str, + help='Language code for speech transcription (default: from config)') + + # Translation options + parser.add_argument('--srt-file', type=str, + help='SRT file path for translation (local file or Object Storage object name)') + parser.add_argument('--source-language', type=str, default='en', + help='Source language code (default: en)') + parser.add_argument('--target-languages', nargs='+', type=str, + help='Target language codes (default: from config)') + parser.add_argument('--target-language', type=str, + help='Single target language code (alternative to --target-languages)') + parser.add_argument('--translation-method', choices=['sync', 'batch'], + help='Translation method (default: from config)') + + # General options + parser.add_argument('--output-type', choices=['local', 'object_storage', 'both'], + help='Where to store output (default: from config)') + parser.add_argument('--config', type=str, default='config.yaml', + help='Configuration file path (default: config.yaml)') + + args = parser.parse_args() + + if not args.transcribe_only and not args.translate_only: + if not args.audio_source: + log_step("ERROR: --audio-source is required for complete workflow", True) + parser.print_help() + sys.exit(1) + elif args.transcribe_only: + if not args.audio_source: + log_step("ERROR: --audio-source is required for transcription", True) + parser.print_help() + sys.exit(1) + elif args.translate_only: + if not args.srt_file: + log_step("ERROR: --srt-file is required for translation only", True) + parser.print_help() + sys.exit(1) + + config = load_config(args.config) + if not config: + sys.exit(1) + + if args.target_language and args.target_languages: + log_step("ERROR: Cannot specify both --target-language and --target-languages", True) + sys.exit(1) + elif args.target_language: + args.target_languages = [args.target_language] + elif not args.target_languages and not args.transcribe_only: + default_langs = config.get('translation', {}).get('target_languages', []) + if default_langs: + args.target_languages = default_langs + log_step(f"Using default target languages from config: {default_langs}") + + log_step("Starting OCI Subtitle Translation workflow") + log_step(f"Configuration: {args.config}") + + success = True + + if not args.translate_only: + success = run_transcription(args, config) + if not success and not args.transcribe_only: + log_step("Transcription failed, cannot proceed with translation", True) + sys.exit(1) + + if not args.transcribe_only and success: + if not args.translate_only: + args.srt_file = find_generated_srt(config, args.audio_source) + log_step(f"Using generated SRT file: {args.srt_file}") + + success = run_translation(args, config) + + log_step("\n" + "="*60) + log_step("WORKFLOW SUMMARY") + log_step("="*60) + + if args.transcribe_only: + if success: + log_step("βœ“ Transcription completed successfully") + else: + log_step("βœ— Transcription failed", True) + elif args.translate_only: + if success: + log_step("βœ“ Translation completed successfully") + else: + log_step("βœ— Translation failed", True) + else: + if success: + log_step("βœ“ Complete workflow completed successfully") + else: + log_step("βœ— Workflow failed", True) + + log_step("Workflow finished!") + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main()