diff --git a/cmd/raw-recording-tools/README.md b/cmd/raw-recording-tools/README.md new file mode 100644 index 0000000..725a86d --- /dev/null +++ b/cmd/raw-recording-tools/README.md @@ -0,0 +1,428 @@ +# Raw-Tools CLI + +Post-processing tools for raw video call recordings with intelligent completion, validation, and advanced audio/video processing. + +## Features + +- **Discovery**: Use `list-tracks` to explore recording contents with screenshare detection +- **Smart Completion**: Shell completion with dynamic values from actual recordings +- **Validation**: Automatic validation of user inputs against available data +- **Multiple Formats**: Support for different output formats (table, JSON, completion) +- **Advanced Processing**: Extract, mux, mix and process audio/video with gap filling +- **Hybrid Architecture**: Optimized performance for different use cases + +## Commands + +### `list-tracks` - Discovery & Completion Hub + +The `list-tracks` command serves as both a discovery tool and completion engine for other commands. 
+ +```bash +# Basic usage - see all tracks in table format (no --output needed) +raw-tools --inputFile recording.tar.gz list-tracks + +# Get JSON output for programmatic use +raw-tools --inputFile recording.tar.gz list-tracks --format json + +# Get completion-friendly lists +raw-tools --inputFile recording.tar.gz list-tracks --format users +raw-tools --inputFile recording.tar.gz list-tracks --format sessions +raw-tools --inputFile recording.tar.gz list-tracks --format tracks +``` + +**Options:** +- `--format ` - Output format: `table` (default), `json`, `users`, `sessions`, `tracks`, `completion` +- `--trackType ` - Filter by track type: `audio`, `video` (optional) +- `-h, --help` - Show help message + +**Output Formats:** +- `table` - Human-readable table with screenshare detection (default) +- `json` - Full metadata in JSON format for scripting +- `users` - List of user IDs only (for shell scripts) +- `sessions` - List of session IDs only (for automation) +- `tracks` - List of track IDs only (for filtering) +- `completion` - Shell completion format + +### `extract-audio` - Extract Audio Tracks + +Extract and convert individual audio tracks from raw recordings to WebM format. 
+ +```bash +# Extract audio for all users +raw-tools --inputFile recording.zip --output ./output extract-audio + +# Extract audio for specific user with gap filling +raw-tools --inputFile recording.zip --output ./output extract-audio --userId user123 --fill_gaps + +# Extract audio for specific session +raw-tools --inputFile recording.zip --output ./output extract-audio --sessionId session456 + +# Extract specific track only +raw-tools --inputFile recording.zip --output ./output extract-audio --trackId track789 +``` + +**Options:** +- `--userId ` - Filter by user ID (returns all tracks for that user) +- `--sessionId ` - Filter by session ID (returns all tracks for that session) +- `--trackId ` - Filter by track ID (returns only that specific track) +- **Note**: These filters are mutually exclusive - only one can be specified at a time +- `--fill_gaps` - Fill temporal gaps between segments with silence (recommended for playback) +- `-h, --help` - Show help message + +**Mutually Exclusive Filtering:** +- Only one filter can be specified at a time: `--userId`, `--sessionId`, or `--trackId` +- `--trackId` returns exactly one track (the specified track) +- `--sessionId` returns all tracks for that session (multiple tracks possible) +- `--userId` returns all tracks for that user (multiple tracks possible) +- If no filter is specified, all tracks are processed + +### `extract-video` - Extract Video Tracks + +Extract and convert individual video tracks from raw recordings to WebM format. 
+ +```bash +# Extract video for all users +raw-tools --inputFile recording.zip --output ./output extract-video + +# Extract video for specific user with black frame filling +raw-tools --inputFile recording.zip --output ./output extract-video --userId user123 --fill_gaps + +# Extract screenshare video only +raw-tools --inputFile recording.zip --output ./output extract-video --userId user456 --fill_gaps +``` + +**Options:** +- `--userId ` - Filter by user ID (returns all tracks for that user) +- `--sessionId ` - Filter by session ID (returns all tracks for that session) +- `--trackId ` - Filter by track ID (returns only that specific track) +- **Note**: These filters are mutually exclusive - only one can be specified at a time +- `--fill_gaps` - Fill temporal gaps between segments with black frames (recommended for playback) +- `-h, --help` - Show help message + +**Video Processing:** +- Supports regular camera video and screenshare video +- Automatically detects and preserves video codec (VP8, VP9, H264, AV1) +- Gap filling generates black frames matching original video dimensions and framerate + +### `mux-av` - Mux Audio/Video + +Combine individual audio and video tracks with proper synchronization and timing offsets. 
+ +```bash +# Mux audio/video for all users +raw-tools --inputFile recording.zip --output ./output mux-av + +# Mux for specific user with proper sync +raw-tools --inputFile recording.zip --output ./output mux-av --userId user123 + +# Mux for specific session +raw-tools --inputFile recording.zip --output ./output mux-av --sessionId session456 + +# Mux specific tracks with precise control +raw-tools --inputFile recording.zip --output ./output mux-av --userId user123 --sessionId session456 +``` + +**Options:** +- `--userId ` - Filter by user ID (returns all tracks for that user) +- `--sessionId ` - Filter by session ID (returns all tracks for that session) +- `--trackId ` - Filter by track ID (returns only that specific track) +- **Note**: These filters are mutually exclusive - only one can be specified at a time +- `--media ` - Filter by media type: `user` (camera/microphone), `display` (screen sharing), or `both` (default) +- `-h, --help` - Show help message + +**Features:** +- Automatic timing synchronization between audio and video using RTCP timestamps +- Gap filling for seamless playback (always enabled for muxing) +- Single combined WebM output per user/session combination +- Intelligent offset calculation for perfect A/V sync +- Supports all video codecs (VP8, VP9, H264, AV1) with Opus audio +- Media type filtering ensures consistent pairing (user camera ↔ user microphone, display sharing ↔ display audio) + +**Media Type Examples:** +```bash +# Mux only user camera/microphone tracks +raw-tools --inputFile recording.zip --output ./output mux-av --userId user123 --media user + +# Mux only display sharing tracks +raw-tools --inputFile recording.zip --output ./output mux-av --userId user123 --media display + +# Mux both types with proper pairing (default) +raw-tools --inputFile recording.zip --output ./output mux-av --userId user123 --media both +``` + +### `mix-audio` - Mix Multiple Audio Tracks + +Mix audio from multiple users/sessions into a single synchronized 
audio file, perfect for conference call reconstruction. + +```bash +# Mix audio from all users (full conference call) +raw-tools --inputFile recording.zip --output ./output mix-audio + +# Mix audio from specific user across all sessions +raw-tools --inputFile recording.zip --output ./output mix-audio --userId user123 + +# Mix audio from specific session (all users in that session) +raw-tools --inputFile recording.zip --output ./output mix-audio --sessionId session456 + +# Mix specific tracks with fine control +raw-tools --inputFile recording.zip --output ./output mix-audio --userId user123 --sessionId session456 +``` + +**Options:** +- `--userId ` - Filter by user ID (returns all tracks for that user) +- `--sessionId ` - Filter by session ID (returns all tracks for that session) +- `--trackId ` - Filter by track ID (returns only that specific track) +- **Note**: These filters are mutually exclusive - only one can be specified at a time +- `--no-fill-gaps` - Disable gap filling (not recommended for mixing, gaps enabled by default) +- `-h, --help` - Show help message + +**Perfect for:** +- Conference call audio reconstruction with proper timing +- Multi-participant audio analysis and review +- Creating complete session audio timelines +- Audio synchronization testing and validation +- Podcast-style recordings from video calls + +**Advanced Mixing:** +- Uses FFmpeg adelay and amix filters for professional-quality mixing +- Automatic timing offset calculation based on segment metadata +- Gap filling with silence maintains temporal relationships +- Output: Single `mixed_audio.webm` file with all tracks properly synchronized + +### `process-all` - Complete Workflow + +Execute audio extraction, video extraction, and muxing in a single command - the all-in-one solution. 
+ +```bash +# Process everything for all users +raw-tools --inputFile recording.zip --output ./output process-all + +# Process everything for specific user +raw-tools --inputFile recording.zip --output ./output process-all --userId user123 + +# Process specific session with all participants +raw-tools --inputFile recording.zip --output ./output process-all --sessionId session456 + +# Process specific tracks with full workflow +raw-tools --inputFile recording.zip --output ./output process-all --userId user123 --sessionId session456 +``` + +**Options:** +- `--userId ` - Filter by user ID (returns all tracks for that user) +- `--sessionId ` - Filter by session ID (returns all tracks for that session) +- `--trackId ` - Filter by track ID (returns only that specific track) +- **Note**: These filters are mutually exclusive - only one can be specified at a time +- `-h, --help` - Show help message + +**Workflow Steps:** +1. **Audio Extraction** - Extracts all matching audio tracks with gap filling enabled +2. **Video Extraction** - Extracts all matching video tracks with gap filling enabled +3. **Audio/Video Muxing** - Combines corresponding audio and video tracks with sync + +**Outputs:** +- Individual audio tracks (WebM format): `audio_userId_sessionId_trackId.webm` +- Individual video tracks (WebM format): `video_userId_sessionId_trackId.webm` +- Combined audio/video files (WebM format): `muxed_userId_sessionId_combined.webm` +- All files include gap filling for seamless playback +- Perfect for bulk processing and automated workflows + +## Completion Workflow Architecture + +### 1. 
Discovery Phase +```bash +# First, explore what's in your recording +raw-tools --inputFile recording.zip list-tracks + +# Example output with screenshare detection: +# USER ID SESSION ID TRACK ID TYPE SCREENSHARE CODEC SEGMENTS +# -------------------- -------------------- -------------------- ------- ------------ --------------- -------- +# user_abc123 session_xyz789 track_001 audio No audio/opus 3 +# user_abc123 session_xyz789 track_002 video No video/VP8 2 +# user_def456 session_xyz789 track_003 video Yes video/VP8 1 +``` + +### 2. Shell Completion Setup + +```bash +# Install completion for your shell +source <(raw-tools completion bash) # Bash +source <(raw-tools completion zsh) # Zsh +raw-tools completion fish | source # Fish +``` + +### 3. Dynamic Completion in Action + +With completion enabled, the CLI will: +- **Auto-complete commands** and flags +- **Dynamically suggest user IDs** from the actual recording +- **Validate inputs** against available data +- **Provide helpful error messages** with discovery hints + +```bash +# Tab completion will suggest actual user IDs from your recording +raw-tools --inputFile recording.zip --output ./out extract-audio --userId +# Shows: user_abc123 user_def456 + +# Invalid inputs show helpful errors +raw-tools --inputFile recording.zip --output ./out extract-audio --userId invalid_user +# Error: userID 'invalid_user' not found in recording. Available users: user_abc123, user_def456 +# Tip: Use 'raw-tools --inputFile recording.zip --output ./out list-tracks --format users' to see available user IDs +``` + +### 4. 
Programmatic Integration + +```bash +# Get user IDs for scripts +USERS=$(raw-tools --inputFile recording.zip list-tracks --format users) + +# Process each user +for user in $USERS; do + raw-tools --inputFile recording.zip --output ./output extract-audio --userId "$user" --fill_gaps +done + +# Get JSON metadata for complex processing +raw-tools --inputFile recording.zip list-tracks --format json > metadata.json +``` + +## Workflow Examples + +### Example 1: Extract Audio for Each Participant + +```bash +# 1. Discover participants +raw-tools --inputFile call.zip list-tracks --format users + +# 2. Extract each participant's audio +for user in $(raw-tools --inputFile call.zip list-tracks --format users); do + echo "Extracting audio for user: $user" + raw-tools --inputFile call.zip --output ./extracted extract-audio --userId "$user" --fill_gaps +done +``` + +### Example 2: Quality Check Before Processing + +```bash +# 1. Get full metadata overview +raw-tools --inputFile recording.zip list-tracks --format json > recording_info.json + +# 2. Check track counts +audio_tracks=$(raw-tools --inputFile recording.zip list-tracks --trackType audio --format json | jq '.tracks | length') +video_tracks=$(raw-tools --inputFile recording.zip list-tracks --trackType video --format json | jq '.tracks | length') + +echo "Found $audio_tracks audio tracks and $video_tracks video tracks" + +# 3. Process only if we have both audio and video +if [ "$audio_tracks" -gt 0 ] && [ "$video_tracks" -gt 0 ]; then + raw-tools --inputFile recording.zip --output ./output mux-av +fi +``` + +### Example 3: Conference Call Audio Mixing + +```bash +# 1. Mix all participants into single audio file +raw-tools --inputFile conference.zip --output ./mixed mix-audio + +# 2. Mix specific users for focused conversation (individual commands) +raw-tools --inputFile conference.zip --output ./mixed mix-audio --userId user1 +raw-tools --inputFile conference.zip --output ./mixed mix-audio --userId user2 + +# 3. 
Create session-by-session mixed audio +for session in $(raw-tools --inputFile conference.zip list-tracks --format sessions); do + raw-tools --inputFile conference.zip --output "./mixed/$session" mix-audio --sessionId "$session" +done +``` + +### Example 4: Complete Processing Pipeline + +```bash +# All-in-one processing for the entire recording +raw-tools --inputFile recording.zip --output ./complete process-all + +# Results in: +# - ./complete/audio_*.webm (individual audio tracks) +# - ./complete/video_*.webm (individual video tracks) +# - ./complete/muxed_*.webm (combined A/V tracks) +``` + +### Example 5: Session-Based Processing + +```bash +# 1. Process each session separately +for session in $(raw-tools --inputFile recording.zip list-tracks --format sessions); do + echo "Processing session: $session" + + # Extract all audio from this session + raw-tools --inputFile recording.zip --output "./output/$session" extract-audio --sessionId "$session" --fill_gaps + + # Extract all video from this session + raw-tools --inputFile recording.zip --output "./output/$session" extract-video --sessionId "$session" --fill_gaps + + # Mux audio/video for this session + raw-tools --inputFile recording.zip --output "./output/$session" mux-av --sessionId "$session" +done +``` + +## Architecture & Performance + +### Hybrid Processing Architecture + +The tool uses an intelligent hybrid approach optimized for different use cases: + +**Fast Metadata Reading (`list-tracks`):** +- Direct tar.gz parsing for metadata-only operations +- Skips extraction of large media files (.rtpdump/.sdp) +- 10-50x faster than full extraction for discovery workflows + +**Full Processing (extraction commands):** +- Complete archive extraction to temporary directories +- Access to all media files for conversion and processing +- Unified processing pipeline for reliability + +### Command Categories + +1. 
**Discovery Commands** (`list-tracks`) + - Optimized for speed and shell completion + - Minimal resource usage + - Instant metadata access + +2. **Processing Commands** (`extract-*`, `mix-*`, `mux-*`, `process-all`) + - Full archive extraction and processing + - Complete media file access + - Advanced audio/video operations + +3. **Utility Commands** (`completion`, `help`) + - Shell integration and documentation + +## Benefits of the Architecture + +1. **Discoverability**: No need to guess user IDs, session IDs, or track IDs +2. **Performance**: Optimized operations for different use cases +3. **Validation**: Immediate feedback if specified IDs don't exist +4. **Efficiency**: Tab completion speeds up command construction +5. **Reliability**: Prevents typos and invalid commands +6. **Scriptability**: Programmatic access to metadata for automated workflows +7. **User Experience**: Helpful error messages with actionable suggestions +8. **Advanced Processing**: Conference call reconstruction and analysis capabilities + +## File Structure + +``` +cmd/raw-tools/ +├── main.go # Main CLI entry point and routing +├── metadata.go # Shared metadata parsing and filtering (hybrid architecture) +├── completion.go # Shell completion scripts generation +├── list_tracks.go # Discovery and completion command (optimized) +├── extract_audio.go # Audio extraction with validation +├── extract_video.go # Video extraction with validation +├── extract_track.go # Generic extraction logic (shared) +├── mix_audio.go # Multi-user audio mixing +├── mux_av.go # Audio/video synchronization and muxing +├── process_all.go # All-in-one processing workflow +└── README.md # This documentation +``` + +## Dependencies + +- **FFmpeg**: Required for media processing and conversion +- **Go 1.19+**: For building the CLI tool diff --git a/cmd/raw-recording-tools/completion.go b/cmd/raw-recording-tools/completion.go new file mode 100644 index 0000000..df0662f --- /dev/null +++ 
b/cmd/raw-recording-tools/completion.go @@ -0,0 +1,289 @@ +package main + +import ( + "fmt" + "os" +) + +// generateCompletion generates shell completion scripts +func generateCompletion(shell string) { + switch shell { + case "bash": + generateBashCompletion() + case "zsh": + generateZshCompletion() + case "fish": + generateFishCompletion() + default: + _, _ = fmt.Fprintf(os.Stderr, "Unsupported shell: %s\n", shell) + _, _ = fmt.Fprintf(os.Stderr, "Supported shells: bash, zsh, fish\n") + os.Exit(1) + } +} + +// generateBashCompletion generates bash completion script +func generateBashCompletion() { + script := `#!/bin/bash + +_raw_tools_completion() { + local cur prev words cword + _init_completion || return + + # Complete subcommands + if [[ $cword -eq 1 ]]; then + COMPREPLY=($(compgen -W "list-tracks extract-audio extract-video mux-av help" -- "$cur")) + return + fi + + local cmd="${words[1]}" + + case "$prev" in + --inputFile) + COMPREPLY=($(compgen -f -X "!*.zip" -- "$cur")) + return + ;; + --output) + COMPREPLY=($(compgen -d -- "$cur")) + return + ;; + --format) + case "$cmd" in + list-tracks) + COMPREPLY=($(compgen -W "table json completion users sessions tracks" -- "$cur")) + ;; + esac + return + ;; + --trackType) + COMPREPLY=($(compgen -W "audio video" -- "$cur")) + return + ;; + --userId|--sessionId|--trackId) + # Dynamic completion using list-tracks + if [[ -n "${_RAW_TOOLS_INPUT_FILE:-}" ]]; then + local completion_type="" + case "$prev" in + --userId) completion_type="users" ;; + --sessionId) completion_type="sessions" ;; + --trackId) completion_type="tracks" ;; + esac + if [[ -n "$completion_type" ]]; then + local values=$(raw-tools --inputFile "$_RAW_TOOLS_INPUT_FILE" --output /tmp list-tracks --format "$completion_type" 2>/dev/null) + COMPREPLY=($(compgen -W "$values" -- "$cur")) + fi + else + COMPREPLY=() + fi + return + ;; + esac + + # Complete global flags + local global_flags="--inputFile --inputS3 --output --verbose --help" + local cmd_flags="" 
+ + case "$cmd" in + list-tracks) + cmd_flags="--format --trackType --completionType" + ;; + extract-audio|extract-video) + cmd_flags="--userId --sessionId --trackId --fill_gaps" + ;; + mux-av) + cmd_flags="--userId --sessionId" + ;; + esac + + COMPREPLY=($(compgen -W "$global_flags $cmd_flags" -- "$cur")) +} + +# Store input file for dynamic completion +_raw_tools_set_input_file() { + local i + for (( i=1; i < ${#COMP_WORDS[@]}; i++ )); do + if [[ "${COMP_WORDS[i]}" == "--inputFile" && i+1 < ${#COMP_WORDS[@]} ]]; then + export _RAW_TOOLS_INPUT_FILE="${COMP_WORDS[i+1]}" + break + fi + done +} + +# Hook to set input file before completion +complete -F _raw_tools_completion raw-tools + +# Wrapper to set input file +_raw_tools_wrapper() { + _raw_tools_set_input_file + _raw_tools_completion "$@" +} + +complete -F _raw_tools_wrapper raw-tools` + + fmt.Println(script) +} + +// generateZshCompletion generates zsh completion script +func generateZshCompletion() { + script := `#compdef raw-tools + +_raw_tools() { + local context state line + typeset -A opt_args + + _arguments -C \ + '1: :_raw_tools_commands' \ + '*:: :->args' + + case $state in + args) + case $words[1] in + list-tracks) + _raw_tools_list_tracks + ;; + extract-audio|extract-video) + _raw_tools_extract + ;; + mux-av) + _raw_tools_mux_av + ;; + esac + ;; + esac +} + +_raw_tools_commands() { + local commands=( + 'list-tracks:List all tracks with metadata' + 'extract-audio:Generate playable audio files' + 'extract-video:Generate playable video files' + 'mux-av:Mux audio and video tracks' + 'help:Show help' + ) + _describe 'commands' commands +} + +_raw_tools_global_args() { + _arguments \ + '--inputFile[Specify raw recording zip file]:file:_files -g "*.zip"' \ + '--inputS3[Specify raw recording zip file on S3]:s3path:' \ + '--output[Specify output directory]:directory:_directories' \ + '--verbose[Enable verbose logging]' \ + '--help[Show help]' +} + +_raw_tools_list_tracks() { + _arguments \ + '--format[Output 
format]:format:(table json completion users sessions tracks)' \ + '--trackType[Filter by track type]:type:(audio video)' \ + '--completionType[Completion type]:type:(users sessions tracks)' \ + '*: :_raw_tools_global_args' +} + +_raw_tools_extract() { + _arguments \ + '--userId[User ID filter]:userid:_raw_tools_complete_users' \ + '--sessionId[Session ID filter]:sessionid:_raw_tools_complete_sessions' \ + '--trackId[Track ID filter]:trackid:_raw_tools_complete_tracks' \ + '--fill_gaps[Fill gaps with silence/black frames]' \ + '*: :_raw_tools_global_args' +} + +_raw_tools_mux_av() { + _arguments \ + '--userId[User ID filter]:userid:_raw_tools_complete_users' \ + '--sessionId[Session ID filter]:sessionid:_raw_tools_complete_sessions' \ + '*: :_raw_tools_global_args' +} + +# Dynamic completion helpers +_raw_tools_complete_users() { + local input_file + for ((i=1; i <= $#words; i++)); do + if [[ $words[i] == "--inputFile" && i+1 <= $#words ]]; then + input_file=$words[i+1] + break + fi + done + + if [[ -n "$input_file" ]]; then + local users=($(raw-tools --inputFile "$input_file" --output /tmp list-tracks --format users 2>/dev/null)) + _wanted users expl 'user ID' compadd "$@" $users + else + _wanted users expl 'user ID' compadd "$@" + fi +} + +_raw_tools_complete_sessions() { + local input_file + for ((i=1; i <= $#words; i++)); do + if [[ $words[i] == "--inputFile" && i+1 <= $#words ]]; then + input_file=$words[i+1] + break + fi + done + + if [[ -n "$input_file" ]]; then + local sessions=($(raw-tools --inputFile "$input_file" --output /tmp list-tracks --format sessions 2>/dev/null)) + _wanted sessions expl 'session ID' compadd "$@" $sessions + else + _wanted sessions expl 'session ID' compadd "$@" + fi +} + +_raw_tools_complete_tracks() { + local input_file + for ((i=1; i <= $#words; i++)); do + if [[ $words[i] == "--inputFile" && i+1 <= $#words ]]; then + input_file=$words[i+1] + break + fi + done + + if [[ -n "$input_file" ]]; then + local tracks=($(raw-tools 
--inputFile "$input_file" --output /tmp list-tracks --format tracks 2>/dev/null)) + _wanted tracks expl 'track ID' compadd "$@" $tracks + else + _wanted tracks expl 'track ID' compadd "$@" + fi +} + +_raw_tools "$@"` + + fmt.Println(script) +} + +// generateFishCompletion generates fish completion script +func generateFishCompletion() { + script := `# Fish completion for raw-tools + +# Complete commands +complete -c raw-tools -f -n '__fish_use_subcommand' -a 'list-tracks' -d 'List all tracks with metadata' +complete -c raw-tools -f -n '__fish_use_subcommand' -a 'extract-audio' -d 'Generate playable audio files' +complete -c raw-tools -f -n '__fish_use_subcommand' -a 'extract-video' -d 'Generate playable video files' +complete -c raw-tools -f -n '__fish_use_subcommand' -a 'mux-av' -d 'Mux audio and video tracks' +complete -c raw-tools -f -n '__fish_use_subcommand' -a 'help' -d 'Show help' + +# Global options +complete -c raw-tools -l inputFile -d 'Specify raw recording zip file' -r -F +complete -c raw-tools -l inputS3 -d 'Specify raw recording zip file on S3' -r +complete -c raw-tools -l output -d 'Specify output directory' -r -a '(__fish_complete_directories)' +complete -c raw-tools -l verbose -d 'Enable verbose logging' +complete -c raw-tools -l help -d 'Show help' + +# list-tracks specific options +complete -c raw-tools -n '__fish_seen_subcommand_from list-tracks' -l format -d 'Output format' -r -a 'table json completion users sessions tracks' +complete -c raw-tools -n '__fish_seen_subcommand_from list-tracks' -l trackType -d 'Filter by track type' -r -a 'audio video' +complete -c raw-tools -n '__fish_seen_subcommand_from list-tracks' -l completionType -d 'Completion type' -r -a 'users sessions tracks' + +# extract commands specific options +complete -c raw-tools -n '__fish_seen_subcommand_from extract-audio extract-video' -l userId -d 'User ID filter' -r +complete -c raw-tools -n '__fish_seen_subcommand_from extract-audio extract-video' -l sessionId -d 'Session 
ID filter' -r +complete -c raw-tools -n '__fish_seen_subcommand_from extract-audio extract-video' -l trackId -d 'Track ID filter' -r +complete -c raw-tools -n '__fish_seen_subcommand_from extract-audio extract-video' -l fill_gaps -d 'Fill gaps' + +# mux-av specific options +complete -c raw-tools -n '__fish_seen_subcommand_from mux-av' -l userId -d 'User ID filter' -r +complete -c raw-tools -n '__fish_seen_subcommand_from mux-av' -l sessionId -d 'Session ID filter' -r` + + fmt.Println(script) +} diff --git a/cmd/raw-recording-tools/extract_audio.go b/cmd/raw-recording-tools/extract_audio.go new file mode 100644 index 0000000..f302ef8 --- /dev/null +++ b/cmd/raw-recording-tools/extract_audio.go @@ -0,0 +1,115 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/GetStream/getstream-go/v3" + "github.com/GetStream/getstream-go/v3/processing" +) + +type ExtractAudioArgs struct { + UserID string + SessionID string + TrackID string + FillGaps bool + FixDtx bool +} + +type ExtractAudioProcess struct { + logger *getstream.DefaultLogger +} + +func NewExtractAudioProcess(logger *getstream.DefaultLogger) *ExtractAudioProcess { + return &ExtractAudioProcess{logger: logger} +} + +func (p *ExtractAudioProcess) runExtractAudio(args []string, globalArgs *GlobalArgs) { + printHelpIfAsked(args, p.printUsage) + + // Parse command-specific flags + fs := flag.NewFlagSet("extract-audio", flag.ExitOnError) + extractAudioArgs := &ExtractAudioArgs{} + fs.StringVar(&extractAudioArgs.UserID, "userId", "", "Specify a userId (empty for all)") + fs.StringVar(&extractAudioArgs.SessionID, "sessionId", "", "Specify a sessionId (empty for all)") + fs.StringVar(&extractAudioArgs.TrackID, "trackId", "", "Specify a trackId (empty for all)") + fs.BoolVar(&extractAudioArgs.FillGaps, "fill_gaps", true, "Fill with silence when track was muted (default true)") + fs.BoolVar(&extractAudioArgs.FixDtx, "fix_dtx", true, "Fix DTX shrink audio (default true)") + + if err := fs.Parse(args); err != 
nil { + fmt.Fprintf(os.Stderr, "Error parsing flags: %v\n", err) + os.Exit(1) + } + + // Validate input arguments against actual recording data + metadata, err := validateInputArgs(globalArgs, extractAudioArgs.UserID, extractAudioArgs.SessionID, extractAudioArgs.TrackID) + if err != nil { + fmt.Fprintf(os.Stderr, "Validation error: %v\n", err) + os.Exit(1) + } + + p.logger.Info("Starting extract-audio command") + p.printBanner(globalArgs, extractAudioArgs) + + // Implement extract audio functionality + if e := extractAudioTracks(globalArgs, extractAudioArgs, metadata, p.logger); e != nil { + p.logger.Error("Failed to extract audio: %v", e) + } + + p.logger.Info("Extract audio command completed") +} + +func (p *ExtractAudioProcess) printBanner(globalArgs *GlobalArgs, extractAudioArgs *ExtractAudioArgs) { + fmt.Printf("Extract audio command with hierarchical filtering:\n") + if globalArgs.InputFile != "" { + fmt.Printf(" Input file: %s\n", globalArgs.InputFile) + } + if globalArgs.InputS3 != "" { + fmt.Printf(" Input S3: %s\n", globalArgs.InputS3) + } + fmt.Printf(" Output directory: %s\n", globalArgs.Output) + fmt.Printf(" User ID filter: %s\n", extractAudioArgs.UserID) + + if extractAudioArgs.TrackID != "" { + fmt.Printf(" → Processing specific track '%s'\n", extractAudioArgs.TrackID) + } else if extractAudioArgs.SessionID != "" { + fmt.Printf(" → Processing all audio tracks for session '%s'\n", extractAudioArgs.SessionID) + } else if extractAudioArgs.UserID != "" { + fmt.Printf(" → Processing all audio tracks for user '%s'\n", extractAudioArgs.UserID) + } else { + fmt.Printf(" → Processing all audio tracks (no filters)\n") + } + fmt.Printf(" Fill gaps: %t\n", extractAudioArgs.FillGaps) + fmt.Printf(" Fix DTX: %t\n", extractAudioArgs.FixDtx) +} + +func (p *ExtractAudioProcess) printUsage() { + fmt.Fprintf(os.Stderr, "Usage: raw-tools [global options] extract-audio [command options]\n\n") + fmt.Fprintf(os.Stderr, "Generate playable audio files from raw recording 
tracks.\n") + fmt.Fprintf(os.Stderr, "Supports formats: webm, mp3, and others.\n\n") + fmt.Fprintf(os.Stderr, "Command Options (Hierarchical Filtering):\n") + fmt.Fprintf(os.Stderr, " --userId Specify a userId or * for all (default: *)\n") + fmt.Fprintf(os.Stderr, " --sessionId Specify a sessionId or * for all (default: *)\n") + fmt.Fprintf(os.Stderr, " Ignored if --userId=*\n") + fmt.Fprintf(os.Stderr, " --trackId Specify a trackId or * for all (default: *)\n") + fmt.Fprintf(os.Stderr, " Ignored if --userId=* or --sessionId=*\n") + fmt.Fprintf(os.Stderr, " --fill_gaps Fix DTX shrink audio, fill with silence when muted\n\n") + fmt.Fprintf(os.Stderr, "Hierarchical Filtering Logic:\n") + fmt.Fprintf(os.Stderr, " --userId=* → Extract ALL users, sessions, tracks (sessionId/trackId ignored)\n") + fmt.Fprintf(os.Stderr, " --userId=user1 --sessionId=* → Extract ALL sessions/tracks for user1 (trackId ignored)\n") + fmt.Fprintf(os.Stderr, " --userId=user1 --sessionId=session1 --trackId=* → Extract ALL tracks for user1/session1\n") + fmt.Fprintf(os.Stderr, " --userId=user1 --sessionId=session1 --trackId=track1 → Extract specific track\n\n") + fmt.Fprintf(os.Stderr, "Examples:\n") + fmt.Fprintf(os.Stderr, " # Extract audio for all users (sessionId/trackId ignored)\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip --output ./out extract-audio --userId '*'\n\n") + fmt.Fprintf(os.Stderr, " # Extract audio for specific user, all sessions (trackId ignored)\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip --output ./out extract-audio --userId user123 --sessionId '*'\n\n") + fmt.Fprintf(os.Stderr, " # Extract audio for specific user/session, all tracks\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip --output ./out extract-audio --userId user123 --sessionId session456 --trackId '*'\n\n") + fmt.Fprintf(os.Stderr, "Global Options: Use 'raw-tools --help' to see global options.\n") +} + +func extractAudioTracks(globalArgs *GlobalArgs, 
extractAudioArgs *ExtractAudioArgs, metadata *processing.RecordingMetadata, logger *getstream.DefaultLogger) error { + return processing.ExtractTracks(globalArgs.WorkDir, globalArgs.Output, extractAudioArgs.UserID, extractAudioArgs.SessionID, extractAudioArgs.TrackID, metadata, "audio", "both", extractAudioArgs.FillGaps, extractAudioArgs.FixDtx, logger) +} diff --git a/cmd/raw-recording-tools/extract_video.go b/cmd/raw-recording-tools/extract_video.go new file mode 100644 index 0000000..b6fb349 --- /dev/null +++ b/cmd/raw-recording-tools/extract_video.go @@ -0,0 +1,113 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/GetStream/getstream-go/v3" + "github.com/GetStream/getstream-go/v3/processing" +) + +type ExtractVideoArgs struct { + UserID string + SessionID string + TrackID string + FillGaps bool +} + +type ExtractVideoProcess struct { + logger *getstream.DefaultLogger +} + +func NewExtractVideoProcess(logger *getstream.DefaultLogger) *ExtractVideoProcess { + return &ExtractVideoProcess{logger: logger} +} + +func (p *ExtractVideoProcess) runExtractVideo(args []string, globalArgs *GlobalArgs) { + printHelpIfAsked(args, p.printUsage) + + // Parse command-specific flags + fs := flag.NewFlagSet("extract-video", flag.ExitOnError) + extractVideoArgs := &ExtractVideoArgs{} + fs.StringVar(&extractVideoArgs.UserID, "userId", "", "Specify a userId (empty for all)") + fs.StringVar(&extractVideoArgs.SessionID, "sessionId", "", "Specify a sessionId (empty for all)") + fs.StringVar(&extractVideoArgs.TrackID, "trackId", "", "Specify a trackId (empty for all)") + fs.BoolVar(&extractVideoArgs.FillGaps, "fill_gaps", true, "Fill with black frame when track was muted (default true)") + + if err := fs.Parse(args); err != nil { + fmt.Fprintf(os.Stderr, "Error parsing flags: %v\n", err) + os.Exit(1) + } + + // Validate input arguments against actual recording data + metadata, err := validateInputArgs(globalArgs, extractVideoArgs.UserID, extractVideoArgs.SessionID, 
extractVideoArgs.TrackID) + if err != nil { + fmt.Fprintf(os.Stderr, "Validation error: %v\n", err) + os.Exit(1) + } + + p.logger.Info("Starting extract-video command") + p.printBanner(globalArgs, extractVideoArgs) + + // Extract video tracks + if e := extractVideoTracks(globalArgs, extractVideoArgs, metadata, p.logger); e != nil { + p.logger.Error("Failed to extract video tracks: %v", e) + os.Exit(1) + } + + p.logger.Info("Extract video command completed successfully") +} + +func (p *ExtractVideoProcess) printBanner(globalArgs *GlobalArgs, extractVideoArgs *ExtractVideoArgs) { + fmt.Printf("Extract video command with hierarchical filtering:\n") + if globalArgs.InputFile != "" { + fmt.Printf(" Input file: %s\n", globalArgs.InputFile) + } + if globalArgs.InputS3 != "" { + fmt.Printf(" Input S3: %s\n", globalArgs.InputS3) + } + fmt.Printf(" Output directory: %s\n", globalArgs.Output) + fmt.Printf(" User ID filter: %s\n", extractVideoArgs.UserID) + + if extractVideoArgs.TrackID != "" { + fmt.Printf(" → Processing specific track '%s'\n", extractVideoArgs.TrackID) + } else if extractVideoArgs.SessionID != "" { + fmt.Printf(" → Processing all video tracks for session '%s'\n", extractVideoArgs.SessionID) + } else if extractVideoArgs.UserID != "" { + fmt.Printf(" → Processing all video tracks for user '%s'\n", extractVideoArgs.UserID) + } else { + fmt.Printf(" → Processing all video tracks (no filters)\n") + } + fmt.Printf(" Fill gaps: %t\n", extractVideoArgs.FillGaps) +} + +func (p *ExtractVideoProcess) printUsage() { + fmt.Fprintf(os.Stderr, "Usage: raw-tools [global options] extract-video [command options]\n\n") + fmt.Fprintf(os.Stderr, "Generate playable video files from raw recording tracks.\n") + fmt.Fprintf(os.Stderr, "Supports formats: webm, mp4, and others.\n\n") + fmt.Fprintf(os.Stderr, "Command Options (Hierarchical Filtering):\n") + fmt.Fprintf(os.Stderr, " --userId Specify a userId or * for all (default: *)\n") + fmt.Fprintf(os.Stderr, " --sessionId Specify a 
sessionId or * for all (default: *)\n") + fmt.Fprintf(os.Stderr, " Ignored if --userId=*\n") + fmt.Fprintf(os.Stderr, " --trackId Specify a trackId or * for all (default: *)\n") + fmt.Fprintf(os.Stderr, " Ignored if --userId=* or --sessionId=*\n") + fmt.Fprintf(os.Stderr, " --fill_gaps Fill with black frame when track was muted\n\n") + fmt.Fprintf(os.Stderr, "Hierarchical Filtering Logic:\n") + fmt.Fprintf(os.Stderr, " --userId=* → Extract ALL users, sessions, tracks (sessionId/trackId ignored)\n") + fmt.Fprintf(os.Stderr, " --userId=user1 --sessionId=* → Extract ALL sessions/tracks for user1 (trackId ignored)\n") + fmt.Fprintf(os.Stderr, " --userId=user1 --sessionId=session1 --trackId=* → Extract ALL tracks for user1/session1\n") + fmt.Fprintf(os.Stderr, " --userId=user1 --sessionId=session1 --trackId=track1 → Extract specific track\n\n") + fmt.Fprintf(os.Stderr, "Examples:\n") + fmt.Fprintf(os.Stderr, " # Extract video for all users (sessionId/trackId ignored)\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip --output ./out extract-video --userId '*'\n\n") + fmt.Fprintf(os.Stderr, " # Extract video for specific user, all sessions (trackId ignored)\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip --output ./out extract-video --userId user123 --sessionId '*'\n\n") + fmt.Fprintf(os.Stderr, " # Extract video for specific user/session, all tracks\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip --output ./out extract-video --userId user123 --sessionId session456 --trackId '*'\n\n") + fmt.Fprintf(os.Stderr, "Global Options: Use 'raw-tools --help' to see global options.\n") +} + +func extractVideoTracks(globalArgs *GlobalArgs, extractVideoArgs *ExtractVideoArgs, metadata *processing.RecordingMetadata, logger *getstream.DefaultLogger) error { + return processing.ExtractTracks(globalArgs.WorkDir, globalArgs.Output, extractVideoArgs.UserID, extractVideoArgs.SessionID, extractVideoArgs.TrackID, metadata, "video", "both", 
extractVideoArgs.FillGaps, false, logger) +} diff --git a/cmd/raw-recording-tools/list_tracks.go b/cmd/raw-recording-tools/list_tracks.go new file mode 100644 index 0000000..5da416d --- /dev/null +++ b/cmd/raw-recording-tools/list_tracks.go @@ -0,0 +1,235 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "os" + "sort" + "strings" + + "github.com/GetStream/getstream-go/v3" + "github.com/GetStream/getstream-go/v3/processing" +) + +type ListTracksArgs struct { + Format string // "table", "json", "completion", "users", "sessions", "tracks" + TrackType string // Filter by track type: "audio", "video", or "" for all + CompletionType string // For completion format: "users", "sessions", "tracks" +} + +type ListTracksProcess struct { + logger *getstream.DefaultLogger +} + +func NewListTracksProcess(logger *getstream.DefaultLogger) *ListTracksProcess { + return &ListTracksProcess{logger: logger} +} + +func (p *ListTracksProcess) runListTracks(args []string, globalArgs *GlobalArgs) { + printHelpIfAsked(args, p.printUsage) + + // Parse command-specific flags + fs := flag.NewFlagSet("list-tracks", flag.ExitOnError) + listTracksArgs := &ListTracksArgs{} + fs.StringVar(&listTracksArgs.Format, "format", "table", "Output format: table, json, completion, users, sessions, tracks") + fs.StringVar(&listTracksArgs.TrackType, "trackType", "", "Filter by track type: audio, video") + fs.StringVar(&listTracksArgs.CompletionType, "completionType", "tracks", "For completion format: users, sessions, tracks") + + if err := fs.Parse(args); err != nil { + fmt.Fprintf(os.Stderr, "Error parsing flags: %v\n", err) + os.Exit(1) + } + + // Setup logger + logger := setupLogger(globalArgs.Verbose) + + logger.Info("Starting list-tracks command") + + // Parse the recording metadata using efficient metadata-only approach + var inputPath string + if globalArgs.InputFile != "" { + inputPath = globalArgs.InputFile + } else { + // TODO: Handle S3 input + return // For now, only support local 
files + } + + // Use efficient metadata-only parsing (optimized for list-tracks) + parser := processing.NewMetadataParser(logger) + metadata, err := parser.ParseMetadataOnly(inputPath) + if err != nil { + logger.Error("Failed to parse recording: %v", err); os.Exit(1) + } + + // Filter tracks if track type is specified + tracks := processing.FilterTracks(metadata.Tracks, "", "", "", listTracksArgs.TrackType, "") + + // Output in requested format + switch listTracksArgs.Format { + case "table": + p.printTracksTable(tracks) + case "json": + p.printTracksJSON(metadata) + case "completion": + p.printCompletion(metadata, listTracksArgs.CompletionType) + case "users": + p.printUsers(metadata.UserIDs) + case "sessions": + p.printSessions(metadata.Sessions) + case "tracks": + p.printTrackIDs(tracks) + default: + fmt.Fprintf(os.Stderr, "Unknown format: %s\n", listTracksArgs.Format) + os.Exit(1) + } + + logger.Info("List tracks command completed") +} + +// printTracksTable prints tracks in a human-readable table format +func (p *ListTracksProcess) printTracksTable(tracks []*processing.TrackInfo) { + if len(tracks) == 0 { + fmt.Println("No tracks found.") + return + } + + // Print header + fmt.Printf("%-22s %-38s %-38s %-6s %-12s %-15s %-8s\n", "USER ID", "SESSION ID", "TRACK ID", "TYPE", "SCREENSHARE", "CODEC", "SEGMENTS") + fmt.Printf("%-22s %-38s %-38s %-6s %-12s %-15s %-8s\n", + strings.Repeat("-", 22), + strings.Repeat("-", 38), + strings.Repeat("-", 38), + strings.Repeat("-", 6), + strings.Repeat("-", 12), + strings.Repeat("-", 15), + strings.Repeat("-", 8)) + + // Print tracks + for _, track := range tracks { + screenshareStatus := "No" + if track.IsScreenshare { + screenshareStatus = "Yes" + } + fmt.Printf("%-22s %-38s %-38s %-6s %-12s %-15s %-8d\n", + p.truncateString(track.UserID, 22), + p.truncateString(track.SessionID, 38), + p.truncateString(track.TrackID, 38), + track.TrackType, + screenshareStatus, + track.Codec, + track.SegmentCount) + } +} + +// truncateString truncates a
string to a maximum length, adding "..." if needed +func (p *ListTracksProcess) truncateString(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen-3] + "..." +} + +// printTracksJSON prints the full metadata in JSON format +func (p *ListTracksProcess) printTracksJSON(metadata *processing.RecordingMetadata) { + data, err := json.MarshalIndent(metadata, "", " ") + if err != nil { + fmt.Fprintf(os.Stderr, "Error marshaling JSON: %v\n", err) + return + } + fmt.Println(string(data)) +} + +// printCompletion prints completion-friendly output +func (p *ListTracksProcess) printCompletion(metadata *processing.RecordingMetadata, completionType string) { + switch completionType { + case "users": + p.printUsers(metadata.UserIDs) + case "sessions": + p.printSessions(metadata.Sessions) + case "tracks": + trackIDs := make([]string, 0) + for _, track := range metadata.Tracks { + trackIDs = append(trackIDs, track.TrackID) + } + // Remove duplicates and sort + uniqueTrackIDs := p.removeDuplicates(trackIDs) + sort.Strings(uniqueTrackIDs) + for _, id := range uniqueTrackIDs { fmt.Println(id) } + default: + fmt.Fprintf(os.Stderr, "Unknown completion type: %s\n", completionType) + } +} + +// printUsers prints user IDs, one per line +func (p *ListTracksProcess) printUsers(userIDs []string) { + sort.Strings(userIDs) + for _, userID := range userIDs { + fmt.Println(userID) + } +} + +// printSessions prints session IDs, one per line +func (p *ListTracksProcess) printSessions(sessions []string) { + sort.Strings(sessions) + for _, session := range sessions { + fmt.Println(session) + } +} + +// printTrackIDs prints unique track IDs, one per line +func (p *ListTracksProcess) printTrackIDs(tracks []*processing.TrackInfo) { + trackIDs := make([]string, 0) + seen := make(map[string]bool) + + for _, track := range tracks { + if !seen[track.TrackID] { + trackIDs = append(trackIDs, track.TrackID) + seen[track.TrackID] = true + } + } + + sort.Strings(trackIDs) + for _, trackID := range
trackIDs { + fmt.Println(trackID) + } +} + +// removeDuplicates removes duplicate strings from a slice +func (p *ListTracksProcess) removeDuplicates(input []string) []string { + keys := make(map[string]bool) + result := make([]string, 0) + + for _, item := range input { + if !keys[item] { + keys[item] = true + result = append(result, item) + } + } + + return result +} + +func (p *ListTracksProcess) printUsage() { + fmt.Fprintf(os.Stderr, "Usage: raw-tools [global options] list-tracks [command options]\n\n") + fmt.Fprintf(os.Stderr, "List all tracks in the raw recording with their metadata.\n") + fmt.Fprintf(os.Stderr, "Note: --output is optional for this command (only displays information).\n\n") + fmt.Fprintf(os.Stderr, "Command Options:\n") + fmt.Fprintf(os.Stderr, " --format Output format (default: table)\n") + fmt.Fprintf(os.Stderr, " table - Human readable table\n") + fmt.Fprintf(os.Stderr, " json - JSON format\n") + fmt.Fprintf(os.Stderr, " users - List of user IDs only\n") + fmt.Fprintf(os.Stderr, " sessions - List of session IDs only\n") + fmt.Fprintf(os.Stderr, " tracks - List of track IDs only\n") + fmt.Fprintf(os.Stderr, " completion - Shell completion format\n") + fmt.Fprintf(os.Stderr, " --trackType Filter by track type: audio, video\n") + fmt.Fprintf(os.Stderr, " --completionType For completion format: users, sessions, tracks\n\n") + fmt.Fprintf(os.Stderr, "Examples:\n") + fmt.Fprintf(os.Stderr, " # List all tracks in table format (no output directory needed)\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip list-tracks\n\n") + fmt.Fprintf(os.Stderr, " # Get JSON output for programmatic use\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip list-tracks --format json\n\n") + fmt.Fprintf(os.Stderr, " # Get user IDs for completion\n") + fmt.Fprintf(os.Stderr, " raw-tools --inputFile recording.zip list-tracks --format users\n") + fmt.Fprintf(os.Stderr, "\nGlobal Options: Use 'raw-tools --help' to see global options.\n") +} 
diff --git a/cmd/raw-recording-tools/main.go b/cmd/raw-recording-tools/main.go new file mode 100644 index 0000000..907cf10 --- /dev/null +++ b/cmd/raw-recording-tools/main.go @@ -0,0 +1,306 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + + "github.com/GetStream/getstream-go/v3" + "github.com/GetStream/getstream-go/v3/processing" +) + +type GlobalArgs struct { + InputFile string + InputS3 string + Output string + Verbose bool + + WorkDir string +} + +func main() { + if len(os.Args) < 2 { + printGlobalUsage() + os.Exit(1) + } + + // Parse global flags first + globalArgs := &GlobalArgs{} + command, remainingArgs := parseGlobalFlags(os.Args[1:], globalArgs) + + if command == "" { + printGlobalUsage() + os.Exit(1) + } + + // Setup logger + logger := setupLogger(globalArgs.Verbose) + + switch command { + case "list-tracks": + p := NewListTracksProcess(logger) + p.runListTracks(remainingArgs, globalArgs) + case "completion": + runCompletion(remainingArgs) + case "help", "-h", "--help": + printGlobalUsage() + default: + if e := processCommand(command, globalArgs, remainingArgs, logger); e != nil { + logger.Error("Error processing command %s - %v", command, e) + os.Exit(1) + } + } +} + +func processCommand(command string, globalArgs *GlobalArgs, remainingArgs []string, logger *getstream.DefaultLogger) error { + // Extract to temp directory if needed (unified approach) + workingDir, cleanup, err := processing.ExtractToTempDir(globalArgs.InputFile, logger) + if err != nil { + return fmt.Errorf("failed to prepare working directory: %w", err) + } + defer cleanup() + globalArgs.WorkDir = workingDir + + // Create output directory if it doesn't exist + if e := os.MkdirAll(globalArgs.Output, 0755); e != nil { + return fmt.Errorf("failed to create output directory: %w", e) + } + + switch command { + case "extract-audio": + p := NewExtractAudioProcess(logger) + p.runExtractAudio(remainingArgs, globalArgs) + case "extract-video": + p := NewExtractVideoProcess(logger)
+ p.runExtractVideo(remainingArgs, globalArgs) + case "mux-av": + p := NewMuxAudioVideoProcess(logger) + p.runMuxAV(remainingArgs, globalArgs) + case "mix-audio": + p := NewMixAudioProcess(logger) + p.runMixAudio(remainingArgs, globalArgs) + case "process-all": + p := NewProcessAllProcess(logger) + p.runProcessAll(remainingArgs, globalArgs) + default: + fmt.Fprintf(os.Stderr, "Unknown command: %s\n", command) + printGlobalUsage() + os.Exit(1) + } + + return nil +} + +// parseGlobalFlags parses global flags and returns the command and remaining args +func parseGlobalFlags(args []string, globalArgs *GlobalArgs) (string, []string) { + fs := flag.NewFlagSet("global", flag.ContinueOnError) + + fs.StringVar(&globalArgs.InputFile, "inputFile", "", "Specify raw recording zip file on file system") + fs.StringVar(&globalArgs.InputS3, "inputS3", "", "Specify raw recording zip file on S3") + fs.StringVar(&globalArgs.Output, "output", "", "Specify an output directory") + fs.BoolVar(&globalArgs.Verbose, "verbose", false, "Enable verbose logging") + + // Find the command by looking for known commands + knownCommands := map[string]bool{ + "list-tracks": true, + "extract-audio": true, + "extract-video": true, + "mux-av": true, + "mix-audio": true, + "process-all": true, + "completion": true, + "help": true, + } + + commandIndex := -1 + for i, arg := range args { + if knownCommands[arg] { + commandIndex = i + break + } + } + + if commandIndex == -1 { + return "", nil + } + + // Parse global flags (everything before the command) + globalFlags := args[:commandIndex] + command := args[commandIndex] + remainingArgs := args[commandIndex+1:] + + err := fs.Parse(globalFlags) + if err != nil { + fmt.Fprintf(os.Stderr, "Error parsing global flags: %v\n", err) + os.Exit(1) + } + + // Validate global arguments + if e := validateGlobalArgs(globalArgs, command); e != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", e) + printGlobalUsage() + os.Exit(1) + } + + return command, remainingArgs +} + 
+func setupLogger(verbose bool) *getstream.DefaultLogger { + var level getstream.LogLevel + if verbose { + level = getstream.LogLevelDebug + } else { + level = getstream.LogLevelInfo + } + logger := getstream.NewDefaultLogger(os.Stderr, "", log.LstdFlags, level) + return logger +} + +func validateGlobalArgs(globalArgs *GlobalArgs, command string) error { + if globalArgs.InputFile == "" && globalArgs.InputS3 == "" { + return fmt.Errorf("either --inputFile or --inputS3 must be specified") + } + + if globalArgs.InputFile != "" && globalArgs.InputS3 != "" { + return fmt.Errorf("cannot specify both --inputFile and --inputS3") + } + + // --output is optional for list-tracks command (it only displays information) + if command != "list-tracks" && globalArgs.Output == "" { + return fmt.Errorf("--output directory must be specified") + } + + return nil +} + +// validateInputArgs validates input arguments using mutually exclusive logic +func validateInputArgs(globalArgs *GlobalArgs, userID, sessionID, trackID string) (*processing.RecordingMetadata, error) { + // Count how many filters are specified + filtersCount := 0 + if userID != "" { + filtersCount++ + } + if sessionID != "" { + filtersCount++ + } + if trackID != "" { + filtersCount++ + } + + // Ensure filters are mutually exclusive + if filtersCount > 1 { + return nil, fmt.Errorf("only one filter can be specified at a time: --userId, --sessionId, and --trackId are mutually exclusive") + } + + var inputPath string + if globalArgs.InputFile != "" { + inputPath = globalArgs.InputFile + } else { + // TODO: Handle S3 validation + return nil, fmt.Errorf("Not implemented for now") + } + + // Parse metadata to validate the single specified argument + logger := setupLogger(false) // Use non-verbose for validation + parser := processing.NewMetadataParser(logger) + metadata, err := parser.ParseMetadataOnly(inputPath) + if err != nil { + return nil, fmt.Errorf("failed to parse recording for validation: %w", err) + } + + // If no 
filters specified, no validation needed + if filtersCount == 0 { + return metadata, nil + } + + // Validate the single specified filter + if trackID != "" { + found := false + for _, track := range metadata.Tracks { + if track.TrackID == trackID { + found = true + break + } + } + if !found { + return nil, fmt.Errorf("trackID '%s' not found in recording. Use 'list-tracks --format tracks' to see available track IDs", trackID) + } + } else if sessionID != "" { + found := false + for _, track := range metadata.Tracks { + if track.SessionID == sessionID { + found = true + break + } + } + if !found { + return nil, fmt.Errorf("sessionID '%s' not found in recording. Use 'list-tracks --format sessions' to see available session IDs", sessionID) + } + } else if userID != "" { + found := false + for _, uid := range metadata.UserIDs { + if uid == userID { + found = true + break + } + } + if !found { + return nil, fmt.Errorf("userID '%s' not found in recording. Use 'list-tracks --format users' to see available user IDs", userID) + } + } + + return metadata, nil +} + +func printGlobalUsage() { + fmt.Fprintf(os.Stderr, "Raw Recording Post Processing Tools\n\n") + fmt.Fprintf(os.Stderr, "Usage: %s [global options] [command options]\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Global Options:\n") + fmt.Fprintf(os.Stderr, " --inputFile Specify raw recording zip file on file system\n") + fmt.Fprintf(os.Stderr, " --inputS3 Specify raw recording zip file on S3\n") + fmt.Fprintf(os.Stderr, " --output Specify an output directory (optional for list-tracks)\n") + fmt.Fprintf(os.Stderr, " --verbose Enable verbose logging\n\n") + fmt.Fprintf(os.Stderr, "Commands:\n") + fmt.Fprintf(os.Stderr, " list-tracks Return list of userId - sessionId - trackId - trackType\n") + fmt.Fprintf(os.Stderr, " extract-audio Generate a playable audio file (webm, mp3, ...)\n") + fmt.Fprintf(os.Stderr, " extract-video Generate a playable video file (webm, mp4, ...)\n") + fmt.Fprintf(os.Stderr, " mux-av Mux audio and 
video tracks\n") + fmt.Fprintf(os.Stderr, " mix-audio Mix multiple audio tracks into one file\n") + fmt.Fprintf(os.Stderr, " process-all Process audio, video, and mux (all-in-one)\n") + fmt.Fprintf(os.Stderr, " completion Generate shell completion scripts\n") + fmt.Fprintf(os.Stderr, " help Show this help message\n\n") + fmt.Fprintf(os.Stderr, "Examples:\n") + fmt.Fprintf(os.Stderr, " %s --inputFile recording.zip list-tracks\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s --inputFile recording.zip --output ./out extract-audio --userId user123\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s --inputFile recording.zip --output ./out mix-audio --userId '*'\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s --verbose --inputFile recording.zip --output ./out mux-av --userId '*'\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Use '%s [global options] --help' for command-specific options.\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "\nCompletion Setup:\n") + fmt.Fprintf(os.Stderr, " # Bash\n") + fmt.Fprintf(os.Stderr, " source <(%s completion bash)\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " # Zsh\n") + fmt.Fprintf(os.Stderr, " source <(%s completion zsh)\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " # Fish\n") + fmt.Fprintf(os.Stderr, " %s completion fish | source\n", os.Args[0]) +} + +func printHelpIfAsked(args []string, fn func()) { + // Check for help flag before parsing + for _, arg := range args { + if arg == "--help" || arg == "-h" { + fn() + os.Exit(0) + } + } +} +func runCompletion(args []string) { + if len(args) == 0 { + fmt.Fprintf(os.Stderr, "Usage: raw-tools completion \n") + fmt.Fprintf(os.Stderr, "Supported shells: bash, zsh, fish\n") + os.Exit(1) + } + + shell := args[0] + generateCompletion(shell) +} diff --git a/cmd/raw-recording-tools/mix_audio.go b/cmd/raw-recording-tools/mix_audio.go new file mode 100644 index 0000000..2fe3912 --- /dev/null +++ b/cmd/raw-recording-tools/mix_audio.go @@ -0,0 +1,90 @@ +package main + +import ( + "fmt" + "os" + + 
"github.com/GetStream/getstream-go/v3" + "github.com/GetStream/getstream-go/v3/processing" +) + +// MixAudioArgs represents the arguments for the mix-audio command +type MixAudioArgs struct { + IncludeScreenShare bool +} + +type MixAudioProcess struct { + logger *getstream.DefaultLogger +} + +func NewMixAudioProcess(logger *getstream.DefaultLogger) *MixAudioProcess { + return &MixAudioProcess{logger: logger} +} + +// runMixAudio handles the mix-audio command +func (p *MixAudioProcess) runMixAudio(args []string, globalArgs *GlobalArgs) { + printHelpIfAsked(args, p.printUsage) + + mixAudioArgs := &MixAudioArgs{ + IncludeScreenShare: false, + } + + // Validate input arguments against actual recording data + metadata, err := validateInputArgs(globalArgs, "", "", "") + if err != nil { + fmt.Fprintf(os.Stderr, "Validation error: %v\n", err) + os.Exit(1) + } + + p.logger.Info("Starting mix-audio command") + + // Execute the mix-audio operation + if e := p.mixAllAudioTracks(globalArgs, mixAudioArgs, metadata, p.logger); e != nil { + p.logger.Error("Mix-audio failed: %v", e) + os.Exit(1) + } + + p.logger.Info("Mix-audio command completed successfully") +} + +// mixAllAudioTracks orchestrates the entire audio mixing workflow using existing extraction logic +func (p *MixAudioProcess) mixAllAudioTracks(globalArgs *GlobalArgs, mixAudioArgs *MixAudioArgs, metadata *processing.RecordingMetadata, logger *getstream.DefaultLogger) error { + mixer := processing.NewAudioMixer(logger) + mixer.MixAllAudioTracks(&processing.AudioMixerConfig{ + WorkDir: globalArgs.WorkDir, + OutputDir: globalArgs.Output, + WithScreenshare: false, + WithExtract: true, + WithCleanup: false, + }, metadata, logger) + return nil +} + +// printMixAudioUsage prints the usage information for the mix-audio command +func (p *MixAudioProcess) printUsage() { + fmt.Println("Usage: raw-tools [global-options] mix-audio [options]") + fmt.Println() + fmt.Println("Mix all audio tracks from multiple users/sessions into a 
single audio file") + fmt.Println("with proper timing synchronization (like a conference call recording).") + fmt.Println() + fmt.Println("Options:") + fmt.Println(" --userId Filter by user ID (* for all users, default: *)") + fmt.Println(" --sessionId Filter by session ID (* for all sessions, default: *)") + fmt.Println(" --trackId Filter by track ID (* for all tracks, default: *)") + fmt.Println(" --no-fill-gaps Don't fill gaps with silence (not recommended for mixing)") + fmt.Println(" -h, --help Show this help message") + fmt.Println() + fmt.Println("Examples:") + fmt.Println(" # Mix all audio tracks from all users and sessions") + fmt.Println(" raw-tools --inputFile recording.tar.gz --output /tmp/mixed mix-audio") + fmt.Println() + fmt.Println(" # Mix audio tracks from a specific user") + fmt.Println(" raw-tools --inputFile recording.tar.gz --output /tmp/mixed mix-audio --userId user123") + fmt.Println() + fmt.Println(" # Mix audio tracks from a specific session") + fmt.Println(" raw-tools --inputFile recording.tar.gz --output /tmp/mixed mix-audio --sessionId session456") + fmt.Println() + fmt.Println("Output:") + fmt.Println(" Creates 'mixed_audio.webm' - a single audio file containing all mixed tracks") + fmt.Println(" with proper timing synchronization based on the original recording timeline.") +} diff --git a/cmd/raw-recording-tools/mux_av.go b/cmd/raw-recording-tools/mux_av.go new file mode 100644 index 0000000..4493933 --- /dev/null +++ b/cmd/raw-recording-tools/mux_av.go @@ -0,0 +1,109 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/GetStream/getstream-go/v3" + "github.com/GetStream/getstream-go/v3/processing" +) + +type MuxAVArgs struct { + UserID string + SessionID string + TrackID string + Media string // "user", "display", or "both" (default) +} + +type MuxAudioVideoProcess struct { + logger *getstream.DefaultLogger +} + +func NewMuxAudioVideoProcess(logger *getstream.DefaultLogger) *MuxAudioVideoProcess { + return 
&MuxAudioVideoProcess{logger: logger} +} + +func (p *MuxAudioVideoProcess) runMuxAV(args []string, globalArgs *GlobalArgs) { + printHelpIfAsked(args, p.printUsage) + + // Parse command-specific flags + fs := flag.NewFlagSet("mux-av", flag.ExitOnError) + muxAVArgs := &MuxAVArgs{} + fs.StringVar(&muxAVArgs.UserID, "userId", "", "Specify a userId (empty for all)") + fs.StringVar(&muxAVArgs.SessionID, "sessionId", "", "Specify a sessionId (empty for all)") + fs.StringVar(&muxAVArgs.TrackID, "trackId", "", "Specify a trackId (empty for all)") + fs.StringVar(&muxAVArgs.Media, "media", "both", "Filter by media type: 'user', 'display', or 'both'") + + if err := fs.Parse(args); err != nil { + fmt.Fprintf(os.Stderr, "Error parsing flags: %v\n", err) + os.Exit(1) + } + + // Validate input arguments against actual recording data + metadata, err := validateInputArgs(globalArgs, muxAVArgs.UserID, muxAVArgs.SessionID, muxAVArgs.TrackID) + if err != nil { + fmt.Fprintf(os.Stderr, "Validation error: %v\n", err) + os.Exit(1) + } + + p.logger.Info("Starting mux-av command") + + // Display hierarchy information for user clarity + fmt.Printf("Mux audio and video command with hierarchical filtering:\n") + fmt.Printf(" Input file: %s\n", globalArgs.InputFile) + fmt.Printf(" Output directory: %s\n", globalArgs.Output) + fmt.Printf(" User ID filter: %s\n", muxAVArgs.UserID) + fmt.Printf(" Session ID filter: %s\n", muxAVArgs.SessionID) + fmt.Printf(" Track ID filter: %s\n", muxAVArgs.TrackID) + fmt.Printf(" Media filter: %s\n", muxAVArgs.Media) + + if muxAVArgs.TrackID != "" { + fmt.Printf(" → Processing specific track '%s'\n", muxAVArgs.TrackID) + } else if muxAVArgs.SessionID != "" { + fmt.Printf(" → Processing all tracks for session '%s'\n", muxAVArgs.SessionID) + } else if muxAVArgs.UserID != "" { + fmt.Printf(" → Processing all tracks for user '%s'\n", muxAVArgs.UserID) + } else { + fmt.Printf(" → Processing all tracks (no filters)\n") + } + + // Extract and mux audio/video tracks + if 
err := p.muxAudioVideoTracks(globalArgs, muxAVArgs, metadata, p.logger); err != nil { + p.logger.Error("Failed to mux audio/video tracks: %v", err) + os.Exit(1) + } + + p.logger.Info("Mux audio and video command completed successfully") +} + +func (p *MuxAudioVideoProcess) printUsage() { + fmt.Printf("Usage: mux-av [OPTIONS]\n") + fmt.Printf("\nMux audio and video tracks into a single file\n") + fmt.Printf("\nOptions:\n") + fmt.Printf(" --userId STRING Specify a userId or * for all (default: \"*\")\n") + fmt.Printf(" --sessionId STRING Specify a sessionId or * for all (default: \"*\")\n") + fmt.Printf(" --trackId STRING Specify a trackId or * for all (default: \"*\")\n") + fmt.Printf(" --media STRING Filter by media type: 'user', 'display', or 'both' (default: \"both\")\n") + fmt.Printf("\nMedia Filtering:\n") + fmt.Printf(" --media user Only mux user camera audio/video pairs\n") + fmt.Printf(" --media display Only mux display sharing audio/video pairs\n") + fmt.Printf(" --media both Mux both types, but ensure consistent pairing (default)\n") +} + +func (p *MuxAudioVideoProcess) muxAudioVideoTracks(globalArgs *GlobalArgs, muxAVArgs *MuxAVArgs, metadata *processing.RecordingMetadata, logger *getstream.DefaultLogger) error { + muxer := processing.NewAudioVideoMuxer(p.logger) + if e := muxer.MuxAudioVideoTracks(&processing.AudioVideoMuxerConfig{ + WorkDir: globalArgs.WorkDir, + OutputDir: globalArgs.Output, + UserID: muxAVArgs.UserID, + SessionID: muxAVArgs.SessionID, + TrackID: muxAVArgs.TrackID, + Media: muxAVArgs.Media, + WithExtract: true, + WithCleanup: false, + }, metadata, logger); e != nil { + return e + } + return nil +} diff --git a/cmd/raw-recording-tools/process_all.go b/cmd/raw-recording-tools/process_all.go new file mode 100644 index 0000000..3bf43ff --- /dev/null +++ b/cmd/raw-recording-tools/process_all.go @@ -0,0 +1,127 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/GetStream/getstream-go/v3" + 
"github.com/GetStream/getstream-go/v3/processing" +) + +type ProcessAllArgs struct { + UserID string + SessionID string + TrackID string +} + +type ProcessAllProcess struct { + logger *getstream.DefaultLogger +} + +func NewProcessAllProcess(logger *getstream.DefaultLogger) *ProcessAllProcess { + return &ProcessAllProcess{logger: logger} +} + +func (p *ProcessAllProcess) runProcessAll(args []string, globalArgs *GlobalArgs) { + printHelpIfAsked(args, p.printUsage) + + // Parse command-specific flags + fs := flag.NewFlagSet("process-all", flag.ExitOnError) + processAllArgs := &ProcessAllArgs{} + fs.StringVar(&processAllArgs.UserID, "userId", "", "Specify a userId (empty for all)") + fs.StringVar(&processAllArgs.SessionID, "sessionId", "", "Specify a sessionId (empty for all)") + fs.StringVar(&processAllArgs.TrackID, "trackId", "", "Specify a trackId (empty for all)") + + if err := fs.Parse(args); err != nil { + fmt.Fprintf(os.Stderr, "Error parsing flags: %v\n", err) + os.Exit(1) + } + + // Validate input arguments against actual recording data + metadata, err := validateInputArgs(globalArgs, processAllArgs.UserID, processAllArgs.SessionID, processAllArgs.TrackID) + if err != nil { + fmt.Fprintf(os.Stderr, "Validation error: %v\n", err) + os.Exit(1) + } + + p.logger.Info("Starting process-all command") + + // Display hierarchy information for user clarity + fmt.Printf("Process-all command (audio + video + mux) with hierarchical filtering:\n") + fmt.Printf(" Input file: %s\n", globalArgs.InputFile) + fmt.Printf(" Output directory: %s\n", globalArgs.Output) + fmt.Printf(" User ID filter: %s\n", processAllArgs.UserID) + fmt.Printf(" Session ID filter: %s\n", processAllArgs.SessionID) + fmt.Printf(" Track ID filter: %s\n", processAllArgs.TrackID) + fmt.Printf(" Gap filling: always enabled\n") + + if processAllArgs.TrackID != "" { + fmt.Printf(" → Processing specific track '%s'\n", processAllArgs.TrackID) + } else if processAllArgs.SessionID != "" { + fmt.Printf(" → 
Processing all tracks for session '%s'\n", processAllArgs.SessionID) + } else if processAllArgs.UserID != "" { + fmt.Printf(" → Processing all tracks for user '%s'\n", processAllArgs.UserID) + } else { + fmt.Printf(" → Processing all tracks (no filters)\n") + } + + // Process all tracks and mux them + if err := p.processAllTracks(globalArgs, processAllArgs, metadata, p.logger); err != nil { + p.logger.Error("Failed to process and mux tracks: %v", err) + os.Exit(1) + } + + p.logger.Info("Process-all command completed successfully") +} + +func (p *ProcessAllProcess) printUsage() { + fmt.Printf("Usage: process-all [OPTIONS]\n") + fmt.Printf("\nProcess audio, video, and mux them into combined files (all-in-one workflow)\n") + fmt.Printf("Outputs 3 files per session: audio WebM, video WebM, and muxed WebM\n") + fmt.Printf("Gap filling is always enabled for seamless playback.\n") + fmt.Printf("\nOptions:\n") + fmt.Printf(" --userId STRING Specify a userId or * for all (default: \"*\")\n") + fmt.Printf(" --sessionId STRING Specify a sessionId or * for all (default: \"*\")\n") + fmt.Printf(" --trackId STRING Specify a trackId or * for all (default: \"*\")\n") + fmt.Printf("\nOutput files per session:\n") + fmt.Printf(" audio_{userId}_{sessionId}_{trackId}.webm - Audio-only file\n") + fmt.Printf(" video_{userId}_{sessionId}_{trackId}.webm - Video-only file\n") + fmt.Printf(" muxed_{userId}_{sessionId}_{trackId}.webm - Combined audio+video file\n") +} + +func (p *ProcessAllProcess) processAllTracks(globalArgs *GlobalArgs, processAllArgs *ProcessAllArgs, metadata *processing.RecordingMetadata, logger *getstream.DefaultLogger) error { + + if e := processing.ExtractTracks(globalArgs.WorkDir, globalArgs.Output, "", "", "", metadata, "audio", "both", true, true, logger); e != nil { + return e + } + + if e := processing.ExtractTracks(globalArgs.WorkDir, globalArgs.Output, "", "", "", metadata, "video", "both", true, true, logger); e != nil { + return e + } + + mixer := 
processing.NewAudioMixer(logger)
	// MixAllAudioTracks returns an error that was previously discarded;
	// propagate it so the command exits non-zero instead of silently
	// producing no mixed audio file.
	if e := mixer.MixAllAudioTracks(&processing.AudioMixerConfig{
		WorkDir:         globalArgs.WorkDir,
		OutputDir:       globalArgs.Output,
		WithScreenshare: false,
		WithExtract:     false, // tracks were already extracted above
		WithCleanup:     false,
	}, metadata, logger); e != nil {
		return e
	}

	muxer := processing.NewAudioVideoMuxer(p.logger)
	if e := muxer.MuxAudioVideoTracks(&processing.AudioVideoMuxerConfig{
		WorkDir:     globalArgs.WorkDir,
		OutputDir:   globalArgs.Output,
		UserID:      "",
		SessionID:   "",
		TrackID:     "",
		Media:       "",
		WithExtract: false, // tracks were already extracted above
		WithCleanup: false,
	}, metadata, logger); e != nil {
		return e
	}

	return nil
}
diff --git a/go.mod b/go.mod
index c0ff2e8..564ec16 100644
--- a/go.mod
+++ b/go.mod
@@ -1,11 +1,36 @@
module github.com/GetStream/getstream-go/v3
-go 1.19
+go 1.21
+
+toolchain go1.24.4
require (
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/joho/godotenv v1.5.1
- github.com/stretchr/testify v1.9.0
+ github.com/pion/rtcp v1.2.15
+ github.com/pion/rtp v1.8.23
+ github.com/pion/webrtc/v4 v4.1.5
+ github.com/stretchr/testify v1.11.1
+)
+
+require (
+ github.com/pion/datachannel v1.5.10 // indirect
+ github.com/pion/dtls/v3 v3.0.7 // indirect
+ github.com/pion/ice/v4 v4.0.10 // indirect
+ github.com/pion/interceptor v0.1.41 // indirect
+ github.com/pion/logging v0.2.4 // indirect
+ github.com/pion/mdns/v2 v2.0.7 // indirect
+ github.com/pion/randutil v0.1.0 // indirect
+ github.com/pion/sctp v1.8.39 // indirect
+ github.com/pion/sdp/v3 v3.0.16 // indirect
+ github.com/pion/srtp/v3 v3.0.8 // indirect
+ github.com/pion/stun/v3 v3.0.0 // indirect
+ github.com/pion/transport/v3 v3.0.8 // indirect
+ github.com/pion/turn/v4 v4.1.1 // indirect
+ github.com/wlynxg/anet v0.0.5 // indirect
+ golang.org/x/crypto v0.33.0 // indirect
+ golang.org/x/net v0.35.0 // indirect
+ golang.org/x/sys v0.30.0 // indirect
)
require (
diff --git a/go.sum b/go.sum
index 82f3ce6..462c262 100644
--- a/go.sum
+++ b/go.sum
@@ -6,11 +6,56 @@ github.com/google/uuid v1.6.0
h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o= +github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M= +github.com/pion/dtls/v3 v3.0.7 h1:bItXtTYYhZwkPFk4t1n3Kkf5TDrfj6+4wG+CZR8uI9Q= +github.com/pion/dtls/v3 v3.0.7/go.mod h1:uDlH5VPrgOQIw59irKYkMudSFprY9IEFCqz/eTz16f8= +github.com/pion/ice/v4 v4.0.10 h1:P59w1iauC/wPk9PdY8Vjl4fOFL5B+USq1+xbDcN6gT4= +github.com/pion/ice/v4 v4.0.10/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= +github.com/pion/interceptor v0.1.41 h1:NpvX3HgWIukTf2yTBVjVGFXtpSpWgXjqz7IIpu7NsOw= +github.com/pion/interceptor v0.1.41/go.mod h1:nEt4187unvRXJFyjiw00GKo+kIuXMWQI9K89fsosDLY= +github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= +github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so= +github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM= +github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= +github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= +github.com/pion/rtp 
v1.8.23 h1:kxX3bN4nM97DPrVBGq5I/Xcl332HnTHeP1Swx3/MCnU= +github.com/pion/rtp v1.8.23/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM= +github.com/pion/sctp v1.8.39 h1:PJma40vRHa3UTO3C4MyeJDQ+KIobVYRZQZ0Nt7SjQnE= +github.com/pion/sctp v1.8.39/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= +github.com/pion/sdp/v3 v3.0.16 h1:0dKzYO6gTAvuLaAKQkC02eCPjMIi4NuAr/ibAwrGDCo= +github.com/pion/sdp/v3 v3.0.16/go.mod h1:9tyKzznud3qiweZcD86kS0ff1pGYB3VX+Bcsmkx6IXo= +github.com/pion/srtp/v3 v3.0.8 h1:RjRrjcIeQsilPzxvdaElN0CpuQZdMvcl9VZ5UY9suUM= +github.com/pion/srtp/v3 v3.0.8/go.mod h1:2Sq6YnDH7/UDCvkSoHSDNDeyBcFgWL0sAVycVbAsXFg= +github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw= +github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU= +github.com/pion/transport/v3 v3.0.8 h1:oI3myyYnTKUSTthu/NZZ8eu2I5sHbxbUNNFW62olaYc= +github.com/pion/transport/v3 v3.0.8/go.mod h1:+c2eewC5WJQHiAA46fkMMzoYZSuGzA/7E2FPrOYHctQ= +github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc= +github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8= +github.com/pion/webrtc/v4 v4.1.5 h1:hJqfKPdRAVcXV9rsg2xcCiuXuMJ38BLW/87GsYJUtUU= +github.com/pion/webrtc/v4 v4.1.5/go.mod h1:vzHh7egVnZRgkK83lYzciWVszdDs759y3/eyu6AvZRA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/wlynxg/anet v0.0.5 
h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= +github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/processing/archive_input.go b/processing/archive_input.go new file mode 100644 index 0000000..5f61d8d --- /dev/null +++ b/processing/archive_input.go @@ -0,0 +1,102 @@ +package processing + +import ( + "archive/tar" + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/GetStream/getstream-go/v3" +) + +// extractToTempDir extracts archive to temp directory or returns the directory path +// Returns: (workingDir, cleanupFunc, error) +func ExtractToTempDir(inputPath string, logger *getstream.DefaultLogger) (string, func(), error) { + // If it's already a directory, just return it + if stat, err := os.Stat(inputPath); err == nil && stat.IsDir() { + logger.Debug("Input is already a directory: %s", inputPath) + return inputPath, func() {}, nil + } + + // If it's a tar.gz file, extract it to temp directory + if strings.HasSuffix(strings.ToLower(inputPath), ".tar.gz") { + 
logger.Info("Extracting tar.gz archive to temporary directory...")

		tempDir, err := os.MkdirTemp("", "raw-tools-*")
		if err != nil {
			return "", nil, fmt.Errorf("failed to create temp directory: %w", err)
		}

		// cleanup removes the temp directory; callers must invoke it when done.
		cleanup := func() {
			os.RemoveAll(tempDir)
		}

		err = extractTarGzToDir(inputPath, tempDir, logger)
		if err != nil {
			cleanup()
			return "", nil, fmt.Errorf("failed to extract tar.gz: %w", err)
		}

		logger.Debug("Extracted archive to: %s", tempDir)
		return tempDir, cleanup, nil
	}

	return "", nil, fmt.Errorf("unsupported input format: %s (only tar.gz files and directories supported)", inputPath)
}

// extractTarGzToDir extracts a tar.gz file to the specified directory.
// Entry names are validated so a malicious archive cannot write outside
// destDir via ".." components or absolute names ("Zip Slip" traversal).
func extractTarGzToDir(tarGzPath, destDir string, logger *getstream.DefaultLogger) error {
	file, err := os.Open(tarGzPath)
	if err != nil {
		return fmt.Errorf("failed to open tar.gz file: %w", err)
	}
	defer file.Close()

	gzReader, err := gzip.NewReader(file)
	if err != nil {
		return fmt.Errorf("failed to create gzip reader: %w", err)
	}
	defer gzReader.Close()

	tarReader := tar.NewReader(gzReader)

	for {
		header, err := tarReader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("failed to read tar entry: %w", err)
		}

		// Skip directories
		if header.FileInfo().IsDir() {
			continue
		}

		// Build the destination path and reject any entry that would resolve
		// outside destDir (path traversal / Zip Slip protection).
		destPath := filepath.Join(destDir, header.Name)
		if rel, err := filepath.Rel(destDir, destPath); err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) {
			return fmt.Errorf("unsafe path in archive: %s", header.Name)
		}

		// Create directory structure if needed
		if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
			return fmt.Errorf("failed to create directory structure: %w", err)
		}

		// Extract file
		outFile, err := os.Create(destPath)
		if err != nil {
			return fmt.Errorf("failed to create file %s: %w", destPath, err)
		}

		// Close explicitly (not deferred) so file handles do not accumulate
		// across loop iterations; then check the copy error.
		_, err = io.Copy(outFile, tarReader)
		outFile.Close()
		if err != nil {
			return fmt.Errorf("failed to extract file %s: %w", destPath, err)
		}
	}

	return nil
}
diff --git a/processing/archive_json.go
b/processing/archive_json.go new file mode 100644 index 0000000..85dfc81 --- /dev/null +++ b/processing/archive_json.go @@ -0,0 +1,30 @@ +package processing + +type SessionTimingMetadata struct { + ParticipantID string `json:"participant_id"` + UserSessionID string `json:"user_session_id"` + Segments struct { + Audio []*SegmentMetadata `json:"audio"` + Video []*SegmentMetadata `json:"video"` + } `json:"segments"` +} + +type SegmentMetadata struct { + // Global information + BaseFilename string `json:"base_filename"` + + // Track information + Codec string `json:"codec"` + TrackID string `json:"track_id"` + TrackType string `json:"track_type"` + + // Packet timing information + FirstRtpRtpTimestamp uint32 `json:"first_rtp_rtp_timestamp"` + FirstRtpUnixTimestamp int64 `json:"first_rtp_unix_timestamp"` + LastRtpRtpTimestamp uint32 `json:"last_rtp_rtp_timestamp,omitempty"` + LastRtpUnixTimestamp int64 `json:"last_rtp_unix_timestamp,omitempty"` + FirstRtcpRtpTimestamp uint32 `json:"first_rtcp_rtp_timestamp,omitempty"` + FirstRtcpNtpTimestamp int64 `json:"first_rtcp_ntp_timestamp,omitempty"` + LastRtcpRtpTimestamp uint32 `json:"last_rtcp_rtp_timestamp,omitempty"` + LastRtcpNtpTimestamp int64 `json:"last_rtcp_ntp_timestamp,omitempty"` +} diff --git a/processing/archive_metadata.go b/processing/archive_metadata.go new file mode 100644 index 0000000..f1ae828 --- /dev/null +++ b/processing/archive_metadata.go @@ -0,0 +1,370 @@ +package processing + +import ( + "archive/tar" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strings" + + "github.com/GetStream/getstream-go/v3" +) + +// TrackInfo represents a single track with its metadata (deduplicated across segments) +type TrackInfo struct { + UserID string `json:"userId"` // participant_id from timing metadata + SessionID string `json:"sessionId"` // user_session_id from timing metadata + TrackID string `json:"trackId"` // track_id from segment + TrackType string `json:"trackType"` // 
"audio" or "video" (cleaned from TRACK_TYPE_*) + IsScreenshare bool `json:"isScreenshare"` // true if this is a screenshare track + Codec string `json:"codec"` // codec info + SegmentCount int `json:"segmentCount"` // number of segments for this track + Segments []*SegmentInfo `json:"segments"` // list of filenames (for JSON output only) + + ConcatenatedContainerPath string +} + +type SegmentInfo struct { + metadata *SegmentMetadata + + RtpDumpPath string + SdpPath string + ContainerPath string + ContainerExt string + FFMpegOffset int64 +} + +// RecordingMetadata contains all tracks and session information +type RecordingMetadata struct { + Tracks []*TrackInfo `json:"tracks"` + UserIDs []string `json:"userIds"` + Sessions []string `json:"sessions"` +} + +// MetadataParser handles parsing of raw recording files +type MetadataParser struct { + logger *getstream.DefaultLogger +} + +// NewMetadataParser creates a new metadata parser +func NewMetadataParser(logger *getstream.DefaultLogger) *MetadataParser { + return &MetadataParser{ + logger: logger, + } +} + +// ParseMetadataOnly efficiently extracts only metadata from archives (optimized for list-tracks) +// This is much faster than full extraction when you only need timing metadata +func (p *MetadataParser) ParseMetadataOnly(inputPath string) (*RecordingMetadata, error) { + // If it's already a directory, use the normal path + if stat, err := os.Stat(inputPath); err == nil && stat.IsDir() { + return p.parseDirectory(inputPath) + } + + // If it's a tar.gz file, use selective extraction (much faster) + if strings.HasSuffix(strings.ToLower(inputPath), ".tar.gz") { + return p.parseMetadataOnlyFromTarGz(inputPath) + } + + return nil, fmt.Errorf("unsupported input format: %s (only tar.gz files and directories supported)", inputPath) +} + +// parseDirectory processes a directory containing recording files +func (p *MetadataParser) parseDirectory(dirPath string) (*RecordingMetadata, error) { + metadata := &RecordingMetadata{ 
+ Tracks: make([]*TrackInfo, 0), + UserIDs: make([]string, 0), + Sessions: make([]string, 0), + } + + // Find and process timing metadata files + err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), "_timing_metadata.json") { + p.logger.Debug("Processing metadata file: %s", path) + + data, err := os.ReadFile(path) + if err != nil { + p.logger.Warn("Failed to read metadata file %s: %v", path, err) + return nil + } + + tracks, err := p.parseTimingMetadataFile(data) + if err != nil { + p.logger.Warn("Failed to parse metadata file %s: %v", path, err) + return nil + } + + metadata.Tracks = append(metadata.Tracks, tracks...) + } + + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to process directory: %w", err) + } + + // Build unique lists + metadata.UserIDs = p.extractUniqueUserIDs(metadata.Tracks) + metadata.Sessions = p.extractUniqueSessions(metadata.Tracks) + + return metadata, nil +} + +// parseMetadataOnlyFromTarGz efficiently extracts only timing metadata from tar.gz files +// This is optimized for list-tracks - only reads JSON files, skips all .rtpdump/.sdp files +func (p *MetadataParser) parseMetadataOnlyFromTarGz(tarGzPath string) (*RecordingMetadata, error) { + p.logger.Debug("Reading metadata directly from tar.gz (efficient mode): %s", tarGzPath) + + file, err := os.Open(tarGzPath) + if err != nil { + return nil, fmt.Errorf("failed to open tar.gz file: %w", err) + } + defer file.Close() + + gzReader, err := gzip.NewReader(file) + if err != nil { + return nil, fmt.Errorf("failed to create gzip reader: %w", err) + } + defer gzReader.Close() + + tarReader := tar.NewReader(gzReader) + + metadata := &RecordingMetadata{ + Tracks: make([]*TrackInfo, 0), + UserIDs: make([]string, 0), + Sessions: make([]string, 0), + } + + filesRead := 0 + for { + header, err := tarReader.Next() + if err == io.EOF { + break + 
} else if err != nil { + return nil, fmt.Errorf("failed to read tar entry: %w", err) + } else if header.FileInfo().IsDir() { + continue + } + + // Only process timing metadata JSON files (skip all .rtpdump/.sdp files) + if strings.HasSuffix(strings.ToLower(header.Name), "_timing_metadata.json") { + p.logger.Debug("Processing metadata file: %s", header.Name) + + data, err := io.ReadAll(tarReader) + if err != nil { + p.logger.Warn("Failed to read metadata file %s: %v", header.Name, err) + continue + } + + tracks, err := p.parseTimingMetadataFile(data) + if err != nil { + p.logger.Warn("Failed to parse metadata file %s: %v", header.Name, err) + continue + } + + metadata.Tracks = append(metadata.Tracks, tracks...) + filesRead++ + } + // Skip all other files (.rtpdump, .sdp, etc.) - huge efficiency gain! + } + + p.logger.Debug("Efficiently read %d metadata files from archive (skipped all media data files)", filesRead) + + // Extract unique user IDs and sessions + metadata.UserIDs = p.extractUniqueUserIDs(metadata.Tracks) + metadata.Sessions = p.extractUniqueSessions(metadata.Tracks) + + return metadata, nil +} + +// parseTimingMetadataFile parses a timing metadata JSON file and extracts tracks +func (p *MetadataParser) parseTimingMetadataFile(data []byte) ([]*TrackInfo, error) { + var sessionMetadata SessionTimingMetadata + err := json.Unmarshal(data, &sessionMetadata) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal session metadata: %w", err) + } + + // Use a map to deduplicate tracks by unique key + trackMap := make(map[string]*TrackInfo) + + processSegment := func(segment *SegmentMetadata, trackType string) { + key := fmt.Sprintf("%s|%s|%s|%s", + sessionMetadata.ParticipantID, + sessionMetadata.UserSessionID, + segment.TrackID, + trackType) + + if existingTrack, exists := trackMap[key]; exists { + existingTrack.Segments = append(existingTrack.Segments, &SegmentInfo{metadata: segment}) + existingTrack.SegmentCount++ + } else { + // Create new track + 
track := &TrackInfo{ + UserID: sessionMetadata.ParticipantID, + SessionID: sessionMetadata.UserSessionID, + TrackID: segment.TrackID, + TrackType: p.cleanTrackType(segment.TrackType), + IsScreenshare: p.isScreenshareTrack(segment.TrackType), + Codec: segment.Codec, + SegmentCount: 1, + Segments: []*SegmentInfo{{metadata: segment}}, + } + trackMap[key] = track + } + } + + // Process audio segments + for _, segment := range sessionMetadata.Segments.Audio { + processSegment(segment, p.cleanTrackType(segment.TrackType)) + } + + // Process video segments + for _, segment := range sessionMetadata.Segments.Video { + processSegment(segment, p.cleanTrackType(segment.TrackType)) + } + + // Convert map to slice + tracks := make([]*TrackInfo, 0, len(trackMap)) + for _, track := range trackMap { + sort.Slice(track.Segments, func(i, j int) bool { + return track.Segments[i].metadata.FirstRtpUnixTimestamp < track.Segments[j].metadata.FirstRtpUnixTimestamp + }) + tracks = append(tracks, track) + } + + return tracks, nil +} + +// isScreenshareTrack detects if a track is screenshare-related +func (p *MetadataParser) isScreenshareTrack(trackType string) bool { + return trackType == "TRACK_TYPE_SCREEN_SHARE_AUDIO" || trackType == "TRACK_TYPE_SCREEN_SHARE" +} + +// cleanTrackType converts TRACK_TYPE_* to simple "audio" or "video" +func (p *MetadataParser) cleanTrackType(trackType string) string { + switch trackType { + case "TRACK_TYPE_AUDIO", "TRACK_TYPE_SCREEN_SHARE_AUDIO": + return "audio" + case "TRACK_TYPE_VIDEO", "TRACK_TYPE_SCREEN_SHARE": + return "video" + default: + return strings.ToLower(trackType) + } +} + +// extractUniqueUserIDs returns a sorted list of unique user IDs +func (p *MetadataParser) extractUniqueUserIDs(tracks []*TrackInfo) []string { + userIDMap := make(map[string]bool) + for _, track := range tracks { + userIDMap[track.UserID] = true + } + + userIDs := make([]string, 0, len(userIDMap)) + for userID := range userIDMap { + userIDs = append(userIDs, userID) + } + 
+ return userIDs +} + +// NOTE: ExtractTrackFiles and extractTrackFromTarGz removed - no longer needed since we always work with directories + +// extractUniqueSessions returns a sorted list of unique session IDs +func (p *MetadataParser) extractUniqueSessions(tracks []*TrackInfo) []string { + sessionMap := make(map[string]bool) + for _, track := range tracks { + sessionMap[track.SessionID] = true + } + + sessions := make([]string, 0, len(sessionMap)) + for session := range sessionMap { + sessions = append(sessions, session) + } + + return sessions +} + +// FilterTracks filters tracks based on mutually exclusive criteria +// Only one filter (userID, sessionID, or trackID) can be specified at a time +// Empty values are ignored, specific values must match +// If all are empty, all tracks are returned +func FilterTracks(tracks []*TrackInfo, userID, sessionID, trackID, trackType, mediaFilter string) []*TrackInfo { + filtered := make([]*TrackInfo, 0) + + for _, track := range tracks { + if trackType != "" && track.TrackType != trackType { + continue // Skip tracks with wrong TrackType + } + + // Apply media type filtering if specified + if mediaFilter != "" && mediaFilter != "both" { + if mediaFilter == "user" && track.IsScreenshare { + continue // Skip display tracks when only user requested + } + if mediaFilter == "display" && !track.IsScreenshare { + continue // Skip user tracks when only display requested + } + } + + // Apply the single specified filter (mutually exclusive) + if trackID != "" { + // Filter by trackID - return only that specific track + if track.TrackID == trackID { + filtered = append(filtered, track) + } + } else if sessionID != "" { + // Filter by sessionID - return all tracks for that session + if track.SessionID == sessionID { + filtered = append(filtered, track) + } + } else if userID != "" { + // Filter by userID - return all tracks for that user + if track.UserID == userID { + filtered = append(filtered, track) + } + } else { + // No filters 
specified - return all tracks + filtered = append(filtered, track) + } + } + + return filtered +} + +func firstPacketNtpTimestamp(segment *SegmentMetadata) int64 { + if segment.FirstRtcpNtpTimestamp != 0 && segment.FirstRtcpRtpTimestamp != 0 { + rtpNtpTs := (segment.FirstRtcpRtpTimestamp - segment.FirstRtpRtpTimestamp) / sampleRate(segment) + return segment.FirstRtcpNtpTimestamp - int64(rtpNtpTs) + } else { + return segment.FirstRtpUnixTimestamp + } +} + +func lastPacketNtpTimestamp(segment *SegmentMetadata) int64 { + if segment.LastRtcpNtpTimestamp != 0 && segment.LastRtcpRtpTimestamp != 0 { + rtpNtpTs := (segment.LastRtpRtpTimestamp - segment.LastRtcpRtpTimestamp) / sampleRate(segment) + return segment.LastRtcpNtpTimestamp + int64(rtpNtpTs) + } else { + return segment.LastRtpUnixTimestamp + } +} + +func sampleRate(segment *SegmentMetadata) uint32 { + switch segment.TrackType { + case "TRACK_TYPE_AUDIO", + "TRACK_TYPE_SCREEN_SHARE_AUDIO": + return 48 + default: + return 90 + } +} diff --git a/processing/audio_mixer.go b/processing/audio_mixer.go new file mode 100644 index 0000000..b69751f --- /dev/null +++ b/processing/audio_mixer.go @@ -0,0 +1,96 @@ +package processing + +import ( + "fmt" + "path/filepath" + + "github.com/GetStream/getstream-go/v3" +) + +type AudioMixerConfig struct { + WorkDir string + OutputDir string + WithScreenshare bool + WithExtract bool + WithCleanup bool +} + +type AudioMixer struct { + logger *getstream.DefaultLogger +} + +func NewAudioMixer(logger *getstream.DefaultLogger) *AudioMixer { + return &AudioMixer{logger: logger} +} + +// MixAllAudioTracks orchestrates the entire audio mixing workflow using existing extraction logic +func (p *AudioMixer) MixAllAudioTracks(config *AudioMixerConfig, metadata *RecordingMetadata, logger *getstream.DefaultLogger) error { + // Step 1: Extract all matching audio tracks using existing ExtractTracks function + logger.Info("Step 1/2: Extracting all matching audio tracks...") + + if config.WithExtract { 
+ mediaFilter := "user" + if config.WithScreenshare { + mediaFilter = "both" + } + + if err := ExtractTracks(config.WorkDir, config.OutputDir, "", "", "", metadata, "audio", mediaFilter, true, true, logger); err != nil { + return fmt.Errorf("failed to extract audio tracks: %w", err) + } + } + + fileOffsetMap := p.offset(metadata, config.WithScreenshare, logger) + if len(fileOffsetMap) == 0 { + return fmt.Errorf("no audio files were extracted - check your filter criteria") + } + + logger.Info("Found %d extracted audio files to mix", len(fileOffsetMap)) + + // Step 3: Mix all discovered audio files using existing webm.mixAudioFiles + outputFile := filepath.Join(config.OutputDir, "mixed_audio.webm") + + err := mixAudioFiles(outputFile, fileOffsetMap, logger) + if err != nil { + return fmt.Errorf("failed to mix audio files: %w", err) + } + + logger.Info("Successfully created mixed audio file: %s", outputFile) + + //// Clean up individual audio files (optional) + //for _, audioFile := range audioFiles { + // if err := os.Remove(audioFile.FilePath); err != nil { + // logger.Warn("Failed to clean up temporary file %s: %v", audioFile.FilePath, err) + // } + //} + + return nil +} + +func (p *AudioMixer) offset(metadata *RecordingMetadata, withScreenshare bool, logger *getstream.DefaultLogger) []*FileOffset { + var offsets []*FileOffset + var firstTrack *TrackInfo + for _, t := range metadata.Tracks { + if t.TrackType == "audio" && (!t.IsScreenshare || withScreenshare) { + if firstTrack == nil { + firstTrack = t + offsets = append(offsets, &FileOffset{ + Name: t.ConcatenatedContainerPath, + Offset: 0, // Will be sorted later and rearranged + }) + } else { + offset, err := calculateSyncOffsetFromFiles(t, firstTrack, logger) + if err != nil { + logger.Warn("Failed to calculate sync offset for audio tracks: %v", err) + continue + } + + offsets = append(offsets, &FileOffset{ + Name: t.ConcatenatedContainerPath, + Offset: offset, + }) + } + } + } + + return offsets +} diff --git 
a/processing/audio_video_muxer.go b/processing/audio_video_muxer.go new file mode 100644 index 0000000..20ef319 --- /dev/null +++ b/processing/audio_video_muxer.go @@ -0,0 +1,151 @@ +package processing + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/GetStream/getstream-go/v3" +) + +type AudioVideoMuxerConfig struct { + WorkDir string + OutputDir string + UserID string + SessionID string + TrackID string + Media string + + WithExtract bool + WithCleanup bool +} + +type AudioVideoMuxer struct { + logger *getstream.DefaultLogger +} + +func NewAudioVideoMuxer(logger *getstream.DefaultLogger) *AudioVideoMuxer { + return &AudioVideoMuxer{logger: logger} +} + +func (p *AudioVideoMuxer) MuxAudioVideoTracks(config *AudioVideoMuxerConfig, metadata *RecordingMetadata, logger *getstream.DefaultLogger) error { + if config.WithExtract { + // Extract audio tracks with gap filling enabled + logger.Info("Extracting audio tracks with gap filling...") + err := ExtractTracks(config.WorkDir, config.OutputDir, config.UserID, config.SessionID, config.TrackID, metadata, "audio", config.Media, true, true, logger) + if err != nil { + return fmt.Errorf("failed to extract audio tracks: %w", err) + } + + // Extract video tracks with gap filling enabled + logger.Info("Extracting video tracks with gap filling...") + err = ExtractTracks(config.WorkDir, config.OutputDir, config.UserID, config.SessionID, config.TrackID, metadata, "video", config.Media, true, true, logger) + if err != nil { + return fmt.Errorf("failed to extract video tracks: %w", err) + } + } + + // Group files by media type for proper pairing + pairedTracks := p.groupFilesByMediaType(metadata) + for audioTrack, videoTrack := range pairedTracks { + //logger.Info("Muxing %d user audio/video pairs", len(userAudio)) + err := p.muxTrackPairs(audioTrack, videoTrack, config.OutputDir, logger) + if err != nil { + logger.Error("Failed to mux user tracks: %v", err) + } + } + + return nil +} + +// 
calculateSyncOffsetFromFiles calculates sync offset between audio and video files using metadata +func calculateSyncOffsetFromFiles(audioTrack, videoTrack *TrackInfo, logger *getstream.DefaultLogger) (int64, error) { + // Calculate offset: positive means video starts before audio + audioTs := audioTrack.Segments[0].FFMpegOffset + firstPacketNtpTimestamp(audioTrack.Segments[0].metadata) + videoTs := videoTrack.Segments[0].FFMpegOffset + firstPacketNtpTimestamp(videoTrack.Segments[0].metadata) + offset := audioTs - videoTs + + logger.Info(fmt.Sprintf("Calculated sync offset: audio_start=%v, audio_ts=%v, video_start=%v, video_ts=%v, offset=%d", + audioTrack.Segments[0].metadata.FirstRtpUnixTimestamp, audioTs, videoTrack.Segments[0].metadata.FirstRtpUnixTimestamp, videoTs, offset)) + + return offset, nil +} + +// groupFilesByMediaType groups audio and video files by media type (user vs display) +func (p *AudioVideoMuxer) groupFilesByMediaType(metadata *RecordingMetadata) map[*TrackInfo]*TrackInfo { + pairedTracks := make(map[*TrackInfo]*TrackInfo) + + matches := func(audio *TrackInfo, video *TrackInfo) bool { + return audio.UserID == video.UserID && + audio.SessionID == video.SessionID && + audio.IsScreenshare == video.IsScreenshare + } + + for _, at := range metadata.Tracks { + if at.TrackType == "audio" { + for _, vt := range metadata.Tracks { + if vt.TrackType == "video" && matches(at, vt) { + pairedTracks[at] = vt + break + } + } + } + } + + return pairedTracks +} + +// muxTrackPairs muxes audio/video pairs of the same media type +func (p *AudioVideoMuxer) muxTrackPairs(audio, video *TrackInfo, outputDir string, logger *getstream.DefaultLogger) error { + // Calculate sync offset using segment timing information + offset, err := calculateSyncOffsetFromFiles(audio, video, logger) + if err != nil { + logger.Warn("Failed to calculate sync offset, using 0: %v", err) + offset = 0 + } + + // Generate output filename with media type indicator + outputFile := 
p.generateMediaAwareMuxedFilename(audio, video, outputDir) + + audioFile := audio.ConcatenatedContainerPath + videoFile := video.ConcatenatedContainerPath + + // Mux the audio and video files + logger.Info("Muxing %s + %s → %s (offset: %dms)", + filepath.Base(audioFile), filepath.Base(videoFile), filepath.Base(outputFile), offset) + + err = muxFiles(outputFile, audioFile, videoFile, float64(offset), logger) + if err != nil { + logger.Error("Failed to mux %s + %s: %v", audioFile, videoFile, err) + return err + } + + logger.Info("Successfully created muxed file: %s", outputFile) + + // Clean up individual track files to avoid clutter + //os.Remove(audioFile) + //os.Remove(videoFile) + //} + // + //if len(audioFiles) != len(videoFiles) { + // logger.Warn("Mismatched %s track counts: %d audio, %d video", mediaTypeName, len(audioFiles), len(videoFiles)) + //} + + return nil +} + +// generateMediaAwareMuxedFilename creates output filename that indicates media type +func (p *AudioVideoMuxer) generateMediaAwareMuxedFilename(audioFile, videoFile *TrackInfo, outputDir string) string { + audioBase := filepath.Base(audioFile.Segments[0].ContainerPath) + audioBase = strings.TrimSuffix(audioBase, "."+audioFile.Segments[0].ContainerExt) + + // Replace "audio_" with "muxed_{mediaType}_" to create output name + var muxedName string + if audioFile.IsScreenshare { + muxedName = strings.Replace(audioBase, "audio_", "muxed_display_", 1) + "." + videoFile.Segments[0].ContainerExt + } else { + muxedName = strings.Replace(audioBase, "audio_", "muxed_", 1) + "." + videoFile.Segments[0].ContainerExt + } + + return filepath.Join(outputDir, muxedName) +} diff --git a/processing/constants.go b/processing/constants.go new file mode 100644 index 0000000..5d1f595 --- /dev/null +++ b/processing/constants.go @@ -0,0 +1,15 @@ +package processing + +const ( + RtpDump = "rtpdump" + SuffixRtpDump = "." + RtpDump + + Sdp = "sdp" + SuffixSdp = "." + Sdp + + Webm = "webm" + SuffixWebm = "." 
+ Webm + + Mp4 = "mp4" + SuffixMp4 = "." + Mp4 +) diff --git a/processing/container_converter.go b/processing/container_converter.go new file mode 100644 index 0000000..bee8c51 --- /dev/null +++ b/processing/container_converter.go @@ -0,0 +1,336 @@ +package processing + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + "github.com/GetStream/getstream-go/v3" + "github.com/pion/rtp" + "github.com/pion/rtp/codecs" + "github.com/pion/webrtc/v4" + "github.com/pion/webrtc/v4/pkg/media/rtpdump" + "github.com/pion/webrtc/v4/pkg/media/samplebuilder" +) + +const audioMaxLate = 200 // 4sec +const videoMaxLate = 1000 // 4sec + +type RTPDump2WebMConverter struct { + logger *getstream.DefaultLogger + reader *rtpdump.Reader + recorder WebmRecorder + sampleBuilder *samplebuilder.SampleBuilder + + lastPkt *rtp.Packet + lastPktDuration uint32 + inserted uint16 +} + +type WebmRecorder interface { + OnRTP(pkt *rtp.Packet) error + PushRtpBuf(payload []byte) error + Close() error +} + +func newRTPDump2WebMConverter(logger *getstream.DefaultLogger) *RTPDump2WebMConverter { + return &RTPDump2WebMConverter{ + logger: logger, + } +} + +func ConvertDirectory(directory string, accept func(path string, info os.FileInfo) (*SegmentInfo, bool), fixDtx bool, logger *getstream.DefaultLogger) error { + rtpdumpFiles := make(map[string]*SegmentInfo) + + // Walk through directory to find .rtpdump files + err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), SuffixRtpDump) { + segment, accepted := accept(path, info) + if accepted { + rtpdumpFiles[path] = segment + } + } + + return nil + }) + if err != nil { + return err + } + + for rtpdumpFile, segment := range rtpdumpFiles { + c := newRTPDump2WebMConverter(logger) + if err := c.ConvertFile(rtpdumpFile, fixDtx); err != nil { + c.logger.Error("Failed to convert %s: %v", 
rtpdumpFile, err) + continue + } + + switch c.recorder.(type) { + case *CursorWebmRecorder: + offset, exists := c.recorder.(*CursorWebmRecorder).StartOffset() + if exists { + segment.FFMpegOffset = offset + } + } + } + + return nil +} + +func (c *RTPDump2WebMConverter) ConvertFile(inputFile string, fixDtx bool) error { + c.logger.Info("Converting %s", inputFile) + + // Parse the RTP dump file + // Open the file + file, err := os.Open(inputFile) + if err != nil { + return fmt.Errorf("failed to open rtpdump file: %w", err) + } + defer file.Close() + + // Create standardized reader + reader, _, _ := rtpdump.NewReader(file) + c.reader = reader + + sdpContent, _ := readSDP(strings.Replace(inputFile, SuffixRtpDump, SuffixSdp, 1)) + mType, _ := mimeType(sdpContent) + + releasePacketHandler := samplebuilder.WithPacketReleaseHandler(c.buildDefaultReleasePacketHandler()) + + switch mType { + case webrtc.MimeTypeAV1: + c.sampleBuilder = samplebuilder.New(videoMaxLate, &codecs.AV1Depacketizer{}, 90000, releasePacketHandler) + c.recorder, err = NewCursorWebmRecorder(strings.Replace(inputFile, SuffixRtpDump, SuffixWebm, 1), sdpContent, c.logger) + case webrtc.MimeTypeVP9: + c.sampleBuilder = samplebuilder.New(videoMaxLate, &codecs.VP9Packet{}, 90000, releasePacketHandler) + c.recorder, err = NewCursorGstreamerWebmRecorder(strings.Replace(inputFile, SuffixRtpDump, SuffixWebm, 1), sdpContent, c.logger) + case webrtc.MimeTypeH264: + c.sampleBuilder = samplebuilder.New(videoMaxLate, &codecs.H264Packet{}, 90000, releasePacketHandler) + c.recorder, err = NewCursorWebmRecorder(strings.Replace(inputFile, SuffixRtpDump, SuffixMp4, 1), sdpContent, c.logger) + case webrtc.MimeTypeVP8: + c.sampleBuilder = samplebuilder.New(videoMaxLate, &codecs.VP8Packet{}, 90000, releasePacketHandler) + c.recorder, err = NewCursorWebmRecorder(strings.Replace(inputFile, SuffixRtpDump, SuffixWebm, 1), sdpContent, c.logger) + case webrtc.MimeTypeOpus: + if fixDtx { + releasePacketHandler = 
samplebuilder.WithPacketReleaseHandler(c.buildOpusReleasePacketHandler()) + } + c.sampleBuilder = samplebuilder.New(audioMaxLate, &codecs.OpusPacket{}, 48000, releasePacketHandler) + c.recorder, err = NewCursorWebmRecorder(strings.Replace(inputFile, SuffixRtpDump, SuffixWebm, 1), sdpContent, c.logger) + default: + return fmt.Errorf("unsupported codec type: %s", mType) + } + if err != nil { + return fmt.Errorf("failed to create WebM recorder: %w", err) + } + defer c.recorder.Close() + + time.Sleep(1 * time.Second) + + // Convert and feed RTP packets + return c.feedPackets(reader) +} + +func (c *RTPDump2WebMConverter) feedPackets(reader *rtpdump.Reader) error { + startTime := time.Now() + + i := 0 + for ; ; i++ { + packet, err := reader.Next() + if errors.Is(err, io.EOF) { + break + } else if err != nil { + return err + } else if packet.IsRTCP { + // _ = c.recorder.PushRtcpBuf(packet.Payload) + continue + } + + // Unmarshal the RTP packet from the raw payload + if c.sampleBuilder == nil { + _ = c.recorder.PushRtpBuf(packet.Payload) + } else { + // Unmarshal the RTP packet from the raw payload + rtpPacket := &rtp.Packet{} + if err := rtpPacket.Unmarshal(packet.Payload); err != nil { + c.logger.Warn("Failed to unmarshal RTP packet %d: %v", i, err) + continue + } + + // Push packet to samplebuilder for reordering + c.sampleBuilder.Push(rtpPacket) + } + + // Log progress + if i%2000 == 0 && i > 0 { + c.logger.Info("Processed %d packets", i) + } + } + + if c.sampleBuilder != nil { + c.sampleBuilder.Flush() + } + + duration := time.Since(startTime) + c.logger.Info("Finished feeding %d packets in %v", i, duration) + + // Allow some time for the recorder to finalize + time.Sleep(2 * time.Second) + + return nil +} + +func (c *RTPDump2WebMConverter) buildDefaultReleasePacketHandler() func(pkt *rtp.Packet) { + return func(pkt *rtp.Packet) { + if c.lastPkt != nil { + if pkt.SequenceNumber-c.lastPkt.SequenceNumber > 1 { + c.logger.Info("Missing Packet Detected, Previous SeqNum: 
%d RtpTs: %d - Last SeqNum: %d RtpTs: %d", c.lastPkt.SequenceNumber, c.lastPkt.Timestamp, pkt.SequenceNumber, pkt.Timestamp) + } + } + + c.lastPkt = pkt + + if e := c.recorder.OnRTP(pkt); e != nil { + c.logger.Warn("Failed to record RTP packet SeqNum: %d RtpTs: %d: %v", pkt.SequenceNumber, pkt.Timestamp, e) + } + } +} + +func (c *RTPDump2WebMConverter) buildOpusReleasePacketHandler() func(pkt *rtp.Packet) { + return func(pkt *rtp.Packet) { + pkt.SequenceNumber += c.inserted + + if c.lastPkt != nil { + if pkt.SequenceNumber-c.lastPkt.SequenceNumber > 1 { + c.logger.Info("Missing Packet Detected, Previous SeqNum: %d RtpTs: %d - Last SeqNum: %d RtpTs: %d", c.lastPkt.SequenceNumber, c.lastPkt.Timestamp, pkt.SequenceNumber, pkt.Timestamp) + } + + tsDiff := pkt.Timestamp - c.lastPkt.Timestamp // TODO handle rollover + lastPktDuration := opusPacketDurationMs(c.lastPkt) + rtpDuration := uint32(lastPktDuration * 48) + + if rtpDuration == 0 { + rtpDuration = c.lastPktDuration + c.logger.Info("LastPacket with no duration, Previous SeqNum: %d RtpTs: %d - Last SeqNum: %d RtpTs: %d", c.lastPkt.SequenceNumber, c.lastPkt.Timestamp, pkt.SequenceNumber, pkt.Timestamp) + } else { + c.lastPktDuration = rtpDuration + } + + if rtpDuration > 0 && tsDiff > rtpDuration { + + // Calculate how many packets we need to insert, taking care of packet losses + var toAdd uint16 + if uint32(pkt.SequenceNumber-c.lastPkt.SequenceNumber)*rtpDuration != tsDiff { // TODO handle rollover + toAdd = uint16(tsDiff/rtpDuration) - (pkt.SequenceNumber - c.lastPkt.SequenceNumber) + } + + c.logger.Info("Gap detected, inserting %d packets tsDiff %d, Previous SeqNum: %d RtpTs: %d - Last SeqNum: %d RtpTs: %d", + toAdd, tsDiff, c.lastPkt.SequenceNumber, c.lastPkt.Timestamp, pkt.SequenceNumber, pkt.Timestamp) + + for i := 1; i <= int(toAdd); i++ { + ins := c.lastPkt.Clone() + ins.Payload = ins.Payload[:1] // Keeping only TOC byte + ins.SequenceNumber += uint16(i) + ins.Timestamp += uint32(i) * rtpDuration + + 
c.logger.Debug("Writing inserted Packet %v", ins) + if e := c.recorder.OnRTP(ins); e != nil { + c.logger.Warn("Failed to record inserted RTP packet SeqNum: %d RtpTs: %d: %v", ins.SequenceNumber, ins.Timestamp, e) + } + } + + c.inserted += toAdd + pkt.SequenceNumber += toAdd + } + } + + c.lastPkt = pkt + + c.logger.Debug("Writing real Packet Last SeqNum: %d RtpTs: %d", pkt.SequenceNumber, pkt.Timestamp) + if e := c.recorder.OnRTP(pkt); e != nil { + c.logger.Warn("Failed to record RTP packet SeqNum: %d RtpTs: %d: %v", pkt.SequenceNumber, pkt.Timestamp, e) + } + } +} + +func opusPacketDurationMs(pkt *rtp.Packet) int { + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | config |s|1|1|0|p| M | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + payload := pkt.Payload + if len(payload) < 1 { + return 0 + } + + toc := payload[0] + config := (toc >> 3) & 0x1F + c := toc & 0x03 + + // Calculate frame duration according to OPUS RFC 6716 table (use x10 factor) + // Frame duration is determined by the config value + var duration int + switch { + case config < 3: + // SILK-only NB: 10, 20, 40 ms + duration = 100 * (1 << (config & 0x03)) + case config == 3: + // SILK-only NB: 60 ms + duration = 600 + case config < 7: + // SILK-only MB: 10, 20, 40 ms + duration = 100 * (1 << (config & 0x03)) + case config == 7: + // SILK-only MB: 60 ms + duration = 600 + case config < 11: + // SILK-only WB: 10, 20, 40 ms + duration = 100 * (1 << (config & 0x03)) + case config == 11: + // SILK-only WB: 60 ms + duration = 600 + case config <= 13: + // Hybrid SWB: 10, 20 ms + duration = 100 * (1 << (config & 0x01)) + case config <= 15: + // Hybrid FB: 10, 20 ms + duration = 100 * (1 << (config & 0x01)) + case config <= 19: + // CELT-only NB: 2.5, 5, 10, 20 ms + duration = 25 * (1 << (config & 0x03)) // 2.5ms * 10 for integer math + case config <= 23: + // CELT-only WB: 2.5, 5, 10, 20 ms + duration = 25 * (1 << (config & 0x03)) // 2.5ms * 10 for integer math + case config <= 27: 
+ // CELT-only SWB: 2.5, 5, 10, 20 ms + duration = 25 * (1 << (config & 0x03)) // 2.5ms * 10 for integer math + case config <= 31: + // CELT-only FB: 2.5, 5, 10, 20 ms + duration = 25 * (1 << (config & 0x03)) // 2.5ms * 10 for integer math + default: + // MUST NOT HAPPEN + duration = 0 + } + + frameDuration := float32(duration) / 10 + + var frameCount float32 + switch c { + case 0: + frameCount = 1 + case 1, 2: + frameCount = 2 + case 3: + if len(payload) > 1 { + frameCount = float32(payload[1] & 0x3F) + } + } + + return int(frameDuration * frameCount) +} diff --git a/processing/ffmpeg_converter.go b/processing/ffmpeg_converter.go new file mode 100644 index 0000000..e246f68 --- /dev/null +++ b/processing/ffmpeg_converter.go @@ -0,0 +1,310 @@ +package processing + +import ( + "bufio" + "context" + "fmt" + "io" + "math/rand" + "net" + "os" + "os/exec" + "regexp" + "strconv" + "strings" + "sync" + "time" + + "github.com/GetStream/getstream-go/v3" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +type CursorWebmRecorder struct { + logger *getstream.DefaultLogger + outputPath string + conn *net.UDPConn + ffmpegCmd *exec.Cmd + stdin io.WriteCloser + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + + // Parsed from FFmpeg output: "Duration: N/A, start: , bitrate: N/A" + startOffsetMs int64 + hasStartOffset bool +} + +func NewCursorWebmRecorder(outputPath, sdpContent string, logger *getstream.DefaultLogger) (*CursorWebmRecorder, error) { + ctx, cancel := context.WithCancel(context.Background()) + + r := &CursorWebmRecorder{ + logger: logger, + outputPath: outputPath, + ctx: ctx, + cancel: cancel, + } + + // Set up UDP connections + port := rand.Intn(10000) + 10000 + if err := r.setupConnections(port); err != nil { + cancel() + return nil, err + } + + // Start FFmpeg with codec detection + if err := r.startFFmpeg(outputPath, sdpContent, port); err != nil { + cancel() + return nil, err + } + + return r, nil +} + +func (r *CursorWebmRecorder) 
setupConnections(port int) error { + // Setup connection + addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:"+strconv.Itoa(port)) + if err != nil { + return err + } + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + return err + } + r.conn = conn + + return nil +} + +func (r *CursorWebmRecorder) startFFmpeg(outputFilePath, sdpContent string, port int) error { + + // Write SDP to a temporary file + sdpFile, err := os.CreateTemp("", "cursor_webm_*.sdp") + if err != nil { + return err + } + + updatedSdp := replaceSDP(sdpContent, port) + r.logger.Info("Using Sdp:\n%s\n", updatedSdp) + + if _, err := sdpFile.WriteString(updatedSdp); err != nil { + sdpFile.Close() + return err + } + sdpFile.Close() + + // Build FFmpeg command with optimized settings for single track recording + args := []string{ + "-threads", "1", + // "-loglevel", "debug", + "-protocol_whitelist", "file,udp,rtp", + "-buffer_size", "10000000", + "-max_delay", "150000", + "-reorder_queue_size", "130", + "-i", sdpFile.Name(), + } + + //switch strings.ToLower(mimeType) { + //case "audio/opus": + // // For other codecs, use direct copy + args = append(args, "-c", "copy") + //default: + // // For other codecs, use direct copy + // args = append(args, "-c", "copy") + //} + //if isVP9 { + // // For VP9, avoid direct copy and use re-encoding with error resilience + // // This works around FFmpeg's experimental VP9 RTP support issues + // r.logger.Info("Detected VP9 codec, applying workarounds...") + // args = append(args, + // "-c:v", "libvpx-vp9", + // // "-error_resilience", "aggressive", + // "-err_detect", "ignore_err", + // "-fflags", "+genpts+igndts", + // "-avoid_negative_ts", "make_zero", + // // VP9-specific quality settings to handle corrupted frames + // "-crf", "30", + // "-row-mt", "1", + // "-frame-parallel", "1", + // ) + //} else if strings.Contains(strings.ToUpper(sdpContent), "AV1") { + // args = append(args, + // "-c:v", "libaom-av1", + // "-cpu-used", "8", + // "-usage", 
"realtime", + // ) + //} else if strings.Contains(strings.ToUpper(sdpContent), "OPUS") { + // args = append(args, "-fflags", "+genpts", "-use_wallclock_as_timestamps", "0", "-c:a", "copy") + //} else { + // // For other codecs, use direct copy + // args = append(args, "-c", "copy") + //} + + args = append(args, + "-y", + outputFilePath, + ) + + r.logger.Info("FFMpeg pipeline: %s", strings.Join(args, " ")) // Skip debug args for display + + r.ffmpegCmd = exec.Command("ffmpeg", args...) + + // Capture stdout/stderr to parse FFmpeg logs while mirroring to console + stdoutPipe, err := r.ffmpegCmd.StdoutPipe() + if err != nil { + return err + } + stderrPipe, err := r.ffmpegCmd.StderrPipe() + if err != nil { + return err + } + + // Create stdin pipe to send commands to FFmpeg + //var err error + r.stdin, err = r.ffmpegCmd.StdinPipe() + if err != nil { + fmt.Println("Error creating stdin pipe:", err) + } + + // Begin scanning output streams after process has started + go r.scanFFmpegOutput(stdoutPipe, false) + go r.scanFFmpegOutput(stderrPipe, true) + + // Start FFmpeg process + if err := r.ffmpegCmd.Start(); err != nil { + return err + } + + return nil +} + +// scanFFmpegOutput reads lines from FFmpeg output, mirrors to console, and extracts start offset. 
+func (r *CursorWebmRecorder) scanFFmpegOutput(reader io.Reader, isStderr bool) {
+	scanner := bufio.NewScanner(reader)
+	re := regexp.MustCompile(`\bstart:\s*([0-9]+(?:\.[0-9]+)?)`) // matches "start: <seconds>" in FFmpeg's input summary line
+	for scanner.Scan() {
+		line := scanner.Text()
+		// Mirror FFmpeg's output to our own stdout/stderr so it stays visible
+		if isStderr {
+			fmt.Fprintln(os.Stderr, line)
+		} else {
+			fmt.Fprintln(os.Stdout, line)
+		}
+
+		// Only the summary line of the form "Duration: N/A, start: 0.000000, bitrate: N/A" carries the start offset
+		if !strings.Contains(line, "Duration") || !strings.Contains(line, "bitrate") {
+			continue
+		} else if matches := re.FindStringSubmatch(line); len(matches) == 2 {
+			if v, parseErr := strconv.ParseFloat(matches[1], 64); parseErr == nil {
+				// Record only the first occurrence; later lines are ignored
+				r.mu.Lock()
+				if !r.hasStartOffset {
+					r.startOffsetMs = int64(v * 1000) // FFmpeg reports seconds; stored as milliseconds
+					r.hasStartOffset = true
+					r.logger.Info("Detected FFmpeg start offset: %.6f seconds", v)
+				}
+				r.mu.Unlock()
+			}
+		}
+	}
+	_ = scanner.Err() // best-effort: FFmpeg closing the pipe at shutdown is expected
+}
+
+// StartOffset returns the parsed FFmpeg start offset in milliseconds and whether it was found.
+func (r *CursorWebmRecorder) StartOffset() (int64, bool) { + r.mu.Lock() + defer r.mu.Unlock() + return r.startOffsetMs, r.hasStartOffset +} + +func (r *CursorWebmRecorder) OnRTP(packet *rtp.Packet) error { + // Marshal RTP packet + buf, err := packet.Marshal() + if err != nil { + return err + } + + return r.PushRtpBuf(buf) +} + +func (r *CursorWebmRecorder) PushRtpBuf(buf []byte) error { + r.mu.Lock() + defer r.mu.Unlock() + + // Send RTP packet over UDP + if r.conn != nil { + _, _ = r.conn.Write(buf) + //if err != nil { + // return err) + //} + // r.logger.Info("Wrote packet to %s - %v", r.conn.LocalAddr().String(), err) + } + return nil +} + +func (r *CursorWebmRecorder) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + // Cancel context to stop background goroutines + if r.cancel != nil { + r.cancel() + } + + r.logger.Info("Closing UPD connection...") + + // Close UDP connection by sending arbitrary RtcpBye (Ffmpeg is no able to end correctly) + if r.conn != nil { + buf, _ := rtcp.Goodbye{ + Sources: []uint32{1}, // fixed ssrc is ok + Reason: "bye", + }.Marshal() + _, _ = r.conn.Write(buf) + _ = r.conn.Close() + r.conn = nil + } + + r.logger.Info("UDP Connection closed...") + + time.Sleep(5 * time.Second) + + r.logger.Info("After sleep...") + + // Gracefully stop FFmpeg + if r.ffmpegCmd != nil && r.ffmpegCmd.Process != nil { + + // ✅ Gracefully stop FFmpeg by sending 'q' to stdin + //fmt.Println("Sending 'q' to FFmpeg...") + //_, _ = r.stdin.Write([]byte("q\n")) + //r.stdin.Close() + + // Send interrupt signal to FFmpeg process + r.logger.Info("Sending SIGTERM...") + + //if err := r.ffmpegCmd.Process.Signal(os.Interrupt); err != nil { + // // If interrupt fails, force kill + // r.ffmpegCmd.Process.Kill() + //} else { + + r.logger.Info("Waiting for SIGTERM...") + + // Wait for graceful exit with timeout + done := make(chan error, 1) + go func() { + done <- r.ffmpegCmd.Wait() + }() + + select { + case <-time.After(10 * time.Second): + r.logger.Info("Wait 
timetout for SIGTERM...") + + // Timeout, force kill + r.ffmpegCmd.Process.Kill() + case <-done: + r.logger.Info("Process exited succesfully SIGTERM...") + // Process exited gracefully + } + } + + return nil +} diff --git a/processing/ffmpeg_helper.go b/processing/ffmpeg_helper.go new file mode 100644 index 0000000..0d7d70f --- /dev/null +++ b/processing/ffmpeg_helper.go @@ -0,0 +1,176 @@ +package processing + +import ( + "fmt" + "os" + "os/exec" + "sort" + "strings" + + "github.com/GetStream/getstream-go/v3" +) + +const TmpDir = "/tmp" + +type FileOffset struct { + Name string + Offset int64 +} + +func concatFile(outputPath string, files []string, logger *getstream.DefaultLogger) error { + // Write to a temporary file + tmpFile, err := os.CreateTemp(TmpDir, "concat_*.txt") + if err != nil { + return err + } + defer func() { + tmpFile.Close() + // _ = os.Remove(concatFile.Name()) + }() + + for _, file := range files { + if _, err := tmpFile.WriteString(fmt.Sprintf("file '%s'\n", file)); err != nil { + return err + } + } + + args := []string{} + args = append(args, "-f", "concat") + args = append(args, "-safe", "0") + args = append(args, "-i", tmpFile.Name()) + args = append(args, "-c", "copy") + args = append(args, outputPath) + return runFFMEPGCpmmand(args, logger) +} + +func muxFiles(fileName string, audioFile string, videoFile string, offsetMs float64, logger *getstream.DefaultLogger) error { + args := []string{} + + // Apply offset using itsoffset + // If offset is positive (video ahead), delay audio + // If offset is negative (audio ahead), delay video + if offsetMs != 0 { + offsetSeconds := offsetMs / 1000.0 + + if offsetMs > 0 { + // Video is ahead, delay audio + args = append(args, "-itsoffset", fmt.Sprintf("%.3f", offsetSeconds)) + args = append(args, "-i", audioFile) + args = append(args, "-i", videoFile) + } else { + args = append(args, "-i", audioFile) + args = append(args, "-itsoffset", fmt.Sprintf("%.3f", -offsetSeconds)) + args = append(args, "-i", 
videoFile) + } + } else { + args = append(args, "-i", audioFile) + args = append(args, "-i", videoFile) + } + + args = append(args, "-map", "0:a") + args = append(args, "-map", "1:v") + args = append(args, "-c", "copy") + args = append(args, fileName) + + return runFFMEPGCpmmand(args, logger) +} + +func mixAudioFiles(fileName string, files []*FileOffset, logger *getstream.DefaultLogger) error { + var args []string + + var filterParts []string + var mixParts []string + + sort.Slice(files, func(i, j int) bool { + return files[i].Offset < files[j].Offset + }) + + var offsetToAdd int64 + for i, fo := range files { + args = append(args, "-i", fo.Name) + + if i == 0 { + offsetToAdd = -fo.Offset + } + offset := fo.Offset + offsetToAdd + + if offset > 0 { + // for stereo: offset|offset + label := fmt.Sprintf("a%d", i) + filterParts = append(filterParts, + fmt.Sprintf("[%d:a]adelay=%d|%d[%s]", i, offset, offset, label)) + mixParts = append(mixParts, fmt.Sprintf("[%s]", label)) + } else { + mixParts = append(mixParts, fmt.Sprintf("[%d:a]", i)) + } + } + + // Build amix filter + filter := strings.Join(filterParts, "; ") + if filter != "" { + filter += "; " + } + filter += strings.Join(mixParts, "") + + fmt.Sprintf("amix=inputs=%d:normalize=0", len(files)) + + args = append(args, "-filter_complex", filter) + args = append(args, "-c:a", "libopus") + args = append(args, "-b:a", "128k") + args = append(args, fileName) + + fmt.Println(strings.Join(args, " ")) + + return runFFMEPGCpmmand(args, logger) +} + +func generateSilence(fileName string, duration float64, logger *getstream.DefaultLogger) error { + args := []string{} + args = append(args, "-f", "lavfi") + args = append(args, "-t", fmt.Sprintf("%.3f", duration)) + args = append(args, "-i", "anullsrc=cl=stereo:r=48000") + args = append(args, "-c:a", "libopus") + args = append(args, "-b:a", "32k") + args = append(args, fileName) + + return runFFMEPGCpmmand(args, logger) +} + +func generateBlackVideo(fileName, mimeType string, 
duration float64, width, height, frameRate int, logger *getstream.DefaultLogger) error { + var codecLib string + switch strings.ToLower(mimeType) { + case "video/vp8": + codecLib = "libvpx-vp9" + case "video/vp9": + codecLib = "libvpx-vp9" + case "video/h264": + codecLib = "libh264" + case "video/av1": + codecLib = "libav1" + } + + args := []string{} + args = append(args, "-f", "lavfi") + args = append(args, "-t", fmt.Sprintf("%.3f", duration)) + args = append(args, "-i", fmt.Sprintf("color=c=black:s=%dx%d:r=%d", width, height, frameRate)) + args = append(args, "-c:v", codecLib) + args = append(args, "-b:v", "1M") + args = append(args, fileName) + + return runFFMEPGCpmmand(args, logger) +} + +func runFFMEPGCpmmand(args []string, logger *getstream.DefaultLogger) error { + cmd := exec.Command("ffmpeg", args...) + + // Capture output for debugging + output, err := cmd.CombinedOutput() + if err != nil { + logger.Error("FFmpeg command failed: %v", err) + logger.Error("FFmpeg output: %s", string(output)) + return fmt.Errorf("ffmpeg command failed: %w", err) + } + + logger.Info("Successfully ran ffmpeg: %s", args) + logger.Debug("FFmpeg output: %s", string(output)) + return nil +} diff --git a/processing/gstreamer_converter.go b/processing/gstreamer_converter.go new file mode 100644 index 0000000..64714a5 --- /dev/null +++ b/processing/gstreamer_converter.go @@ -0,0 +1,474 @@ +package processing + +import ( + "context" + "encoding/binary" + "fmt" + "math/rand" + "net" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "github.com/GetStream/getstream-go/v3" + "github.com/pion/rtp" +) + +type CursorGstreamerWebmRecorder struct { + logger *getstream.DefaultLogger + outputPath string + rtpConn net.Conn + gstreamerCmd *exec.Cmd + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + port int + sdpFile *os.File + finalOutputPath string // Path for post-processed file with duration + tempOutputPath string // Path for temporary file before 
post-processing +} + +func NewCursorGstreamerWebmRecorder(outputPath, sdpContent string, logger *getstream.DefaultLogger) (*CursorGstreamerWebmRecorder, error) { + ctx, cancel := context.WithCancel(context.Background()) + + r := &CursorGstreamerWebmRecorder{ + logger: logger, + outputPath: outputPath, + ctx: ctx, + cancel: cancel, + } + + // Choose TCP listen port for GStreamer tcpserversrc + r.port = rand.Intn(10000) + 10000 + + // Start GStreamer with codec detection + if err := r.startGStreamer(sdpContent, outputPath); err != nil { + cancel() + return nil, err + } + + // Establish TCP client connection to the local tcpserversrc + if err := r.setupConnections(r.port); err != nil { + cancel() + return nil, err + } + + return r, nil +} + +func (r *CursorGstreamerWebmRecorder) setupConnections(port int) error { + // Setup TCP connection with retry to match GStreamer tcpserversrc readiness + address := "127.0.0.1:" + strconv.Itoa(port) + deadline := time.Now().Add(10 * time.Second) + var conn net.Conn + var err error + for { + conn, err = net.DialTimeout("tcp", address, 500*time.Millisecond) + if err == nil { + break + } + if time.Now().After(deadline) { + return fmt.Errorf("failed to connect to tcpserversrc at %s: %w", address, err) + } + time.Sleep(100 * time.Millisecond) + } + r.rtpConn = conn + return nil +} + +func (r *CursorGstreamerWebmRecorder) startGStreamer(sdpContent, outputFilePath string) error { + // Parse SDP to determine RTP caps for rtpstreamdepay + media, encodingName, payloadType, clockRate := parseRtpCapsFromSDP(sdpContent) + r.logger.Info("Starting TCP-based GStreamer pipeline (media=%s, encoding=%s, payload=%d, clock-rate=%d)", media, encodingName, payloadType, clockRate) + + // Determine codec from SDP content and build GStreamer arguments + isVP9 := strings.Contains(strings.ToUpper(sdpContent), "VP9") + isVP8 := strings.Contains(strings.ToUpper(sdpContent), "VP8") + isAV1 := strings.Contains(strings.ToUpper(sdpContent), "AV1") + isH264 := 
strings.Contains(strings.ToUpper(sdpContent), "H264") || strings.Contains(strings.ToUpper(sdpContent), "H.264") + isOpus := strings.Contains(strings.ToUpper(sdpContent), "OPUS") + + // Start with common GStreamer arguments optimized for RTP dump replay + args := []string{ + "--gst-debug-level=3", + "--gst-debug=tcpserversrc:5,rtp*:5,webm*:5,identity:5,jitterbuffer:5,vp9*:5", + //"--gst-debug-no-color", + "-e", // Send EOS on interrupt for clean shutdown + } + // Source from TCP (RFC4571 framed) and depayload back to application/x-rtp + args = append(args, + "tcpserversrc", + "host=127.0.0.1", + fmt.Sprintf("port=%d", r.port), + "name=tcp_in", + "!", + "queue", + "max-size-buffers=0", + "max-size-bytes=268435456", + "max-size-time=0", + "leaky=0", + "!", + // Ensure rtpstreamdepay sink has caps + "application/x-rtp-stream", + "!", + "rtpstreamdepay", + "!", + fmt.Sprintf("application/x-rtp,media=%s,encoding-name=%s,clock-rate=%d,payload=%d", media, encodingName, clockRate, payloadType), + "!", + ) + + // Build pipeline based on codec with simplified RTP timestamp handling for dump replay + // + // Simplified approach for RTP dump replay: + // - rtpjitterbuffer: Basic packet reordering with minimal interference + // - latency=0: No artificial latency, process packets as they come + // - mode=none: Don't override timing, let depayloaders handle it + // - do-retransmission=false: No retransmission for dump replay + // - Remove identity sync to avoid timing conflicts + // + // This approach focuses on preserving original RTP timestamps without + // artificial buffering that can interfere with dump replay timing. 
+ if false && isH264 { + r.logger.Info("Detected H.264 codec, building H.264 pipeline with timestamp handling...") + args = append(args, + "application/x-rtp,media=video,encoding-name=H264,clock-rate=90000", "!", + "rtpjitterbuffer", + "latency=0", + "mode=none", + "do-retransmission=false", "!", + "rtph264depay", "!", + "h264parse", "!", + "mp4mux", "!", + "filesink", fmt.Sprintf("location=%s", outputFilePath), + ) + } else if false && isVP9 { + r.logger.Info("Detected VP9 codec, building VP9 pipeline with timestamp handling...") + args = append(args, + "rtpjitterbuffer", + "latency=0", + "mode=none", + "do-retransmission=false", + "drop-on-latency=false", + "buffer-mode=slave", + "max-dropout-time=5000000000", + "max-reorder-delay=1000000000", + "!", + "rtpvp9depay", "!", + "vp9parse", "!", + "webmmux", + "writing-app=GStreamer-VP9", + "streamable=false", + "min-index-interval=2000000000", "!", + "filesink", fmt.Sprintf("location=%s", outputFilePath), + ) + } else if isVP9 { + r.logger.Info("Detected VP9 codec, building VP9 pipeline with RTP timestamp handling...") + args = append(args, + + //// jitterbuffer for packet reordering and timestamp handling + "rtpjitterbuffer", + "name=jitterbuffer", + "mode=none", + "latency=0", // No artificial latency - process immediately + "do-lost=false", // Don't generate lost events for missing packets + "do-retransmission=false", // No retransmission for offline replay + "drop-on-latency=false", // Keep all packets even if late + "!", + // + // Depayload RTP to get VP9 frames + "rtpvp9depay", + "!", + + // Parse VP9 stream to ensure valid frame structure + "vp9parse", + "!", + + // Queue for buffering + "queue", + "!", + + // Mux into Matroska/WebM container + "webmmux", + "writing-app=GStreamer-VP9", + "streamable=false", + "min-index-interval=2000000000", + "!", + + // Write to file + "filesink", + fmt.Sprintf("location=%s", outputFilePath), + ) + + } else if false && isVP8 { + r.logger.Info("Detected VP8 codec, building 
VP8 pipeline with timestamp handling...") + args = append(args, + "application/x-rtp,media=video,encoding-name=VP8,clock-rate=90000", "!", + "rtpjitterbuffer", + "latency=0", + "mode=none", + "do-retransmission=false", "!", + "rtpvp8depay", "!", + "vp8parse", "!", + "webmmux", "writing-app=GStreamer", "streamable=false", "min-index-interval=2000000000", "!", + "filesink", fmt.Sprintf("location=%s", outputFilePath), + ) + } else if false && isAV1 { + r.logger.Info("Detected AV1 codec, building AV1 pipeline with timestamp handling...") + args = append(args, + "application/x-rtp,media=video,encoding-name=AV1,clock-rate=90000", "!", + "rtpjitterbuffer", + "latency=0", + "mode=none", + "do-retransmission=false", "!", + "rtpav1depay", "!", + "av1parse", "!", + "webmmux", "!", + "filesink", fmt.Sprintf("location=%s", outputFilePath), + ) + } else if false && isOpus { + r.logger.Info("Detected Opus codec, building Opus pipeline with timestamp handling...") + args = append(args, + "application/x-rtp,media=audio,encoding-name=OPUS,clock-rate=48000,payload=111", "!", + "rtpjitterbuffer", + "latency=0", + "mode=none", + "do-retransmission=false", "!", + "rtpopusdepay", "!", + "opusparse", "!", + "webmmux", "!", + "filesink", fmt.Sprintf("location=%s", outputFilePath), + ) + } else if false { + // Default to VP8 if codec is not detected + r.logger.Info("Unknown or no codec detected, defaulting to VP8 pipeline with timestamp handling...") + args = append(args, + "application/x-rtp,media=video,encoding-name=VP8,clock-rate=90000", "!", + "rtpjitterbuffer", + "latency=0", + "mode=none", + "do-retransmission=false", "!", + "rtpvp8depay", "!", + "vp8parse", "!", + "webmmux", "writing-app=GStreamer", "streamable=false", "min-index-interval=2000000000", "!", + "filesink", fmt.Sprintf("location=%s", outputFilePath), + ) + } + + r.logger.Info("GStreamer pipeline: %s", strings.Join(args, " ")) // Skip debug args for display + + r.gstreamerCmd = exec.Command("gst-launch-1.0", args...) 
+ + // Redirect output for debugging + r.gstreamerCmd.Stdout = os.Stdout + r.gstreamerCmd.Stderr = os.Stderr + + // Start GStreamer process + if err := r.gstreamerCmd.Start(); err != nil { + return err + } + + r.logger.Info("GStreamer pipeline started with PID: %d", r.gstreamerCmd.Process.Pid) + + // Monitor the process in a goroutine + go func() { + if err := r.gstreamerCmd.Wait(); err != nil { + r.logger.Error("GStreamer process exited with error: %v", err) + } else { + r.logger.Info("GStreamer process exited normally") + } + }() + + return nil +} + +// parseRtpCapsFromSDP extracts basic RTP caps from an SDP for use with application/x-rtp caps +// Prioritizes video codecs (H264/VP9/VP8/AV1) over audio (OPUS) and parses payload/clock-rate +func parseRtpCapsFromSDP(sdp string) (media string, encodingName string, payload int, clockRate int) { + upper := strings.ToUpper(sdp) + + // Defaults + media = "video" + encodingName = "VP9" + payload = 96 + clockRate = 90000 + + // Select target encoding with priority: H264 > VP9 > VP8 > AV1 > OPUS (audio) + if strings.Contains(upper, "H264") || strings.Contains(upper, "H.264") { + encodingName = "H264" + media = "video" + clockRate = 90000 + } else if strings.Contains(upper, "VP9") { + encodingName = "VP9" + media = "video" + clockRate = 90000 + } else if strings.Contains(upper, "VP8") { + encodingName = "VP8" + media = "video" + clockRate = 90000 + } else if strings.Contains(upper, "AV1") { + encodingName = "AV1" + media = "video" + clockRate = 90000 + } else if strings.Contains(upper, "OPUS") { + encodingName = "OPUS" + media = "audio" + clockRate = 48000 + } + + // Parse matching a=rtpmap for the chosen encoding to refine payload and clock + chosen := encodingName + for _, line := range strings.Split(sdp, "\n") { + line = strings.TrimSpace(line) + if !strings.HasPrefix(strings.ToLower(line), "a=rtpmap:") { + continue + } + // Example: a=rtpmap:96 VP9/90000 + after := strings.TrimSpace(line[len("a=rtpmap:"):]) + fields := 
strings.Fields(after) + if len(fields) < 2 { + continue + } + ptStr := fields[0] + codec := strings.ToUpper(fields[1]) + parts := strings.Split(codec, "/") + name := parts[0] + if name != chosen { + continue + } + if v, err := strconv.Atoi(ptStr); err == nil { + payload = v + } + if len(parts) >= 2 { + if v, err := strconv.Atoi(parts[1]); err == nil { + clockRate = v + } + } + break + } + + return +} + +func (r *CursorGstreamerWebmRecorder) OnRTP(packet *rtp.Packet) error { + // Marshal RTP packet + buf, err := packet.Marshal() + if err != nil { + return err + } + + return r.PushRtpBuf(buf) +} + +func (r *CursorGstreamerWebmRecorder) PushRtpBuf(buf []byte) error { + r.mu.Lock() + defer r.mu.Unlock() + + // Send RTP packet over TCP using RFC4571 2-byte length prefix + if r.rtpConn != nil { + if len(buf) > 0xFFFF { + return fmt.Errorf("rtp packet too large for TCP framing: %d bytes", len(buf)) + } + header := make([]byte, 2) + binary.BigEndian.PutUint16(header, uint16(len(buf))) + if _, err := r.rtpConn.Write(header); err != nil { + r.logger.Warn("Failed to write RTP length header: %v", err) + return err + } + if _, err := r.rtpConn.Write(buf); err != nil { + r.logger.Warn("Failed to write RTP packet: %v", err) + return err + } + } + return nil +} + +func (r *CursorGstreamerWebmRecorder) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + r.logger.Info("Closing GStreamer WebM recorder...") + + r.logger.Info("Closing GStreamer WebM recorder2222...") + + // Cancel context to stop background goroutines + if r.cancel != nil { + r.cancel() + } + + // Close TCP connection + if r.rtpConn != nil { + r.logger.Info("Closing TCP connection...") + _ = r.rtpConn.Close() + r.rtpConn = nil + r.logger.Info("TCP connection closed") + } + + // Gracefully stop GStreamer + if r.gstreamerCmd != nil && r.gstreamerCmd.Process != nil { + r.logger.Info("Stopping GStreamer process...") + + // Send EOS (End of Stream) signal to GStreamer + // GStreamer handles SIGINT gracefully and will 
+		// finish writing the file
+		if err := r.gstreamerCmd.Process.Signal(os.Interrupt); err != nil {
+			r.logger.Error("Failed to send SIGINT to GStreamer: %v", err)
+			// If interrupt fails, force kill
+			r.gstreamerCmd.Process.Kill()
+		} else {
+			r.logger.Info("Sent SIGINT to GStreamer, waiting for graceful exit...")
+
+			// Wait for graceful exit with timeout; the buffered channel lets
+			// the waiter goroutine finish even when the timeout path wins.
+			done := make(chan error, 1)
+			go func() {
+				done <- r.gstreamerCmd.Wait()
+			}()
+
+			select {
+			case <-time.After(15 * time.Second):
+				r.logger.Info("GStreamer exit timeout, force killing...")
+				// Timeout, force kill
+				r.gstreamerCmd.Process.Kill()
+				<-done // Wait for the kill to complete
+			case err := <-done:
+				if err != nil {
+					r.logger.Info("GStreamer exited with error: %v", err)
+				} else {
+					r.logger.Info("GStreamer exited gracefully")
+				}
+			}
+		}
+	}
+
+	// Clean up temporary SDP file
+	if r.sdpFile != nil {
+		os.Remove(r.sdpFile.Name())
+		r.sdpFile = nil
+	}
+
+	// Post-process WebM to fix duration metadata if needed
+	// NOTE(review): this branch only logs — the actual post-processing step
+	// appears to be missing or implemented elsewhere; confirm.
+	if r.tempOutputPath != "" && r.finalOutputPath != "" {
+		r.logger.Info("Starting WebM duration post-processing...")
+	}
+
+	r.logger.Info("GStreamer WebM recorder closed")
+	return nil
+}
+
+// GetOutputPath returns the output file path (for compatibility).
+// It prefers finalOutputPath (set when post-processing is enabled) and falls
+// back to the original outputPath.
+func (r *CursorGstreamerWebmRecorder) GetOutputPath() string {
+	// Return final output path if post-processing is enabled, otherwise return original
+	if r.finalOutputPath != "" {
+		return r.finalOutputPath
+	}
+	return r.outputPath
+}
+
+// IsRecording returns true if the recorder is currently active, i.e. a
+// GStreamer process has been started and not yet cleared.
+func (r *CursorGstreamerWebmRecorder) IsRecording() bool {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	return r.gstreamerCmd != nil && r.gstreamerCmd.Process != nil
+}
diff --git a/processing/sdp_tool.go b/processing/sdp_tool.go
new file mode 100644
index 0000000..ed61c08
--- /dev/null
+++ b/processing/sdp_tool.go
@@ -0,0 +1,55 @@
+package processing
+
+import (
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/pion/webrtc/v4"
+)
+
+// readSDP returns the contents of the SDP file at sdpFilePath as a string.
+func readSDP(sdpFilePath string) (string, error) {
+	content, err := os.ReadFile(sdpFilePath)
+	if err != nil {
+		return "", fmt.Errorf("failed to read SDP file %s: %w", sdpFilePath, err)
+	}
+	return string(content), nil
+}
+
+// replaceSDP returns sdpContent with the port of the first well-formed
+// m= line replaced by port; m= lines with fewer than four fields are skipped.
+func replaceSDP(sdpContent string, port int) string {
+	lines := strings.Split(sdpContent, "\n")
+	for i, line := range lines {
+		if strings.HasPrefix(line, "m=") {
+			// Parse the m= line: "m=<media> <port> RTP/AVP <fmt...>"
+			parts := strings.Fields(line)
+			if len(parts) >= 4 {
+				// Replace the port (second field)
+				parts[1] = fmt.Sprintf("%d", port)
+				lines[i] = strings.Join(parts, " ")
+				break
+			}
+		}
+	}
+	return strings.Join(lines, "\n")
+}
+
+// mimeType maps the first known codec name found anywhere in the SDP
+// (case-insensitive) to its WebRTC MIME type, checking in the order
+// VP9, VP8, AV1, OPUS, H264.
+// NOTE(review): for an SDP listing both audio and video codecs the earlier
+// check wins (e.g. OPUS is returned before H264 is even considered) —
+// confirm this ordering is intended.
+func mimeType(sdp string) (string, error) {
+	upper := strings.ToUpper(sdp)
+	if strings.Contains(upper, "VP9") {
+		return webrtc.MimeTypeVP9, nil
+	}
+	if strings.Contains(upper, "VP8") {
+		return webrtc.MimeTypeVP8, nil
+	}
+	if strings.Contains(upper, "AV1") {
+		return webrtc.MimeTypeAV1, nil
+	}
+	if strings.Contains(upper, "OPUS") {
+		return webrtc.MimeTypeOpus, nil
+	}
+	if strings.Contains(upper, "H264") {
+		return webrtc.MimeTypeH264, nil
+	}
+
+	return "", fmt.Errorf("mimeType should be OPUS, VP8, VP9, AV1, H264")
+}
diff --git a/processing/track_extractor.go b/processing/track_extractor.go
new file mode 100644
index 0000000..4aba07c
--- /dev/null
+++ b/processing/track_extractor.go
@@ -0,0 +1,127 @@
+package processing
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/GetStream/getstream-go/v3"
+	"github.com/pion/webrtc/v4"
+)
+
+// Generic track extraction function that works for both audio and video.
+// ExtractTracks filters metadata.Tracks by userID/sessionID/trackID and by
+// trackType/mediaFilter, then extracts each matching track into outputDir.
+// Failures on individual tracks are logged and skipped rather than aborting.
+func ExtractTracks(workingDir, outputDir, userID, sessionID, trackID string, metadata *RecordingMetadata, trackType, mediaFilter string, fillGaps, fixDtx bool, logger *getstream.DefaultLogger) error {
+	// Filter tracks to specified type only and apply hierarchical filtering
+	filteredTracks := FilterTracks(metadata.Tracks, userID, sessionID, trackID, trackType, mediaFilter)
+	if
len(filteredTracks) == 0 {
+		logger.Warn("No %s tracks found matching the filter criteria", trackType)
+		return nil
+	}
+
+	logger.Info("Found %d %s tracks to extract", len(filteredTracks), trackType)
+
+	// Extract and convert each track; a failed track is logged and skipped
+	// so the remaining tracks are still processed.
+	for i, track := range filteredTracks {
+		logger.Info("Processing %s track %d/%d: %s", trackType, i+1, len(filteredTracks), track.TrackID)
+
+		err := extractSingleTrackWithOptions(workingDir, track, outputDir, trackType, fillGaps, fixDtx, logger)
+		if err != nil {
+			logger.Error("Failed to extract %s track %s: %v", trackType, track.TrackID, err)
+			continue
+		}
+	}
+
+	return nil
+}
+
+// extractSingleTrackWithOptions converts one track's rtpdump segments found
+// under inputPath into container files, then concatenates them (optionally
+// filling inter-segment gaps) into a single file in outputDir, recording the
+// result on track.ConcatenatedContainerPath.
+func extractSingleTrackWithOptions(inputPath string, track *TrackInfo, outputDir string, trackType string, fillGaps, fixDtx bool, logger *getstream.DefaultLogger) error {
+	// accept matches a file discovered by ConvertDirectory to one of this
+	// track's segments by base filename and fills in the segment's derived
+	// paths. H264 goes into MP4 containers; everything else into WebM.
+	accept := func(path string, info os.FileInfo) (*SegmentInfo, bool) {
+		for _, s := range track.Segments {
+			if strings.Contains(info.Name(), s.metadata.BaseFilename) {
+				if track.Codec == webrtc.MimeTypeH264 {
+					s.ContainerExt = Mp4
+				} else {
+					s.ContainerExt = Webm
+				}
+				s.RtpDumpPath = path
+				s.SdpPath = strings.Replace(path, SuffixRtpDump, SuffixSdp, -1)
+				s.ContainerPath = strings.Replace(path, SuffixRtpDump, "."+s.ContainerExt, -1)
+				return s, true
+			}
+		}
+		return nil, false
+	}
+
+	// Convert using the WebM converter
+	err := ConvertDirectory(inputPath, accept, fixDtx, logger)
+	if err != nil {
+		return fmt.Errorf("failed to convert %s track: %w", trackType, err)
+	}
+
+	// Create segments with timing info and fill gaps
+	finalFile, err := processSegmentsWithGapFilling(track, trackType, outputDir, fillGaps, logger)
+	if err != nil {
+		return fmt.Errorf("failed to process segments with gap filling: %w", err)
+	}
+
+	track.ConcatenatedContainerPath = finalFile
+	logger.Info("Successfully extracted %s track to: %s", trackType, finalFile)
+	return nil
+}
+
+// processSegmentsWithGapFilling processes webm segments, fills gaps if requested, and concatenates into final file
+func processSegmentsWithGapFilling(track *TrackInfo, trackType string, outputDir string, fillGaps bool, logger *getstream.DefaultLogger) (string, error) { + // Build list of files to concatenate (with optional gap fillers) + var filesToConcat []string + for i, segment := range track.Segments { + // Add the segment file + filesToConcat = append(filesToConcat, segment.ContainerPath) + + // Add gap filler if requested and there's a gap before the next segment + if fillGaps && i < track.SegmentCount-1 { + nextSegment := track.Segments[i+1] + gapDuration := nextSegment.FFMpegOffset + firstPacketNtpTimestamp(nextSegment.metadata) - lastPacketNtpTimestamp(segment.metadata) + + if gapDuration > 0 { // There's a gap + gapSeconds := float64(gapDuration) / 1000.0 + logger.Info("Detected %dms gap between segments, generating %s filler", gapDuration, trackType) + + // Create gap filler file + gapFilePath := filepath.Join(outputDir, fmt.Sprintf("gap_%s_%d.%s", trackType, i, segment.ContainerExt)) + + if trackType == "audio" { + err := generateSilence(gapFilePath, gapSeconds, logger) + if err != nil { + logger.Warn("Failed to generate silence, skipping gap: %v", err) + continue + } + } else if trackType == "video" { + // Use 720p quality as defaults + err := generateBlackVideo(gapFilePath, track.Codec, gapSeconds, 1280, 720, 30, logger) + if err != nil { + logger.Warn("Failed to generate black video, skipping gap: %v", err) + continue + } + } + + defer os.Remove(gapFilePath) + + filesToConcat = append(filesToConcat, gapFilePath) + } + } + } + + // Create final output file + finalName := fmt.Sprintf("%s_%s_%s_%s.%s", trackType, track.UserID, track.SessionID, track.TrackID, track.Segments[0].ContainerExt) + finalPath := filepath.Join(outputDir, finalName) + + // Concatenate all segments (with gap fillers if any) + err := concatFile(finalPath, filesToConcat, logger) + if err != nil { + return "", fmt.Errorf("failed to concatenate segments: %w", err) + } + + 
logger.Info("Successfully concatenated %d segments into %s (gap filled %t)", track.SegmentCount, finalPath, fillGaps) + return finalPath, nil +}