diff --git a/examples/motion_graphics_example.py b/examples/motion_graphics_example.py new file mode 100644 index 0000000..6e9b199 --- /dev/null +++ b/examples/motion_graphics_example.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +"""Example usage of motion graphics video pipeline. + +This example demonstrates how to use the motion graphics pipeline to create +videos from natural language prompts using HTML/GSAP animations. + +Requirements: + pip install praisonai-tools[video-motion] + playwright install chromium +""" + +import asyncio +import tempfile +from pathlib import Path + +try: + from praisonai_tools.video.motion_graphics import ( + create_motion_graphics_agent, + motion_graphics_team, + HtmlRenderBackend, + RenderOpts + ) + from praisonai_tools.tools.git_tools import GitTools +except ImportError as e: + print(f"Import error: {e}") + print("Please install with: pip install praisonai-tools[video-motion]") + exit(1) + + +async def example_basic_animation(): + """Example 1: Basic motion graphics with simple animations.""" + print("Example 1: Creating basic motion graphics animation...") + + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + + # Create a simple HTML composition + html_content = """ + + + + + + + +
+
Motion Graphics
+
Powered by PraisonAI
+
+ + + + + """ + + # Save HTML file + (workspace / "index.html").write_text(html_content) + + # Create backend and render + backend = HtmlRenderBackend() + + # Lint first + lint_result = await backend.lint(workspace) + print(f"Lint result: {'✓ Passed' if lint_result.ok else '✗ Failed'}") + if not lint_result.ok: + print(f" Errors: {', '.join(lint_result.messages)}") + + # Render video + if lint_result.ok: + render_opts = RenderOpts( + output_name="basic_animation.mp4", + fps=30, + quality="standard" + ) + + print("Rendering video... (this may take a moment)") + render_result = await backend.render(workspace, render_opts) + + if render_result.ok: + print(f"✓ Video rendered successfully!") + print(f" Output: {render_result.output_path}") + print(f" Size: {render_result.size_kb}KB") + else: + print(f"✗ Render failed: {render_result.stderr}") + + +async def example_git_tools(): + """Example 2: Using GitTools for code exploration.""" + print("\nExample 2: Using GitTools for safe git operations...") + + with tempfile.TemporaryDirectory() as tmpdir: + git_tools = GitTools(base_dir=tmpdir) + + print("Testing repository URL parsing...") + + # Test different repository formats + test_repos = [ + "octocat/Hello-World", # owner/repo format + "https://github.com/octocat/Hello-World.git", # HTTPS URL + ] + + for repo_input in test_repos: + try: + url, name = git_tools._parse_repo_input(repo_input) + print(f" {repo_input} -> URL: {url}, Name: {name}") + except Exception as e: + print(f" {repo_input} -> Error: {e}") + + print("\nTesting file path safety...") + + # Test file path validation + test_paths = [ + "README.md", # Safe + "src/main.py", # Safe + "../etc/passwd", # Unsafe + "../../secret.txt", # Unsafe + ] + + for path in test_paths: + try: + safe_path = git_tools._validate_file_path(path) + print(f" {path} -> ✓ Safe: {safe_path}") + except ValueError as e: + print(f" {path} -> ✗ Unsafe: {e}") + + +def example_agent_factory(): + """Example 3: Using motion graphics agent factory.""" + print("\nExample 3: Creating motion graphics agent...") + + try: + # This would normally require praisonaiagents to be installed + agent = create_motion_graphics_agent( + backend="html", + max_retries=3, + llm="claude-sonnet-4" + ) + + print("✓ Motion graphics agent created successfully!") + print(f" Backend: {agent._motion_graphics_backend.__class__.__name__}") + print(f" Workspace: {agent._motion_graphics_workspace}") + print(f" Max retries: {agent._motion_graphics_max_retries}") + + except ImportError as e: + print(f"⚠ Agent creation skipped: {e}") + print(" Install praisonaiagents to use agent features") + + +def example_team_preset(): + """Example 4: Using motion graphics team preset.""" + print("\nExample 4: Creating motion graphics team...") + + try: + team = motion_graphics_team( + research=True, + code_exploration=True, + backend="html" + ) + + print("✓ Motion graphics team created successfully!") + print(f" Agents: {[agent.name for agent in team.agents]}") + print(f" Leader: {team.leader.name}") + print(f" Workspace: {team._motion_graphics_workspace}") + + except ImportError as e: + print(f"⚠ Team creation skipped: {e}") + print(" Install praisonaiagents to use team features") + + +async def main(): + """Run all examples.""" + print("Motion Graphics Pipeline Examples") + print("=" * 50) + + # Example 1: Basic animation (requires Playwright) + try: + await example_basic_animation() + except ImportError as e: + print(f"Example 1 skipped: {e}") + print("Install with: pip install playwright && playwright install chromium") + except Exception as e: + print(f"Example 1 failed: {e}") + + # Example 2: GitTools (no extra dependencies) + await example_git_tools() + + # Example 3: Agent factory (requires praisonaiagents) + example_agent_factory() + + # Example 4: Team preset (requires praisonaiagents) + example_team_preset() + + print("\n" + "=" * 50) + print("Examples complete!") + print("\nNext steps:") + print("1. Install optional dependencies: pip install praisonai-tools[video-motion]") + print("2. Install Playwright: playwright install chromium") + print("3. Install praisonaiagents for agent features") + print("4. Try creating your own motion graphics!") + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/motion_graphics_team.yaml b/examples/motion_graphics_team.yaml new file mode 100644 index 0000000..2f65554 --- /dev/null +++ b/examples/motion_graphics_team.yaml @@ -0,0 +1,118 @@ +# Motion Graphics Team Configuration +# This file shows how to configure a motion graphics team using YAML + +framework: praisonai +project_name: "Motion Graphics Pipeline" + +agents: + - name: "Coordinator" + role: "Team Leader" + backstory: | + You are the motion graphics team coordinator. Your role is to analyze incoming + requests, route them to appropriate specialists, coordinate between team members, + and validate final outputs with strict rules. + goal: | + Route requests effectively, coordinate team work, and ensure high-quality video + output with concrete file paths and proper error handling. + instructions: | + CRITICAL OUTPUT VALIDATION RULES: + - A render succeeded ONLY IF the reply contains a concrete file path AND no error indicators + - Never fabricate file paths or claim success without concrete evidence + - Surface all errors from the Animator + - Stop work after maximum retry budget is exceeded + + - name: "Researcher" + role: "Research Specialist" + backstory: | + You are a research specialist who gathers information about topics in user requests. + You search the web for relevant facts, concepts, and examples, then summarize findings + in a brief that the Animator can use for on-screen content. + goal: | + Provide focused research that can be turned into visual motion graphics elements. + tools: + - search_web + + - name: "CodeExplorer" + role: "Code Analysis Specialist" + backstory: | + You are a code exploration specialist who can clone and explore git repositories + on-demand. You read source code, understand implementations, and extract key + algorithms, data structures, and concepts for visualization. + goal: | + Extract essential programming concepts that can be animated visually. + tools: + - GitTools + instructions: | + IMPORTANT: You are READ-ONLY. Never write or modify code. + Always validate file paths to prevent directory traversal. + + - name: "Animator" + role: "Motion Graphics Creator" + backstory: | + You are a motion graphics specialist who creates HTML/CSS/GSAP compositions + that render to high-quality MP4 videos. You follow strict authoring guidelines + and iterate on render failures. + goal: | + Create deterministic, frame-perfect HTML/GSAP compositions for video export. + tools: + - FileTools + - RenderTools + instructions: | + Follow the motion graphics authoring skill guide precisely: + - Use deterministic animations only (no Math.random, no infinite loops) + - Export timelines as window.__timelines = [tl] + - Add data-duration attributes + - Use 1920x1080 viewport + - Test with lint and render tools + - Iterate on failures with maximum retry limit + +tasks: + - description: | + Create a 30-second motion graphics video explaining the concept requested by the user. + + Process: + 1. Coordinator routes to Researcher if topic research is needed + 2. Coordinator routes to CodeExplorer if code analysis is needed + 3. Coordinator always routes to Animator for final composition and rendering + 4. Animator creates HTML/GSAP composition following authoring guidelines + 5. Animator tests with lint and render tools + 6. Animator iterates on failures up to maximum retries + 7. Coordinator validates final output has concrete file path + + expected_output: | + A rendered MP4 file with: + - Concrete file path (e.g., /renders/video_123.mp4) + - Video duration matching user request + - High-quality motion graphics explaining the concept + - No render errors or fabricated paths + + agent: "Coordinator" + +# Team configuration +team: + type: "hierarchical" + manager: "Coordinator" + verbose: true + +# Tool configuration +tools: + - name: "GitTools" + config: + base_dir: "./repos" + + - name: "RenderTools" + config: + backend: "html" + workspace: "./renders" + max_retries: 3 + + - name: "FileTools" + config: + base_dir: "./workspace" + +# Output configuration +output: + format: "mp4" + quality: "standard" + fps: 30 + workspace: "./motion_graphics_output" \ No newline at end of file diff --git a/praisonai_tools/tools/git_tools.py b/praisonai_tools/tools/git_tools.py new file mode 100644 index 0000000..79847d5 --- /dev/null +++ b/praisonai_tools/tools/git_tools.py @@ -0,0 +1,494 @@ +"""Git tools for safe read-only repository operations.""" + +import os +import re +import subprocess +import tempfile +import urllib.parse +from pathlib import Path +from typing import Optional, List, Dict, Any + + +class GitTools: + """Git tools for safe read-only repository operations. + + Features: + - Clone repositories on-demand from GitHub URLs or owner/repo format + - Read-only git operations (no writes allowed) + - Path escape protection + - GitHub token support via environment variable + """ + + def __init__(self, base_dir: Optional[str] = None): + """Initialize GitTools. + + Args: + base_dir: Base directory for cloned repositories. + Defaults to temporary directory. + """ + if base_dir: + self.base_dir = Path(base_dir) + else: + self.base_dir = Path(tempfile.gettempdir()) / "praison_git_repos" + + self.base_dir.mkdir(parents=True, exist_ok=True) + + # GitHub token for private repos + self.github_token = os.getenv("GITHUB_ACCESS_TOKEN") + + def clone_repo(self, repo_url_or_name: str, branch: Optional[str] = None) -> str: + """Clone or update a repository. + + Args: + repo_url_or_name: Repository URL or GitHub owner/repo format + branch: Optional branch to checkout + + Returns: + Path to cloned repository + + Raises: + ValueError: If repo format is invalid + subprocess.CalledProcessError: If git operations fail + """ + # Parse repository URL or name + repo_url, repo_name = self._parse_repo_input(repo_url_or_name) + + # Safe repository path + repo_path = self._get_safe_repo_path(repo_name) + + if repo_path.exists(): + # Repository exists, pull latest changes + self._git_pull(repo_path, branch) + else: + # Clone repository + self._git_clone(repo_url, repo_path, branch) + + return str(repo_path) + + def list_repos(self) -> List[str]: + """List cloned repositories. + + Returns: + List of repository names + """ + if not self.base_dir.exists(): + return [] + + repos = [] + for item in self.base_dir.iterdir(): + if item.is_dir() and (item / ".git").exists(): + repos.append(item.name) + + return repos + + def repo_summary(self, repo_name: str) -> Dict[str, Any]: + """Get repository summary information. + + Args: + repo_name: Repository name + + Returns: + Dictionary with repository information + """ + repo_path = self._get_repo_path(repo_name) + + try: + # Get basic info + remote_url = self._run_git_command( + ["remote", "get-url", "origin"], repo_path + ).strip() + + current_branch = self._run_git_command( + ["branch", "--show-current"], repo_path + ).strip() + + # Get commit count + commit_count = self._run_git_command( + ["rev-list", "--count", "HEAD"], repo_path + ).strip() + + # Get last commit info + last_commit = self._run_git_command( + ["log", "-1", "--pretty=format:%H|%an|%ae|%ad|%s"], repo_path + ).strip() + + commit_parts = last_commit.split("|", 4) + + return { + "name": repo_name, + "remote_url": remote_url, + "current_branch": current_branch, + "commit_count": int(commit_count), + "last_commit": { + "hash": commit_parts[0] if len(commit_parts) > 0 else "", + "author_name": commit_parts[1] if len(commit_parts) > 1 else "", + "author_email": commit_parts[2] if len(commit_parts) > 2 else "", + "date": commit_parts[3] if len(commit_parts) > 3 else "", + "message": commit_parts[4] if len(commit_parts) > 4 else "", + } + } + + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to get repo summary: {e}") + + def git_log( + self, + repo_name: str, + max_count: int = 10, + since: Optional[str] = None, + path: Optional[str] = None + ) -> List[Dict[str, str]]: + """Get git log for repository. + + Args: + repo_name: Repository name + max_count: Maximum number of commits + since: Only commits after this date (e.g., "2024-01-01") + path: Only commits affecting this path + + Returns: + List of commit information + """ + repo_path = self._get_repo_path(repo_name) + + cmd = ["log", f"--max-count={max_count}", "--pretty=format:%H|%an|%ae|%ad|%s"] + + if since: + cmd.append(f"--since={since}") + + if path: + safe_path = self._validate_file_path(path) + cmd.extend(["--", safe_path]) + + try: + output = self._run_git_command(cmd, repo_path) + commits = [] + + for line in output.strip().split("\n"): + if not line: + continue + + parts = line.split("|", 4) + if len(parts) >= 5: + commits.append({ + "hash": parts[0], + "author_name": parts[1], + "author_email": parts[2], + "date": parts[3], + "message": parts[4], + }) + + return commits + + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to get git log: {e}") + + def git_diff( + self, + repo_name: str, + commit1: str, + commit2: Optional[str] = None, + path: Optional[str] = None + ) -> str: + """Get git diff between commits. + + Args: + repo_name: Repository name + commit1: First commit hash + commit2: Second commit hash (if None, compares with working tree) + path: Only diff for this path + + Returns: + Diff output + """ + repo_path = self._get_repo_path(repo_name) + + cmd = ["diff"] + + if commit2: + cmd.extend([commit1, commit2]) + else: + cmd.append(commit1) + + if path: + safe_path = self._validate_file_path(path) + cmd.extend(["--", safe_path]) + + try: + return self._run_git_command(cmd, repo_path) + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to get git diff: {e}") + + def git_show(self, repo_name: str, commit: str, path: Optional[str] = None) -> str: + """Show git commit or file content. + + Args: + repo_name: Repository name + commit: Commit hash + path: Optional file path + + Returns: + Commit or file content + """ + repo_path = self._get_repo_path(repo_name) + + if path: + safe_path = self._validate_file_path(path) + ref = f"{commit}:{safe_path}" + else: + ref = commit + + try: + return self._run_git_command(["show", ref], repo_path) + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to show git object: {e}") + + def git_blame(self, repo_name: str, file_path: str) -> List[Dict[str, str]]: + """Get git blame for a file. + + Args: + repo_name: Repository name + file_path: File path to blame + + Returns: + List of blame information for each line + """ + repo_path = self._get_repo_path(repo_name) + safe_path = self._validate_file_path(file_path) + + try: + output = self._run_git_command( + ["blame", "--porcelain", safe_path], repo_path + ) + + blame_data = [] + lines = output.split("\n") + i = 0 + + while i < len(lines): + line = lines[i] + if not line or line.startswith("\t"): + i += 1 + continue + + # Parse blame entry + parts = line.split(" ", 3) + if len(parts) >= 3: + commit_hash = parts[0] + line_num = int(parts[2]) + + # Find content line + content = "" + for j in range(i + 1, len(lines)): + if lines[j].startswith("\t"): + content = lines[j][1:] # Remove leading tab + break + + blame_data.append({ + "commit": commit_hash, + "line_number": line_num, + "content": content + }) + + i += 1 + + return blame_data + + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to get git blame: {e}") + + def git_branches(self, repo_name: str) -> List[str]: + """Get list of branches. + + Args: + repo_name: Repository name + + Returns: + List of branch names + """ + repo_path = self._get_repo_path(repo_name) + + try: + output = self._run_git_command(["branch", "-r"], repo_path) + branches = [] + + for line in output.strip().split("\n"): + if line and not "origin/HEAD" in line: + branch = line.strip().replace("origin/", "") + branches.append(branch) + + return branches + + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to get branches: {e}") + + def read_file(self, repo_name: str, file_path: str, commit: str = "HEAD") -> str: + """Read file content from repository. + + Args: + repo_name: Repository name + file_path: File path relative to repo root + commit: Commit hash or reference (default: HEAD) + + Returns: + File content + """ + safe_path = self._validate_file_path(file_path) + return self.git_show(repo_name, commit, safe_path) + + def get_github_remote(self, repo_name: str) -> Optional[Dict[str, str]]: + """Extract GitHub owner and repo from remote URL. + + Args: + repo_name: Repository name + + Returns: + Dict with 'owner' and 'repo' keys, or None if not GitHub + """ + try: + repo_path = self._get_repo_path(repo_name) + remote_url = self._run_git_command( + ["remote", "get-url", "origin"], repo_path + ).strip() + + # Parse GitHub URL + github_match = re.match( + r"(?:https://github\.com/|git@github\.com:)([^/]+)/([^/]+?)(?:\.git)?/?$", + remote_url + ) + + if github_match: + return { + "owner": github_match.group(1), + "repo": github_match.group(2) + } + + return None + + except subprocess.CalledProcessError: + return None + + def _parse_repo_input(self, repo_input: str) -> tuple[str, str]: + """Parse repository input to URL and name. + + Args: + repo_input: Repository URL or owner/repo format + + Returns: + Tuple of (url, name) + """ + if repo_input.startswith(("https://", "git@")): + # Full URL + repo_url = repo_input + + # Extract name from URL + if repo_input.startswith("https://github.com/"): + path_part = repo_input.replace("https://github.com/", "") + elif repo_input.startswith("git@github.com:"): + path_part = repo_input.replace("git@github.com:", "") + else: + raise ValueError("Only GitHub URLs are supported") + + path_part = path_part.rstrip("/").replace(".git", "") + repo_name = path_part.replace("/", "_") + + elif "/" in repo_input: + # owner/repo format + if repo_input.count("/") != 1: + raise ValueError("Invalid owner/repo format") + + owner, repo = repo_input.split("/") + repo_name = f"{owner}_{repo}" + + # Build GitHub URL with token if available + if self.github_token: + repo_url = f"https://{self.github_token}@github.com/{owner}/{repo}.git" + else: + repo_url = f"https://github.com/{owner}/{repo}.git" + else: + raise ValueError("Invalid repository format. Use 'owner/repo' or full URL") + + return repo_url, repo_name + + def _get_safe_repo_path(self, repo_name: str) -> Path: + """Get safe repository path, preventing path traversal.""" + # Sanitize repo name + safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", repo_name) + repo_path = self.base_dir / safe_name + + # Ensure path is under base_dir + if not repo_path.is_relative_to(self.base_dir): + raise ValueError("Invalid repository path") + + return repo_path + + def _get_repo_path(self, repo_name: str) -> Path: + """Get repository path and validate it exists.""" + repo_path = self._get_safe_repo_path(repo_name) + + if not repo_path.exists(): + raise FileNotFoundError(f"Repository not found: {repo_name}") + + if not (repo_path / ".git").exists(): + raise ValueError(f"Not a git repository: {repo_name}") + + return repo_path + + def _validate_file_path(self, file_path: str) -> str: + """Validate file path is safe (no path traversal).""" + # Check for absolute paths + if file_path.startswith("/"): + raise ValueError("Invalid file path") + + # Remove any leading slashes + safe_path = file_path.lstrip("/") + + # Check for path traversal attempts + if ".." in safe_path: + raise ValueError("Invalid file path") + + # Basic path validation - above checks already prevent most traversal attacks + + return safe_path + + def _git_clone(self, repo_url: str, repo_path: Path, branch: Optional[str] = None): + """Clone repository.""" + cmd = ["git", "clone"] + + if branch: + cmd.extend(["-b", branch]) + + cmd.extend([repo_url, str(repo_path)]) + + try: + subprocess.run(cmd, check=True, capture_output=True, text=True) + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to clone repository: {e.stderr}") + + def _git_pull(self, repo_path: Path, branch: Optional[str] = None): + """Pull latest changes.""" + if branch: + # Checkout specific branch + self._run_git_command(["checkout", branch], repo_path) + + # Pull latest changes + self._run_git_command(["pull"], repo_path) + + def _run_git_command(self, cmd: List[str], repo_path: Path) -> str: + """Run git command in repository directory.""" + full_cmd = ["git"] + cmd + + try: + result = subprocess.run( + full_cmd, + cwd=repo_path, + check=True, + capture_output=True, + text=True, + timeout=30 + ) + return result.stdout + except subprocess.CalledProcessError as e: + raise subprocess.CalledProcessError( + e.returncode, e.cmd, e.stdout, e.stderr + ) \ No newline at end of file diff --git a/praisonai_tools/video/__init__.py b/praisonai_tools/video/__init__.py index a1f7792..3a9f441 100644 --- a/praisonai_tools/video/__init__.py +++ b/praisonai_tools/video/__init__.py @@ -5,6 +5,7 @@ - Audio transcription with word-level timestamps - LLM-based content analysis and edit planning - FFmpeg-based video rendering +- Motion graphics creation with HTML/GSAP compositions Usage: # As CLI @@ -14,6 +15,7 @@ # As library from praisonai_tools.video import probe_video, transcribe_video, edit_video + from praisonai_tools.video.motion_graphics import motion_graphics_team """ # Lazy imports to avoid dependency issues when running standalone @@ -46,6 +48,9 @@ def __getattr__(name): elif name == "VideoEditResult": from .pipeline import VideoEditResult return VideoEditResult + elif name == "motion_graphics": + from . import motion_graphics + return motion_graphics raise AttributeError(f"module {__name__!r} has no attribute {name!r}") __all__ = [ @@ -58,4 +63,5 @@ def __getattr__(name): "render_video", "edit_video", "VideoEditResult", + "motion_graphics", ] diff --git a/praisonai_tools/video/motion_graphics/__init__.py b/praisonai_tools/video/motion_graphics/__init__.py new file mode 100644 index 0000000..490b31d --- /dev/null +++ b/praisonai_tools/video/motion_graphics/__init__.py @@ -0,0 +1,57 @@ +"""Motion Graphics Video Pipeline for PraisonAI. + +This module provides a programmatic motion-graphics video pipeline that can turn +natural-language prompts into short explainer MP4s without depending on paid +generative video APIs. The pipeline uses an agent-centric team approach where +a coordinator routes to specialists that research, read code, author HTML/CSS/JS +compositions with GSAP animations, and render to MP4 via a headless browser. + +Usage: + # Team preset + from praisonai_tools.video.motion_graphics import motion_graphics_team + + team = motion_graphics_team() + team.start("Animate Dijkstra's algorithm on a small weighted graph, 30s.") + + # Individual agent + from praisonai_tools.video.motion_graphics import create_motion_graphics_agent + + agent = create_motion_graphics_agent() + agent.start("Create an animation explaining the CAP theorem") +""" + +# Lazy imports to avoid dependency issues when running standalone +def __getattr__(name): + """Lazy load motion graphics tools.""" + if name == "RenderBackendProtocol": + from .protocols import RenderBackendProtocol + return RenderBackendProtocol + elif name == "RenderOpts": + from .protocols import RenderOpts + return RenderOpts + elif name == "RenderResult": + from .protocols import RenderResult + return RenderResult + elif name == "LintResult": + from .protocols import LintResult + return LintResult + elif name == "HtmlRenderBackend": + from .backend_html import HtmlRenderBackend + return HtmlRenderBackend + elif name == "create_motion_graphics_agent": + from .agent import create_motion_graphics_agent + return create_motion_graphics_agent + elif name == "motion_graphics_team": + from .team import motion_graphics_team + return motion_graphics_team + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + +__all__ = [ + "RenderBackendProtocol", + "RenderOpts", + "RenderResult", + "LintResult", + "HtmlRenderBackend", + "create_motion_graphics_agent", + "motion_graphics_team", +] \ No newline at end of file diff --git a/praisonai_tools/video/motion_graphics/_render_loop.py b/praisonai_tools/video/motion_graphics/_render_loop.py new file mode 100644 index 0000000..f1d69e6 --- /dev/null +++ b/praisonai_tools/video/motion_graphics/_render_loop.py @@ -0,0 +1,79 @@ +"""Render loop helper with bounded retries.""" + +import asyncio +from typing import Callable, Awaitable, Any +from .protocols import RenderResult + + +async def render_iterate( + write_fn: Callable[..., Awaitable[Any]], + lint_fn: Callable[..., Awaitable[Any]], + render_fn: Callable[..., Awaitable[RenderResult]], + patch_fn: Callable[[str], Awaitable[Any]], + max_retries: int = 3, + **kwargs +) -> RenderResult: + """Bounded write → lint → render → patch loop. + + This helper implements the standard motion graphics authoring loop: + 1. Write HTML/GSAP composition + 2. Lint for common issues + 3. Render to MP4 + 4. If render fails, patch the composition and retry + + Args: + write_fn: Function to write initial composition + lint_fn: Function to lint composition + render_fn: Function to render composition to MP4 + patch_fn: Function to patch composition based on error + max_retries: Maximum number of retry attempts + **kwargs: Arguments passed to write_fn + + Returns: + RenderResult with final render status + """ + last_error = "" + + for attempt in range(max_retries + 1): + try: + # Step 1: Write composition + if attempt == 0: + await write_fn(**kwargs) + + # Step 2: Lint composition + lint_result = await lint_fn() + if not lint_result.ok: + # Lint failed - try to patch + lint_error = "; ".join(lint_result.messages) + await patch_fn(f"Lint errors: {lint_error}") + continue + + # Step 3: Render composition + render_result = await render_fn() + + if render_result.ok: + # Success! + return render_result + + # Render failed - prepare for retry + last_error = render_result.stderr + + if attempt < max_retries: + # Try to patch based on error + await patch_fn(f"Render error: {last_error}") + + except Exception as e: + last_error = str(e) + + if attempt < max_retries: + # Try to patch based on exception + await patch_fn(f"Exception: {last_error}") + + # All retries exhausted + return RenderResult( + ok=False, + output_path=None, + bytes_=None, + stderr=f"Failed after {max_retries} retries. Last error: {last_error}", + size_kb=0 + ) \ No newline at end of file diff --git a/praisonai_tools/video/motion_graphics/agent.py b/praisonai_tools/video/motion_graphics/agent.py new file mode 100644 index 0000000..f73c56d --- /dev/null +++ b/praisonai_tools/video/motion_graphics/agent.py @@ -0,0 +1,224 @@ +"""Motion graphics agent factory.""" + +import tempfile +from pathlib import Path +from typing import Union, Any + +try: + from praisonaiagents import Agent + from praisonaiagents.tools import FileTools +except ImportError: + # Fallback for development + Agent = None + FileTools = None + +from .protocols import RenderBackendProtocol +from .backend_html import HtmlRenderBackend +from .skill import MOTION_GRAPHICS_SKILL + + +class RenderTools: + """Tools for motion graphics rendering.""" + + def __init__(self, backend: RenderBackendProtocol, workspace: Path, max_retries: int = 3): + self.backend = backend + self.workspace = workspace + self.max_retries = max_retries + + async def lint_composition(self, strict: bool = False) -> dict: + """Lint the motion graphics composition. + + Args: + strict: Enable strict linting rules + + Returns: + Dict with lint results + """ + result = await self.backend.lint(self.workspace, strict) + return { + "ok": result.ok, + "messages": result.messages, + "raw": result.raw + } + + async def render_composition( + self, + output_name: str = "video.mp4", + fps: int = 30, + quality: str = "standard" + ) -> dict: + """Render the motion graphics composition to MP4 with bounded retries. + + Args: + output_name: Output filename + fps: Frames per second + quality: Quality setting (draft/standard/high) + + Returns: + Dict with render results + """ + from .protocols import RenderOpts + + opts = RenderOpts( + output_name=output_name, + fps=fps, + quality=quality + ) + + # Direct render with bounded retries for transient failures + # Note: For composition authoring retries, the agent should use render_iterate + # with appropriate write/lint/patch functions + result = await self.backend.render(self.workspace, opts) + attempts = 1 + + while not result.ok and attempts < self.max_retries: + # Retry for transient failures (e.g., ffmpeg timeouts) + result = await self.backend.render(self.workspace, opts) + attempts += 1 + + return { + "ok": result.ok, + "output_path": str(result.output_path) if result.output_path else None, + "size_kb": result.size_kb, + "stderr": result.stderr, + "attempts": attempts, + "bytes": result.bytes_ + } + + async def render_with_bounded_retries( + self, + write_fn, + patch_fn, + output_name: str = "video.mp4", + fps: int = 30, + quality: str = "standard", + **kwargs + ) -> dict: + """Use render_iterate for full composition authoring with retries. + + Args: + write_fn: Function to write initial composition + patch_fn: Function to patch composition based on error + output_name: Output filename + fps: Frames per second + quality: Quality setting + **kwargs: Arguments passed to write_fn + + Returns: + Dict with render results + """ + from ._render_loop import render_iterate + from .protocols import RenderOpts + + opts = RenderOpts(output_name=output_name, fps=fps, quality=quality) + + async def lint_fn(): + return await self.backend.lint(self.workspace, strict=False) + + async def render_fn(): + return await self.backend.render(self.workspace, opts) + + result = await render_iterate( + write_fn=write_fn, + lint_fn=lint_fn, + render_fn=render_fn, + patch_fn=patch_fn, + max_retries=self.max_retries, + **kwargs + ) + + return { + "ok": result.ok, + "output_path": str(result.output_path) if result.output_path else None, + "size_kb": result.size_kb, + "stderr": result.stderr, + "bytes": result.bytes_ + } + + +def create_motion_graphics_agent( + *, + backend: Union[RenderBackendProtocol, str] = "html", + workspace: Union[str, Path] = None, + max_retries: int = 3, + llm: str = "claude-sonnet-4", + **agent_kwargs: Any +) -> Any: + """Create a motion graphics agent. + + Args: + backend: Render backend or backend name + workspace: Workspace directory for compositions + max_retries: Maximum render retry attempts + llm: LLM model to use + **agent_kwargs: Additional arguments for Agent constructor + + Returns: + Agent configured for motion graphics authoring + """ + if Agent is None: + raise ImportError( + "praisonaiagents not available. Install with: pip install praisonaiagents" + ) + + # Set up workspace + if workspace is None: + workspace = Path(tempfile.mkdtemp(prefix="motion_graphics_")) + else: + workspace = Path(workspace) + + workspace.mkdir(parents=True, exist_ok=True) + + # Resolve backend + render_backend = _resolve_backend(backend) + + # Base instructions for motion graphics authoring + base_instructions = f""" +You are a motion graphics specialist agent. Your role is to create HTML/CSS/GSAP +compositions that render to high-quality MP4 videos. + +Key responsibilities: +1. Author HTML files with GSAP animations based on user prompts +2. Follow the motion graphics authoring skill guide precisely +3. Test compositions with lint and render tools +4. Iterate on failures with a maximum of {max_retries} attempts +5. Return concrete file paths and render status + +CRITICAL OUTPUT VALIDATION: +- A render succeeded ONLY IF the output contains a concrete file path AND no error indicators +- Never fabricate file paths +- Always surface actual errors from the render backend +- Stop after {max_retries} failed attempts + +Workspace directory: {workspace} +""" + + # Create tools + file_tools = FileTools(base_dir=str(workspace)) + render_tools = RenderTools(render_backend, workspace, max_retries) + + # Create agent + agent = Agent( + instructions=base_instructions + "\n\n" + MOTION_GRAPHICS_SKILL, + tools=[file_tools, render_tools], + llm=llm, + **agent_kwargs + ) + + # Store additional attributes + agent._motion_graphics_backend = render_backend + agent._motion_graphics_workspace = workspace + agent._motion_graphics_max_retries = max_retries + + return agent + + +def _resolve_backend(backend: Union[RenderBackendProtocol, str]) -> RenderBackendProtocol: + """Resolve backend specification to backend instance.""" + if isinstance(backend, str): + if backend == "html": + return HtmlRenderBackend() + else: + raise ValueError(f"Unknown backend: {backend}") + + return backend \ No newline at end of file diff --git a/praisonai_tools/video/motion_graphics/backend_html.py b/praisonai_tools/video/motion_graphics/backend_html.py new file mode 100644 index 0000000..fed0f53 --- /dev/null +++ b/praisonai_tools/video/motion_graphics/backend_html.py @@ -0,0 +1,347 @@ +"""HTML/GSAP render backend using Playwright and FFmpeg.""" + +import asyncio +import json +import os +import shutil +import subprocess +import tempfile +from pathlib import Path +from typing import Optional + +try: + from playwright.async_api import async_playwright +except ImportError: + async_playwright = None + +try: + import imageio_ffmpeg +except ImportError: + imageio_ffmpeg = None + +from .protocols import RenderBackendProtocol, RenderOpts, RenderResult, LintResult + + +class HtmlRenderBackend: + """HTML/GSAP render backend using Playwright + FFmpeg. + + This backend: + 1. Runs Chromium headless to load HTML/GSAP compositions + 2. Drives GSAP timelines frame-by-frame via JavaScript + 3. Captures frames as images + 4. Encodes frames to MP4 using FFmpeg + + Security features: + - Network requests blocked except allowlisted GSAP CDN + - Workspace path validation to prevent escapes + - Subprocess timeout limits + """ + + def __init__(self, base_dir: Optional[Path] = None): + if async_playwright is None: + raise ImportError( + "Playwright not installed. Install with: pip install playwright" + ) + if imageio_ffmpeg is None: + raise ImportError( + "imageio-ffmpeg not installed. Install with: pip install imageio-ffmpeg" + ) + + # Set allowed base directory for workspace safety + if base_dir is None: + # Default to current working directory and temp directory as allowed roots + self._allowed_base = Path.cwd() + else: + self._allowed_base = Path(base_dir).resolve() + + async def lint(self, workspace: Path, strict: bool = False) -> LintResult: + """Lint HTML composition for common issues.""" + index_html = workspace / "index.html" + + if not index_html.exists(): + return LintResult( + ok=False, + messages=["index.html not found in workspace"], + raw="" + ) + + try: + content = index_html.read_text(encoding="utf-8") + except Exception as e: + return LintResult( + ok=False, + messages=[f"Failed to read index.html: {e}"], + raw="" + ) + + messages = [] + + # Check for required GSAP timeline setup + if "window.__timelines" not in content: + messages.append("Missing window.__timelines setup") + + # Check for required data attributes + if 'data-duration' not in content: + messages.append("Missing data-duration attribute on timeline elements") + + # Check for problematic patterns + if "Math.random" in content: + messages.append("Math.random() detected - animations must be deterministic") + + if "repeat: -1" in content: + messages.append("Infinite repeat detected (`repeat: -1`) — use finite repeat counts") + + if strict: + # Additional strict checks + if "visibility" in content or "display" in content: + messages.append("Avoid animating visibility/display properties - use opacity instead") + + return LintResult( + ok=len(messages) == 0, + messages=messages, + raw=content + ) + + async def render(self, workspace: Path, opts: RenderOpts) -> RenderResult: + """Render HTML composition to MP4.""" + # Validate workspace path + if not self._is_safe_workspace(workspace): + return RenderResult( + ok=False, + output_path=None, + bytes_=None, + stderr="Unsafe workspace path" + ) + + # Check if index.html exists + index_html = workspace / "index.html" + if not index_html.exists(): + return RenderResult( + ok=False, + output_path=None, + bytes_=None, + stderr="index.html not found in workspace" + ) + + try: + return await self._render_with_playwright(workspace, opts) + except Exception as e: + return RenderResult( + ok=False, + output_path=None, + bytes_=None, + stderr=str(e) + ) + + def _is_safe_workspace(self, workspace: Path) -> bool: + """Check if workspace path is safe (prevents path traversal).""" + try: + workspace_abs = workspace.resolve(strict=True) + allowed_base_abs = self._allowed_base.resolve() + + # Also allow temp directories + temp_base = Path(tempfile.gettempdir()).resolve() + + # Check if workspace is under allowed base or temp directory + try: + # Check if workspace is relative to allowed base + workspace_abs.relative_to(allowed_base_abs) + return True + except ValueError: + pass + + try: + # Check if workspace is relative to temp directory + workspace_abs.relative_to(temp_base) + return True + except ValueError: + pass + + return False + except (OSError, ValueError): + return False + + async def _render_with_playwright(self, workspace: Path, opts: RenderOpts) -> RenderResult: + """Render using Playwright + FFmpeg.""" + output_path = workspace / opts.output_name + + async with async_playwright() as p: + # Launch browser with security options + browser = await p.chromium.launch( + headless=True, + args=[ + "--disable-dev-shm-usage", + "--no-sandbox", + "--disable-setuid-sandbox", + "--disable-background-timer-throttling", + "--disable-backgrounding-occluded-windows", + "--disable-renderer-backgrounding" + ] + ) + + try: + page = await browser.new_page( + viewport={"width": 1920, "height": 1080} + ) + + # Block network requests except allowlisted domains + await page.route("**/*", self._handle_network_request) + + # Load the HTML file + file_url = f"file://{workspace.absolute()}/index.html" + await page.goto(file_url, wait_until="networkidle") + + # Wait for GSAP and timeline setup + await page.wait_for_function("window.__timelines", timeout=10000) + + # Get timeline duration + duration = await page.evaluate(""" + () => { + const timelines = window.__timelines; + if (!timelines || timelines.length === 0) return 0; + return Math.max(...timelines.map(tl => tl.duration())); + } + """) + + if duration <= 0: + raise ValueError("Invalid timeline duration") + + # Calculate frame count + frame_count = int(duration * opts.fps) + + # Create temporary directory for frames + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + frame_paths = [] + + # Capture frames + for frame in range(frame_count): + time = frame / opts.fps + + # Seek timeline to specific time + await page.evaluate(f""" + () => {{ + const timelines = window.__timelines; + timelines.forEach(tl => {{ + tl.seek({time}); + }}); + }} + """) + + # Wait a bit for animations to settle + await page.wait_for_timeout(50) + + # Capture frame + frame_path = temp_path / f"frame_{frame:06d}.png" + await page.screenshot(path=str(frame_path), full_page=True) + frame_paths.append(frame_path) + + # Encode to MP4 using FFmpeg + await self._encode_frames_to_mp4( + frame_paths, output_path, opts + ) + + finally: + await browser.close() + + # Read video bytes + video_bytes = None + size_kb = 0 + if output_path.exists(): + video_bytes = output_path.read_bytes() + size_kb = len(video_bytes) // 1024 + + return RenderResult( + ok=output_path.exists(), + output_path=output_path if output_path.exists() else None, + bytes_=video_bytes, + stderr="", + size_kb=size_kb + ) + + async def _handle_network_request(self, route, request): + """Handle network requests with allowlist.""" + url = request.url + + # Allow GSAP CDN + if "cdnjs.cloudflare.com" in url and "gsap" in url: + await route.continue_() + return + + # Allow local files + if url.startswith("file://"): + await route.continue_() + return + + # Block everything else + await route.abort() + + async def _encode_frames_to_mp4( + self, + frame_paths: list[Path], + output_path: Path, + opts: RenderOpts + ): + """Encode frame sequence to MP4 using FFmpeg.""" + if not frame_paths: + raise ValueError("No frames to encode") + + # Get FFmpeg path + ffmpeg_path = self._get_ffmpeg_path() + + # Build FFmpeg command + cmd = [ + ffmpeg_path, + "-y", # Overwrite output + "-framerate", str(opts.fps), + "-i", str(frame_paths[0].parent / "frame_%06d.png"), + "-c:v", "libx264", + "-preset", "fast", + "-crf", self._get_crf_for_quality(opts.quality), + "-pix_fmt", "yuv420p", + str(output_path) + ] + + # Run FFmpeg + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + try: + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout=opts.timeout + ) + except asyncio.TimeoutError: + process.kill() + raise RuntimeError("FFmpeg encoding timed out") + + if process.returncode != 0: + raise RuntimeError(f"FFmpeg failed: {stderr.decode()}") + + def _get_ffmpeg_path(self) -> str: + """Get FFmpeg executable path.""" + # Try imageio-ffmpeg first + if imageio_ffmpeg: + try: + return imageio_ffmpeg.get_ffmpeg_exe() + except Exception: + pass + + # Fallback to system FFmpeg + ffmpeg = shutil.which("ffmpeg") + if ffmpeg: + return ffmpeg + + raise FileNotFoundError("FFmpeg not found") + + def _get_crf_for_quality(self, quality: str) -> str: + """Get CRF value for quality setting.""" + quality_map = { + "draft": "28", + "standard": "23", + "high": "18" + } + return quality_map.get(quality, "23") \ No newline at end of file diff --git a/praisonai_tools/video/motion_graphics/protocols.py b/praisonai_tools/video/motion_graphics/protocols.py new file mode 100644 index 0000000..9608f3d --- /dev/null +++ b/praisonai_tools/video/motion_graphics/protocols.py @@ -0,0 +1,71 @@ +"""Protocols and data structures for motion graphics rendering.""" + +from typing import Protocol, runtime_checkable, Literal +from dataclasses import dataclass +from pathlib import Path + + +Quality = Literal["draft", "standard", "high"] +Format = Literal["mp4", "webm", "mov"] + + +@dataclass +class RenderOpts: + """Options for rendering motion graphics.""" + output_name: str = "video.mp4" + fps: int = 30 + quality: Quality = "standard" + format: Format = "mp4" + strict: bool = False + timeout: int = 300 + + +@dataclass +class LintResult: + """Result of linting motion graphics code.""" + ok: bool + messages: list[str] + raw: str = "" + + +@dataclass +class RenderResult: + """Result of rendering motion graphics.""" + ok: bool + output_path: Path | None + bytes_: bytes | None + stderr: str = "" + size_kb: int = 0 + + +@runtime_checkable +class RenderBackendProtocol(Protocol): + """Protocol for motion graphics render backends. + + This protocol defines the interface for different rendering engines + (HTML/GSAP, Manim, Remotion, etc.) to implement. + """ + + async def lint(self, workspace: Path, strict: bool = False) -> LintResult: + """Lint the motion graphics code in the workspace. + + Args: + workspace: Path to the workspace containing the motion graphics code + strict: If True, enforce stricter linting rules + + Returns: + LintResult with validation status and messages + """ + ... + + async def render(self, workspace: Path, opts: RenderOpts) -> RenderResult: + """Render motion graphics to video. + + Args: + workspace: Path to the workspace containing the motion graphics code + opts: Rendering options + + Returns: + RenderResult with video bytes and metadata + """ + ... \ No newline at end of file diff --git a/praisonai_tools/video/motion_graphics/skill.py b/praisonai_tools/video/motion_graphics/skill.py new file mode 100644 index 0000000..1161ba8 --- /dev/null +++ b/praisonai_tools/video/motion_graphics/skill.py @@ -0,0 +1,102 @@ +"""Compact HTML/GSAP authoring skill for motion graphics agents.""" + +MOTION_GRAPHICS_SKILL = """ +# Motion Graphics Authoring Skill + +You create deterministic, frame-perfect HTML/CSS/GSAP compositions for video export. + +## Required HTML Structure + +```html + + + + + + + +
+ +
+ + + + +``` + +## Critical Rules + +1. **Deterministic**: No Math.random(), Date.now(), or any non-deterministic functions +2. **Finite**: No infinite loops or repeat: -1 +3. **Timeline Export**: Must set window.__timelines = [tl] +4. **Duration Attribute**: Add data-duration to stage element +5. **Paused Timeline**: Always create with { paused: true } +6. **Fixed Viewport**: Use 1920x1080 stage size + +## Animation Guidelines + +- Use transform properties (x, y, scale, rotation) for smooth animation +- Avoid animating width, height, visibility, display +- Use opacity for fade effects +- Prefer ease functions: "power2.out", "back.out", "elastic.out" +- Chain animations with timeline positioning: "-=0.5", "+=1" +- Keep total duration under 60 seconds + +## Text Animations + +```javascript +// Split text for word/character animation +tl.to(".title .word", { + duration: 0.8, + y: 0, + opacity: 1, + stagger: 0.1, + ease: "power3.out" +}); +``` + +## Shape Animations + +```javascript +// Animate SVG paths +tl.fromTo("#path", + { drawSVG: "0%" }, + { duration: 2, drawSVG: "100%", ease: "power2.inOut" } +); +``` + +## Common Patterns + +- Fade in: `{ opacity: 0 }` to `{ opacity: 1 }` +- Slide in: `{ x: -100 }` to `{ x: 0 }` +- Scale up: `{ scale: 0 }` to `{ scale: 1 }` +- Stagger: Use stagger property for multiple elements + +## Performance + +- Keep element count under 100 +- Use CSS transforms, not absolute positioning +- Minimize DOM queries - store references +- Use timeline labels for complex sequences + +## Debugging + +- Add data-debug="true" to stage for visual timeline scrubber +- Use tl.duration() to verify total duration matches data-duration +- Test with tl.seek(time) at key moments + +Write complete, working HTML files. Test all animations before export. +""" \ No newline at end of file diff --git a/praisonai_tools/video/motion_graphics/team.py b/praisonai_tools/video/motion_graphics/team.py new file mode 100644 index 0000000..e20c0d7 --- /dev/null +++ b/praisonai_tools/video/motion_graphics/team.py @@ -0,0 +1,159 @@ +"""Motion graphics team preset.""" + +import tempfile +from pathlib import Path +from typing import Union, Any, Optional + +try: + from praisonaiagents import Agent, AgentTeam + from praisonaiagents.tools import FileTools, search_web +except ImportError: + # Fallback for development + Agent = None + AgentTeam = None + FileTools = None + search_web = None + +from .agent import create_motion_graphics_agent + + +def motion_graphics_team( + *, + research: bool = True, + code_exploration: bool = True, + backend: str = "html", + workspace: Union[str, Path] = None, + llm: str = "claude-sonnet-4", + **team_kwargs: Any +) -> Any: + """Create a motion graphics team preset. + + This team includes: + - Coordinator: Routes requests and validates outputs + - Researcher: Optional web search for content research + - CodeExplorer: Optional git repository exploration + - Animator: HTML/GSAP composition authoring and rendering + + Args: + research: Include research specialist + code_exploration: Include code exploration specialist + backend: Render backend name + workspace: Workspace directory + llm: LLM model to use + **team_kwargs: Additional arguments for AgentTeam + + Returns: + AgentTeam configured for motion graphics creation + """ + if AgentTeam is None: + raise ImportError( + "praisonaiagents not available. Install with: pip install praisonaiagents" + ) + + # Set up workspace + if workspace is None: + workspace = Path(tempfile.mkdtemp(prefix="motion_graphics_team_")) + else: + workspace = Path(workspace) + + workspace.mkdir(parents=True, exist_ok=True) + renders_dir = workspace / "renders" + renders_dir.mkdir(parents=True, exist_ok=True) + + # Create agents + agents = [] + + # Coordinator agent with output validation + coordinator = Agent( + name="coordinator", + instructions=f""" +You are the motion graphics team coordinator. Your role is to: + +1. Analyze incoming requests and route them to appropriate specialists +2. Coordinate between team members +3. Validate final outputs with strict rules +4. Return final results to users + +CRITICAL OUTPUT VALIDATION RULES: +- A render succeeded ONLY IF the reply contains a concrete file path under '{renders_dir}' AND no error indicators +- Never fabricate file paths or claim success without concrete evidence +- Surface all errors from the Animator +- Stop work after maximum retry budget is exceeded + +Routing guidelines: +- Use Researcher for gathering information about unfamiliar topics +- Use CodeExplorer for analyzing code repositories or programming concepts +- Always route to Animator for final HTML/GSAP authoring and rendering +- Validate that Animator provides concrete file paths before marking success + +Team workspace: {workspace} +""", + llm=llm + ) + agents.append(coordinator) + + # Optional researcher + if research and search_web is not None: + researcher = Agent( + name="researcher", + instructions=""" +You are a research specialist. Your role is to: + +1. Search the web for information about topics in user requests +2. Gather relevant facts, concepts, and examples +3. Summarize findings in a brief that the Animator can use for on-screen content +4. Focus on visual concepts and explanations that work well in motion graphics + +Keep research focused and concise - aim for key points that can become visual elements. +""", + tools=[search_web], + llm=llm + ) + agents.append(researcher) + + # Optional code explorer + if code_exploration: + from ...tools.git_tools import GitTools + + code_explorer = Agent( + name="code_explorer", + instructions=""" +You are a code exploration specialist. Your role is to: + +1. Clone and explore git repositories on-demand +2. Read source code and understand implementations +3. Extract key algorithms, data structures, and concepts +4. Provide code walkthroughs that the Animator can visualize + +IMPORTANT: You are READ-ONLY. Never write or modify code. +Always validate file paths to prevent directory traversal. + +Focus on extracting the essential concepts that can be animated visually. +""", + tools=[GitTools(base_dir=str(workspace / "repos"))], + llm=llm + ) + agents.append(code_explorer) + + # Animator - the core specialist + animator = create_motion_graphics_agent( + backend=backend, + workspace=renders_dir, + llm=llm + ) + animator.name = "animator" + agents.append(animator) + + # Create team with coordinator as manager via hierarchical process + team = AgentTeam( + agents=agents, + process="hierarchical", + manager_llm=llm, + **team_kwargs + ) + + # Store team metadata + team._motion_graphics_workspace = workspace + team._motion_graphics_backend = backend + + return team \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index ca12859..7ca2f13 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,10 @@ n8n = [ langextract = [ "langextract>=0.1.0", ] +video-motion = [ + "playwright>=1.40", + "imageio-ffmpeg>=0.5", +] [project.urls] Homepage = "https://docs.praison.ai" diff --git a/tests/integration/test_motion_graphics_team.py b/tests/integration/test_motion_graphics_team.py new file mode 100644 index 0000000..4795480 --- /dev/null +++ b/tests/integration/test_motion_graphics_team.py @@ -0,0 +1,288 @@ +"""Integration tests for motion graphics team.""" + +import pytest +import tempfile +from pathlib import Path +from unittest.mock import Mock, patch + +from praisonai_tools.video.motion_graphics.team import motion_graphics_team + + +class MockAgent: + """Mock agent for testing.""" + + def __init__(self, name="", instructions="", tools=None, llm=""): + self.name = name + self.instructions = instructions + self.tools = tools or [] + self.llm = llm + + +class MockAgentTeam: + """Mock agent team for testing.""" + + def __init__(self, agents=None, leader=None, **kwargs): + self.agents = agents or [] + self.leader = leader + self.kwargs = kwargs + + +class MockSearch: + """Mock web search tool.""" + pass + + +class MockGitTools: + """Mock GitTools.""" + + def __init__(self, base_dir=""): + self.base_dir = base_dir + + +class TestMotionGraphicsTeam: + """Test motion graphics team preset.""" + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + def test_team_missing_praisonaiagents(self): + """Test team creation when praisonaiagents is not available.""" + with patch('praisonai_tools.video.motion_graphics.team.AgentTeam', None): + with pytest.raises(ImportError, match="praisonaiagents not available"): + motion_graphics_team() + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_team_default_configuration(self, mock_create_agent): + """Test team with default configuration.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team(workspace=tmpdir) + + assert isinstance(team, MockAgentTeam) + assert len(team.agents) == 4 # coordinator, researcher, code_explorer, animator + + # Check coordinator + coordinator = team.agents[0] + assert coordinator.name == "coordinator" + assert "motion graphics team coordinator" in coordinator.instructions.lower() + + # Check researcher + researcher = team.agents[1] + assert researcher.name == "researcher" + assert "research specialist" in researcher.instructions.lower() + + # Check code explorer + code_explorer = team.agents[2] + assert code_explorer.name == "code_explorer" + assert "code exploration specialist" in researcher.instructions.lower() + + # Check animator + animator = team.agents[3] + assert animator is mock_animator + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', None) # No search available + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_team_no_research(self, mock_create_agent): + """Test team creation without research capability.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team(research=False, workspace=tmpdir) + + # Should have coordinator, code_explorer, animator (no researcher) + assert len(team.agents) == 3 + agent_names = [agent.name for agent in team.agents] + assert "coordinator" in agent_names + assert "code_explorer" in agent_names + assert "animator" in agent_names + assert "researcher" not in agent_names + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_team_no_code_exploration(self, mock_create_agent): + """Test team creation without code exploration.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team(code_exploration=False, workspace=tmpdir) + + # Should have coordinator, researcher, animator (no code_explorer) + assert len(team.agents) == 3 + agent_names = [agent.name for agent in team.agents] + assert "coordinator" in agent_names + assert "researcher" in agent_names + assert "animator" in agent_names + assert "code_explorer" not in agent_names + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_team_minimal_configuration(self, mock_create_agent): + """Test team with minimal configuration.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team( + research=False, + code_exploration=False, + workspace=tmpdir + ) + + # Should have only coordinator and animator + assert len(team.agents) == 2 + agent_names = [agent.name for agent in team.agents] + assert "coordinator" in agent_names + assert "animator" in agent_names + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_team_custom_parameters(self, mock_create_agent): + """Test team with custom parameters.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team( + workspace=tmpdir, + backend="html", + llm="gpt-4", + custom_param="test" + ) + + # Check that custom parameters are passed through + assert team.kwargs == {"custom_param": "test"} + + # Check that workspace is set + assert team._motion_graphics_workspace == Path(tmpdir) + assert team._motion_graphics_backend == "html" + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_team_workspace_creation(self, mock_create_agent): + """Test team creates workspace directory.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + workspace_path = Path(tmpdir) / "custom_team_workspace" + + team = motion_graphics_team(workspace=workspace_path) + + assert workspace_path.exists() + assert team._motion_graphics_workspace == workspace_path + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_team_auto_workspace(self, mock_create_agent): + """Test team creates workspace automatically.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + team = motion_graphics_team() + + # Should create workspace automatically + assert hasattr(team, '_motion_graphics_workspace') + assert team._motion_graphics_workspace.exists() + assert "motion_graphics_team_" in str(team._motion_graphics_workspace) + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_coordinator_is_leader(self, mock_create_agent): + """Test that coordinator is set as team leader.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team(workspace=tmpdir) + + assert team.leader is not None + assert team.leader.name == "coordinator" + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + @patch('praisonai_tools.tools.git_tools.GitTools', MockGitTools) + def test_code_explorer_tools(self, mock_create_agent): + """Test code explorer gets GitTools.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team(workspace=tmpdir) + + # Find code explorer + code_explorer = None + for agent in team.agents: + if agent.name == "code_explorer": + code_explorer = agent + break + + assert code_explorer is not None + assert len(code_explorer.tools) == 1 + assert isinstance(code_explorer.tools[0], MockGitTools) + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_researcher_tools(self, mock_create_agent): + """Test researcher gets search tools.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team(workspace=tmpdir) + + # Find researcher + researcher = None + for agent in team.agents: + if agent.name == "researcher": + researcher = agent + break + + assert researcher is not None + assert len(researcher.tools) == 1 + assert isinstance(researcher.tools[0], MockSearch) + + @patch('praisonai_tools.video.motion_graphics.team.AgentTeam', MockAgentTeam) + @patch('praisonai_tools.video.motion_graphics.team.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.team.search_web', MockSearch()) + @patch('praisonai_tools.video.motion_graphics.team.create_motion_graphics_agent') + def test_coordinator_instructions(self, mock_create_agent): + """Test coordinator has proper instructions.""" + mock_animator = MockAgent(name="animator") + mock_create_agent.return_value = mock_animator + + with tempfile.TemporaryDirectory() as tmpdir: + team = motion_graphics_team(workspace=tmpdir) + + coordinator = team.leader + instructions = coordinator.instructions.lower() + + assert "coordinator" in instructions + assert "output validation" in instructions + assert "never fabricate" in instructions + assert "concrete file path" in instructions + assert "route" in instructions or "routing" in instructions \ No newline at end of file diff --git a/tests/smoke/test_motion_graphics_smoke.py b/tests/smoke/test_motion_graphics_smoke.py new file mode 100644 index 0000000..6016fb9 --- /dev/null +++ b/tests/smoke/test_motion_graphics_smoke.py @@ -0,0 +1,287 @@ +"""Smoke tests for motion graphics module.""" + +import pytest +import tempfile +from pathlib import Path + +# Skip tests if dependencies not available +playwright_available = True +try: + import playwright +except ImportError: + playwright_available = False + +imageio_ffmpeg_available = True +try: + import imageio_ffmpeg +except ImportError: + imageio_ffmpeg_available = False + + +class TestMotionGraphicsImports: + """Test basic imports work.""" + + def test_import_protocols(self): + """Test importing protocols.""" + from praisonai_tools.video.motion_graphics import ( + RenderBackendProtocol, + RenderOpts, + RenderResult, + LintResult + ) + + assert RenderBackendProtocol is not None + assert RenderOpts is not None + assert RenderResult is not None + assert LintResult is not None + + def test_import_backend(self): + """Test importing HTML backend.""" + if not playwright_available or not imageio_ffmpeg_available: + pytest.skip("Playwright or imageio-ffmpeg not available") + + from praisonai_tools.video.motion_graphics import HtmlRenderBackend + + assert HtmlRenderBackend is not None + + def test_import_agent_factory(self): + """Test importing agent factory.""" + from praisonai_tools.video.motion_graphics import create_motion_graphics_agent + + assert create_motion_graphics_agent is not None + + def test_import_team_preset(self): + """Test importing team preset.""" + from praisonai_tools.video.motion_graphics import motion_graphics_team + + assert motion_graphics_team is not None + + def test_import_git_tools(self): + """Test importing GitTools.""" + from praisonai_tools.tools.git_tools import GitTools + + assert GitTools is not None + + +class TestProtocolsBasic: + """Test protocols work at basic level.""" + + def test_render_opts_creation(self): + """Test RenderOpts can be created.""" + from praisonai_tools.video.motion_graphics import RenderOpts + + opts = RenderOpts() + assert opts.output_name == "video.mp4" + assert opts.fps == 30 + + custom_opts = RenderOpts(output_name="custom.mp4", fps=60) + assert custom_opts.output_name == "custom.mp4" + assert custom_opts.fps == 60 + + def test_lint_result_creation(self): + """Test LintResult can be created.""" + from praisonai_tools.video.motion_graphics import LintResult + + result = LintResult(ok=True, messages=[]) + assert result.ok is True + assert result.messages == [] + + def test_render_result_creation(self): + """Test RenderResult can be created.""" + from praisonai_tools.video.motion_graphics import RenderResult + + result = RenderResult( + ok=True, + output_path=Path("/tmp/test.mp4"), + bytes_=b"test", + size_kb=1 + ) + assert result.ok is True + assert result.output_path == Path("/tmp/test.mp4") + + +class TestHtmlBackendBasic: + """Test HTML backend basic functionality.""" + + @pytest.mark.skipif( + not playwright_available or not imageio_ffmpeg_available, + reason="Playwright or imageio-ffmpeg not available" + ) + def test_backend_creation(self): + """Test HTML backend can be created.""" + from praisonai_tools.video.motion_graphics import HtmlRenderBackend + + backend = HtmlRenderBackend() + assert backend is not None + + @pytest.mark.skipif( + not playwright_available or not imageio_ffmpeg_available, + reason="Playwright or imageio-ffmpeg not available" + ) + @pytest.mark.asyncio + async def test_backend_lint_basic(self): + """Test backend linting works at basic level.""" + from praisonai_tools.video.motion_graphics import HtmlRenderBackend + + backend = HtmlRenderBackend() + + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + + # Test missing index.html + result = await backend.lint(workspace) + assert result.ok is False + assert "index.html not found" in result.messages[0] + + # Test with valid HTML + html_content = """ + + + +
+ + + + """ + (workspace / "index.html").write_text(html_content) + + result = await backend.lint(workspace) + assert result.ok is True + + +class TestGitToolsBasic: + """Test GitTools basic functionality.""" + + def test_git_tools_creation(self): + """Test GitTools can be created.""" + from praisonai_tools.tools.git_tools import GitTools + + with tempfile.TemporaryDirectory() as tmpdir: + git_tools = GitTools(base_dir=tmpdir) + assert git_tools.base_dir == Path(tmpdir) + + def test_git_tools_parse_repo(self): + """Test repository parsing.""" + from praisonai_tools.tools.git_tools import GitTools + + git_tools = GitTools() + + # Test owner/repo format + url, name = git_tools._parse_repo_input("owner/repo") + assert "github.com/owner/repo" in url + assert name == "owner_repo" + + # Test HTTPS URL + url, name = git_tools._parse_repo_input("https://github.com/owner/repo.git") + assert url == "https://github.com/owner/repo.git" + assert name == "owner_repo" + + def test_git_tools_safety(self): + """Test path safety features.""" + from praisonai_tools.tools.git_tools import GitTools + + git_tools = GitTools() + + # Test safe file paths + assert git_tools._validate_file_path("README.md") == "README.md" + assert git_tools._validate_file_path("src/main.py") == "src/main.py" + + # Test unsafe file paths + with pytest.raises(ValueError): + git_tools._validate_file_path("../etc/passwd") + + with pytest.raises(ValueError): + git_tools._validate_file_path("../../secret.txt") + + +class TestEndToEndBasic: + """Test basic end-to-end functionality.""" + + def test_lazy_imports_work(self): + """Test that lazy imports work correctly.""" + # This should not fail even if optional dependencies are missing + import praisonai_tools.video.motion_graphics + + # Test accessing attributes triggers lazy loading + try: + _ = praisonai_tools.video.motion_graphics.RenderOpts + protocols_available = True + except ImportError: + protocols_available = False + + # Protocols should always be available + assert protocols_available + + def test_package_structure(self): + """Test package structure is correct.""" + from praisonai_tools.video import motion_graphics + + # Check __all__ is defined + assert hasattr(motion_graphics, '__all__') + assert len(motion_graphics.__all__) > 0 + + # Check key exports are listed + assert 'RenderBackendProtocol' in motion_graphics.__all__ + assert 'RenderOpts' in motion_graphics.__all__ + assert 'create_motion_graphics_agent' in motion_graphics.__all__ + assert 'motion_graphics_team' in motion_graphics.__all__ + + @pytest.mark.skipif( + not playwright_available or not imageio_ffmpeg_available, + reason="Playwright or imageio-ffmpeg not available" + ) + def test_backend_protocol_compliance(self): + """Test backend implements protocol correctly.""" + from praisonai_tools.video.motion_graphics import ( + HtmlRenderBackend, + RenderBackendProtocol + ) + + backend = HtmlRenderBackend() + assert isinstance(backend, RenderBackendProtocol) + + def test_agent_factory_imports(self): + """Test agent factory can be imported.""" + from praisonai_tools.video.motion_graphics.agent import ( + create_motion_graphics_agent, + RenderTools, + _resolve_backend + ) + + assert create_motion_graphics_agent is not None + assert RenderTools is not None + assert _resolve_backend is not None + + def test_team_preset_imports(self): + """Test team preset can be imported.""" + from praisonai_tools.video.motion_graphics.team import motion_graphics_team + + assert motion_graphics_team is not None + + +class TestSkillContent: + """Test skill content is valid.""" + + def test_skill_import(self): + """Test skill can be imported.""" + from praisonai_tools.video.motion_graphics.skill import MOTION_GRAPHICS_SKILL + + assert MOTION_GRAPHICS_SKILL is not None + assert len(MOTION_GRAPHICS_SKILL) > 1000 # Should be substantial + + def test_skill_content(self): + """Test skill contains expected content.""" + from praisonai_tools.video.motion_graphics.skill import MOTION_GRAPHICS_SKILL + + skill = MOTION_GRAPHICS_SKILL.lower() + + # Should contain key concepts + assert "gsap" in skill + assert "timeline" in skill + assert "window.__timelines" in skill + assert "deterministic" in skill + assert "data-duration" in skill + assert "paused: true" in skill \ No newline at end of file diff --git a/tests/unit/tools/test_git_tools.py b/tests/unit/tools/test_git_tools.py new file mode 100644 index 0000000..c29c02d --- /dev/null +++ b/tests/unit/tools/test_git_tools.py @@ -0,0 +1,437 @@ +"""Unit tests for GitTools.""" + +import os +import pytest +import tempfile +import subprocess +from pathlib import Path +from unittest.mock import Mock, patch + +from praisonai_tools.tools.git_tools import GitTools + + +class TestGitToolsInit: + """Test GitTools initialization.""" + + def test_init_with_base_dir(self): + """Test initialization with custom base directory.""" + base_dir = "/tmp/custom_repos" + git_tools = GitTools(base_dir=base_dir) + + assert git_tools.base_dir == Path(base_dir) + + def test_init_without_base_dir(self): + """Test initialization without base directory.""" + git_tools = GitTools() + + # Should use temp directory + assert "praison_git_repos" in str(git_tools.base_dir) + assert git_tools.base_dir.exists() + + def test_init_with_github_token(self): + """Test initialization with GitHub token.""" + with patch.dict(os.environ, {"GITHUB_ACCESS_TOKEN": "test_token"}): + git_tools = GitTools() + assert git_tools.github_token == "test_token" + + def test_init_without_github_token(self): + """Test initialization without GitHub token.""" + with patch.dict(os.environ, {}, clear=True): + git_tools = GitTools() + assert git_tools.github_token is None + + +class TestParseRepoInput: + """Test repository input parsing.""" + + def setup_method(self): + """Set up test fixtures.""" + self.git_tools = GitTools() + + def test_parse_https_url(self): + """Test parsing HTTPS GitHub URL.""" + url = "https://github.com/owner/repo.git" + repo_url, repo_name = self.git_tools._parse_repo_input(url) + + assert repo_url == url + assert repo_name == "owner_repo" + + def test_parse_ssh_url(self): + """Test parsing SSH GitHub URL.""" + url = "git@github.com:owner/repo.git" + repo_url, repo_name = self.git_tools._parse_repo_input(url) + + assert repo_url == url + assert repo_name == "owner_repo" + + def test_parse_owner_repo_format(self): + """Test parsing owner/repo format.""" + repo_input = "owner/repo" + repo_url, repo_name = self.git_tools._parse_repo_input(repo_input) + + assert repo_url == "https://github.com/owner/repo.git" + assert repo_name == "owner_repo" + + def test_parse_owner_repo_with_token(self): + """Test parsing owner/repo format with token.""" + with patch.object(self.git_tools, 'github_token', "test_token"): + repo_input = "owner/repo" + repo_url, repo_name = self.git_tools._parse_repo_input(repo_input) + + assert repo_url == "https://test_token@github.com/owner/repo.git" + assert repo_name == "owner_repo" + + def test_parse_non_github_url(self): + """Test parsing non-GitHub URL raises error.""" + url = "https://gitlab.com/owner/repo.git" + + with pytest.raises(ValueError, match="Only GitHub URLs are supported"): + self.git_tools._parse_repo_input(url) + + def test_parse_invalid_owner_repo_format(self): + """Test parsing invalid owner/repo format raises error.""" + invalid_inputs = [ + "owner", # Missing repo + "owner/repo/extra", # Too many slashes + "", # Empty + ] + + for invalid_input in invalid_inputs: + with pytest.raises(ValueError): + self.git_tools._parse_repo_input(invalid_input) + + def test_parse_url_without_git_suffix(self): + """Test parsing URL without .git suffix.""" + url = "https://github.com/owner/repo" + repo_url, repo_name = self.git_tools._parse_repo_input(url) + + assert repo_url == url + assert repo_name == "owner_repo" + + +class TestSafety: + """Test safety features.""" + + def setup_method(self): + """Set up test fixtures.""" + self.git_tools = GitTools() + + def test_get_safe_repo_path(self): + """Test safe repository path generation.""" + repo_name = "owner_repo" + repo_path = self.git_tools._get_safe_repo_path(repo_name) + + assert repo_path.is_relative_to(self.git_tools.base_dir) + assert repo_path.name == repo_name + + def test_get_safe_repo_path_sanitizes_name(self): + """Test repository name sanitization.""" + repo_name = "owner@repo/with#special$chars" + repo_path = self.git_tools._get_safe_repo_path(repo_name) + + # Should sanitize special characters + assert "@" not in repo_path.name + assert "/" not in repo_path.name + assert "#" not in repo_path.name + assert "$" not in repo_path.name + + def test_validate_file_path_safe(self): + """Test file path validation for safe paths.""" + safe_paths = [ + "README.md", + "src/main.py", + "docs/api.md", + "tests/test_file.py" + ] + + for safe_path in safe_paths: + result = self.git_tools._validate_file_path(safe_path) + assert result == safe_path + + def test_validate_file_path_unsafe(self): + """Test file path validation for unsafe paths.""" + unsafe_paths = [ + "../etc/passwd", + "../../secret.txt", + "/absolute/path.txt", + "dir/../../../escape.txt" + ] + + for unsafe_path in unsafe_paths: + with pytest.raises(ValueError, match="Invalid file path"): + self.git_tools._validate_file_path(unsafe_path) + + def test_validate_file_path_rejects_absolute_paths(self): + """Test file path validation rejects absolute paths.""" + with pytest.raises(ValueError, match="Invalid file path"): + self.git_tools._validate_file_path("/src/main.py") + + def test_validate_file_path_accepts_relative_paths(self): + """Test file path validation accepts relative paths.""" + result = self.git_tools._validate_file_path("src/main.py") + assert result == "src/main.py" + + +class TestGitOperations: + """Test git operations.""" + + def setup_method(self): + """Set up test fixtures.""" + with tempfile.TemporaryDirectory() as tmpdir: + self.base_dir = tmpdir + self.git_tools = GitTools(base_dir=tmpdir) + + @patch('subprocess.run') + def test_git_clone(self, mock_run): + """Test git clone operation.""" + mock_run.return_value = Mock(returncode=0) + + repo_url = "https://github.com/owner/repo.git" + repo_path = Path(self.base_dir) / "test_repo" + + self.git_tools._git_clone(repo_url, repo_path) + + mock_run.assert_called_once() + call_args = mock_run.call_args[0][0] + assert call_args[:2] == ["git", "clone"] + assert repo_url in call_args + assert str(repo_path) in call_args + + @patch('subprocess.run') + def test_git_clone_with_branch(self, mock_run): + """Test git clone with specific branch.""" + mock_run.return_value = Mock(returncode=0) + + repo_url = "https://github.com/owner/repo.git" + repo_path = Path(self.base_dir) / "test_repo" + branch = "develop" + + self.git_tools._git_clone(repo_url, repo_path, branch) + + call_args = mock_run.call_args[0][0] + assert "-b" in call_args + assert branch in call_args + + @patch('subprocess.run') + def test_git_clone_failure(self, mock_run): + """Test git clone failure handling.""" + mock_run.side_effect = subprocess.CalledProcessError( + 1, ["git", "clone"], stderr="Repository not found" + ) + + repo_url = "https://github.com/owner/nonexistent.git" + repo_path = Path(self.base_dir) / "test_repo" + + with pytest.raises(RuntimeError, match="Failed to clone repository"): + self.git_tools._git_clone(repo_url, repo_path) + + @patch('subprocess.run') + def test_run_git_command_success(self, mock_run): + """Test successful git command execution.""" + mock_run.return_value = Mock( + returncode=0, + stdout="command output" + ) + + repo_path = Path(self.base_dir) / "test_repo" + result = self.git_tools._run_git_command(["status"], repo_path) + + assert result == "command output" + mock_run.assert_called_once() + call_args, call_kwargs = mock_run.call_args + assert call_args[0] == ["git", "status"] + assert call_kwargs["cwd"] == repo_path + + @patch('subprocess.run') + def test_run_git_command_failure(self, mock_run): + """Test git command failure handling.""" + mock_run.side_effect = subprocess.CalledProcessError( + 1, ["git", "status"], output="", stderr="not a git repository" + ) + + repo_path = Path(self.base_dir) / "test_repo" + + with pytest.raises(subprocess.CalledProcessError): + self.git_tools._run_git_command(["status"], repo_path) + + def test_get_repo_path_nonexistent(self): + """Test getting path for nonexistent repository.""" + with pytest.raises(FileNotFoundError, match="Repository not found"): + self.git_tools._get_repo_path("nonexistent_repo") + + +class TestRepositoryMethods: + """Test repository management methods.""" + + def setup_method(self): + """Set up test fixtures.""" + self.git_tools = GitTools() + + @patch.object(GitTools, '_parse_repo_input') + @patch.object(GitTools, '_get_safe_repo_path') + @patch.object(GitTools, '_git_clone') + @patch.object(GitTools, '_git_pull') + def test_clone_repo_new(self, mock_pull, mock_clone, mock_path, mock_parse): + """Test cloning new repository.""" + mock_parse.return_value = ("https://github.com/owner/repo.git", "owner_repo") + mock_path.return_value = Path("/tmp/owner_repo") + + # Repository doesn't exist + with patch.object(Path, 'exists', return_value=False): + result = self.git_tools.clone_repo("owner/repo") + + assert result == str(Path("/tmp/owner_repo")) + mock_clone.assert_called_once() + mock_pull.assert_not_called() + + @patch.object(GitTools, '_parse_repo_input') + @patch.object(GitTools, '_get_safe_repo_path') + @patch.object(GitTools, '_git_clone') + @patch.object(GitTools, '_git_pull') + def test_clone_repo_existing(self, mock_pull, mock_clone, mock_path, mock_parse): + """Test updating existing repository.""" + mock_parse.return_value = ("https://github.com/owner/repo.git", "owner_repo") + mock_path.return_value = Path("/tmp/owner_repo") + + # Repository exists + with patch.object(Path, 'exists', return_value=True): + result = self.git_tools.clone_repo("owner/repo") + + assert result == str(Path("/tmp/owner_repo")) + mock_clone.assert_not_called() + mock_pull.assert_called_once() + + def test_list_repos_empty(self): + """Test listing repositories when none exist.""" + with tempfile.TemporaryDirectory() as tmpdir: + git_tools = GitTools(base_dir=tmpdir) + repos = git_tools.list_repos() + assert repos == [] + + def test_list_repos_with_repos(self): + """Test listing repositories.""" + with tempfile.TemporaryDirectory() as tmpdir: + git_tools = GitTools(base_dir=tmpdir) + + # Create fake git repositories + (Path(tmpdir) / "repo1" / ".git").mkdir(parents=True) + (Path(tmpdir) / "repo2" / ".git").mkdir(parents=True) + (Path(tmpdir) / "not_a_repo").mkdir() # Should be ignored + + repos = git_tools.list_repos() + assert sorted(repos) == ["repo1", "repo2"] + + @patch.object(GitTools, '_run_git_command') + @patch.object(GitTools, '_get_repo_path') + def test_repo_summary(self, mock_get_path, mock_run_git): + """Test getting repository summary.""" + repo_path = Path("/tmp/test_repo") + mock_get_path.return_value = repo_path + + # Mock git command outputs + mock_run_git.side_effect = [ + "https://github.com/owner/repo.git", # remote url + "main", # current branch + "42", # commit count + "abc123|John Doe|john@example.com|2024-01-01|Initial commit" # last commit + ] + + summary = self.git_tools.repo_summary("test_repo") + + assert summary["name"] == "test_repo" + assert summary["remote_url"] == "https://github.com/owner/repo.git" + assert summary["current_branch"] == "main" + assert summary["commit_count"] == 42 + assert summary["last_commit"]["hash"] == "abc123" + assert summary["last_commit"]["author_name"] == "John Doe" + assert summary["last_commit"]["message"] == "Initial commit" + + @patch.object(GitTools, '_run_git_command') + @patch.object(GitTools, '_get_repo_path') + def test_git_log(self, mock_get_path, mock_run_git): + """Test git log retrieval.""" + repo_path = Path("/tmp/test_repo") + mock_get_path.return_value = repo_path + + mock_run_git.return_value = ( + "abc123|John Doe|john@example.com|2024-01-01|Initial commit\n" + "def456|Jane Doe|jane@example.com|2024-01-02|Second commit" + ) + + commits = self.git_tools.git_log("test_repo", max_count=2) + + assert len(commits) == 2 + assert commits[0]["hash"] == "abc123" + assert commits[0]["author_name"] == "John Doe" + assert commits[1]["hash"] == "def456" + assert commits[1]["author_name"] == "Jane Doe" + + @patch.object(GitTools, '_run_git_command') + @patch.object(GitTools, '_get_repo_path') + def test_git_diff(self, mock_get_path, mock_run_git): + """Test git diff retrieval.""" + repo_path = Path("/tmp/test_repo") + mock_get_path.return_value = repo_path + + mock_run_git.return_value = "diff output" + + result = self.git_tools.git_diff("test_repo", "abc123", "def456") + + assert result == "diff output" + mock_run_git.assert_called_once_with(["diff", "abc123", "def456"], repo_path) + + @patch.object(GitTools, '_run_git_command') + @patch.object(GitTools, '_get_repo_path') + def test_git_show(self, mock_get_path, mock_run_git): + """Test git show command.""" + repo_path = Path("/tmp/test_repo") + mock_get_path.return_value = repo_path + + mock_run_git.return_value = "file content" + + result = self.git_tools.git_show("test_repo", "abc123", "README.md") + + assert result == "file content" + mock_run_git.assert_called_once_with(["show", "abc123:README.md"], repo_path) + + @patch.object(GitTools, '_run_git_command') + @patch.object(GitTools, '_get_repo_path') + def test_git_branches(self, mock_get_path, mock_run_git): + """Test git branches listing.""" + repo_path = Path("/tmp/test_repo") + mock_get_path.return_value = repo_path + + mock_run_git.return_value = ( + " origin/main\n" + " origin/develop\n" + " origin/HEAD -> origin/main" + ) + + branches = self.git_tools.git_branches("test_repo") + + assert sorted(branches) == ["develop", "main"] + + @patch.object(GitTools, '_run_git_command') + @patch.object(GitTools, '_get_repo_path') + def test_get_github_remote(self, mock_get_path, mock_run_git): + """Test GitHub remote parsing.""" + repo_path = Path("/tmp/test_repo") + mock_get_path.return_value = repo_path + + # Test HTTPS URL + mock_run_git.return_value = "https://github.com/owner/repo.git" + result = self.git_tools.get_github_remote("test_repo") + + assert result == {"owner": "owner", "repo": "repo"} + + # Test SSH URL + mock_run_git.return_value = "git@github.com:owner/repo.git" + result = self.git_tools.get_github_remote("test_repo") + + assert result == {"owner": "owner", "repo": "repo"} + + # Test non-GitHub URL + mock_run_git.return_value = "https://gitlab.com/owner/repo.git" + result = self.git_tools.get_github_remote("test_repo") + + assert result is None \ No newline at end of file diff --git a/tests/unit/video/test_backend_safety.py b/tests/unit/video/test_backend_safety.py new file mode 100644 index 0000000..d7c3d1d --- /dev/null +++ b/tests/unit/video/test_backend_safety.py @@ -0,0 +1,69 @@ +"""Tests for backend workspace safety.""" + +import pytest +import tempfile +from pathlib import Path +from unittest.mock import Mock, AsyncMock + +try: + from praisonai_tools.video.motion_graphics.backend_html import HtmlRenderBackend + from praisonai_tools.video.motion_graphics.protocols import RenderOpts + BACKEND_AVAILABLE = True +except ImportError: + BACKEND_AVAILABLE = False + + +@pytest.mark.skipif(not BACKEND_AVAILABLE, reason="Motion graphics backend not available") +class TestBackendSafety: + """Test workspace safety features.""" + + def setup_method(self): + """Set up test fixtures.""" + self.temp_dir = Path(tempfile.mkdtemp()) + self.backend = HtmlRenderBackend(base_dir=self.temp_dir) + + def teardown_method(self): + """Clean up test fixtures.""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_safe_workspace_under_base_dir(self): + """Test workspace under base directory is considered safe.""" + safe_workspace = self.temp_dir / "workspace" + safe_workspace.mkdir() + + assert self.backend._is_safe_workspace(safe_workspace) + + def test_unsafe_workspace_outside_base_dir(self): + """Test workspace outside base directory is rejected.""" + # Try to escape to parent directory + unsafe_workspace = self.temp_dir.parent / "escaped" + + assert not self.backend._is_safe_workspace(unsafe_workspace) + + def test_workspace_in_temp_dir_allowed(self): + """Test workspace in temp directory is allowed.""" + import tempfile + temp_workspace = Path(tempfile.gettempdir()) / "test_workspace_mg" + temp_workspace.mkdir(exist_ok=True) + try: + assert self.backend._is_safe_workspace(temp_workspace) + finally: + temp_workspace.rmdir() + + def test_nonexistent_workspace_rejected(self): + """Test non-existent workspace is rejected.""" + nonexistent = Path("/does/not/exist/workspace") + + assert not self.backend._is_safe_workspace(nonexistent) + + @pytest.mark.asyncio + async def test_unsafe_workspace_blocks_render(self): + """Test unsafe workspace blocks render operation.""" + unsafe_workspace = Path("/etc") # System directory + opts = RenderOpts(output_name="test.mp4", fps=30, quality="standard") + + result = await self.backend.render(unsafe_workspace, opts) + + assert not result.ok + assert "Unsafe workspace path" in result.stderr \ No newline at end of file diff --git a/tests/unit/video/test_html_backend.py b/tests/unit/video/test_html_backend.py new file mode 100644 index 0000000..0d8d828 --- /dev/null +++ b/tests/unit/video/test_html_backend.py @@ -0,0 +1,390 @@ +"""Unit tests for HTML render backend.""" + +import pytest +import tempfile +import asyncio +from pathlib import Path +from unittest.mock import Mock, patch, AsyncMock + +from praisonai_tools.video.motion_graphics.backend_html import HtmlRenderBackend +from praisonai_tools.video.motion_graphics.protocols import RenderOpts, LintResult, RenderResult + + +class TestHtmlRenderBackend: + """Test HTML render backend.""" + + def test_import_error_handling(self): + """Test that import errors are handled properly.""" + with patch('praisonai_tools.video.motion_graphics.backend_html.async_playwright', None): + with pytest.raises(ImportError, match="Playwright not installed"): + HtmlRenderBackend() + + with patch('praisonai_tools.video.motion_graphics.backend_html.imageio_ffmpeg', None): + with pytest.raises(ImportError, match="imageio-ffmpeg not installed"): + HtmlRenderBackend() + + def test_init_success(self): + """Test successful initialization.""" + with patch('praisonai_tools.video.motion_graphics.backend_html.async_playwright', Mock()): + with patch('praisonai_tools.video.motion_graphics.backend_html.imageio_ffmpeg', Mock()): + backend = HtmlRenderBackend() + assert backend is not None + + +class TestLinting: + """Test linting functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + with patch('praisonai_tools.video.motion_graphics.backend_html.async_playwright', Mock()): + with patch('praisonai_tools.video.motion_graphics.backend_html.imageio_ffmpeg', Mock()): + self.backend = HtmlRenderBackend() + + @pytest.mark.asyncio + async def test_lint_missing_index_html(self): + """Test linting when index.html is missing.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + result = await self.backend.lint(workspace) + + assert result.ok is False + assert "index.html not found" in result.messages[0] + + @pytest.mark.asyncio + async def test_lint_valid_composition(self): + """Test linting valid composition.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + + # Create valid HTML + html_content = """ + + + + +
+ + + + """ + (workspace / "index.html").write_text(html_content) + + result = await self.backend.lint(workspace) + + assert result.ok is True + assert result.messages == [] + + @pytest.mark.asyncio + async def test_lint_missing_timeline_setup(self): + """Test linting when timeline setup is missing.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + + # Create HTML without timeline setup + html_content = """ + + +
+ + """ + (workspace / "index.html").write_text(html_content) + + result = await self.backend.lint(workspace) + + assert result.ok is False + assert any("window.__timelines" in msg for msg in result.messages) + + @pytest.mark.asyncio + async def test_lint_missing_duration_attribute(self): + """Test linting when duration attribute is missing.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + + # Create HTML without duration attribute + html_content = """ + + + +
+ + + + """ + (workspace / "index.html").write_text(html_content) + + result = await self.backend.lint(workspace) + + assert result.ok is False + assert any("data-duration" in msg for msg in result.messages) + + @pytest.mark.asyncio + async def test_lint_problematic_patterns(self): + """Test linting detects problematic patterns.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + + # Create HTML with problematic patterns + html_content = """ + + + +
+ + + + """ + (workspace / "index.html").write_text(html_content) + + result = await self.backend.lint(workspace) + + assert result.ok is False + assert any("Math.random" in msg for msg in result.messages) + assert any("repeat: -1" in msg for msg in result.messages) + + @pytest.mark.asyncio + async def test_lint_strict_mode(self): + """Test strict linting mode.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + + # Create HTML with visibility animation + html_content = """ + + + +
+ + + + """ + (workspace / "index.html").write_text(html_content) + + # Non-strict mode should pass + result = await self.backend.lint(workspace, strict=False) + assert not any("visibility" in msg for msg in result.messages) + + # Strict mode should fail + result = await self.backend.lint(workspace, strict=True) + assert any("visibility" in msg for msg in result.messages) + + @pytest.mark.asyncio + async def test_lint_read_error(self): + """Test linting when file cannot be read.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + + # Create empty file that can't be read as UTF-8 + (workspace / "index.html").write_bytes(b'\xff\xfe\x00\x00') + + result = await self.backend.lint(workspace) + + assert result.ok is False + assert any("Failed to read" in msg for msg in result.messages) + + +class TestRendering: + """Test rendering functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + with patch('praisonai_tools.video.motion_graphics.backend_html.async_playwright', Mock()): + with patch('praisonai_tools.video.motion_graphics.backend_html.imageio_ffmpeg', Mock()): + self.backend = HtmlRenderBackend() + + @pytest.mark.asyncio + async def test_render_missing_index_html(self): + """Test rendering when index.html is missing.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + opts = RenderOpts() + + result = await self.backend.render(workspace, opts) + + assert result.ok is False + assert "index.html not found" in result.stderr + + @pytest.mark.asyncio + async def test_render_unsafe_workspace(self): + """Test rendering with unsafe workspace path.""" + # Create a mock for _is_safe_workspace that returns False + with patch.object(self.backend, '_is_safe_workspace', return_value=False): + workspace = Path("/tmp/test") + opts = RenderOpts() + + result = await self.backend.render(workspace, opts) + + assert result.ok is False + assert "Unsafe workspace path" in result.stderr + + def test_is_safe_workspace(self): + """Test workspace safety validation.""" + # Test with actual path that exists + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + assert self.backend._is_safe_workspace(workspace) is True + + # Test with subdirectory that exists + sub_workspace = Path(tmpdir) / "test_workspace" + sub_workspace.mkdir() + assert self.backend._is_safe_workspace(sub_workspace) is True + + def test_get_crf_for_quality(self): + """Test CRF value mapping for quality settings.""" + assert self.backend._get_crf_for_quality("draft") == "28" + assert self.backend._get_crf_for_quality("standard") == "23" + assert self.backend._get_crf_for_quality("high") == "18" + assert self.backend._get_crf_for_quality("unknown") == "23" # default + + @patch('shutil.which') + @patch('praisonai_tools.video.motion_graphics.backend_html.imageio_ffmpeg') + def test_get_ffmpeg_path(self, mock_imageio, mock_which): + """Test FFmpeg path resolution.""" + # Test imageio-ffmpeg path + mock_imageio.get_ffmpeg_exe.return_value = "/usr/local/bin/ffmpeg" + result = self.backend._get_ffmpeg_path() + assert result == "/usr/local/bin/ffmpeg" + + # Test system FFmpeg fallback + mock_imageio.get_ffmpeg_exe.side_effect = Exception("Not found") + mock_which.return_value = "/usr/bin/ffmpeg" + result = self.backend._get_ffmpeg_path() + assert result == "/usr/bin/ffmpeg" + + # Test FFmpeg not found + mock_which.return_value = None + with pytest.raises(FileNotFoundError, match="FFmpeg not found"): + self.backend._get_ffmpeg_path() + + @pytest.mark.asyncio + async def test_handle_network_request_gsap_allowed(self): + """Test that GSAP CDN requests are allowed.""" + mock_route = AsyncMock() + mock_request = Mock() + mock_request.url = "https://cdnjs.cloudflare.com/ajax/libs/gsap/3.12.2/gsap.min.js" + + await self.backend._handle_network_request(mock_route, mock_request) + + mock_route.continue_.assert_called_once() + mock_route.abort.assert_not_called() + + @pytest.mark.asyncio + async def test_handle_network_request_local_files_allowed(self): + """Test that local file requests are allowed.""" + mock_route = AsyncMock() + mock_request = Mock() + mock_request.url = "file:///tmp/test/index.html" + + await self.backend._handle_network_request(mock_route, mock_request) + + mock_route.continue_.assert_called_once() + mock_route.abort.assert_not_called() + + @pytest.mark.asyncio + async def test_handle_network_request_blocked(self): + """Test that other requests are blocked.""" + mock_route = AsyncMock() + mock_request = Mock() + mock_request.url = "https://malicious.com/script.js" + + await self.backend._handle_network_request(mock_route, mock_request) + + mock_route.continue_.assert_not_called() + mock_route.abort.assert_called_once() + + @pytest.mark.asyncio + async def test_encode_frames_no_frames(self): + """Test encoding with no frames raises error.""" + opts = RenderOpts() + output_path = Path("/tmp/test.mp4") + + with pytest.raises(ValueError, match="No frames to encode"): + await self.backend._encode_frames_to_mp4([], output_path, opts) + + @pytest.mark.asyncio + @patch('asyncio.create_subprocess_exec') + async def test_encode_frames_success(self, mock_subprocess): + """Test successful frame encoding.""" + # Setup mock subprocess + mock_process = AsyncMock() + mock_process.communicate.return_value = (b"stdout", b"stderr") + mock_process.returncode = 0 + mock_subprocess.return_value = mock_process + + with patch.object(self.backend, '_get_ffmpeg_path', return_value='ffmpeg'): + with tempfile.TemporaryDirectory() as tmpdir: + # Create fake frame files + frame_dir = Path(tmpdir) + frame_paths = [] + for i in range(3): + frame_path = frame_dir / f"frame_{i:06d}.png" + frame_path.write_bytes(b"fake png data") + frame_paths.append(frame_path) + + opts = RenderOpts(fps=30, quality="standard", timeout=300) + output_path = Path(tmpdir) / "test.mp4" + + await self.backend._encode_frames_to_mp4(frame_paths, output_path, opts) + + # Verify subprocess was called correctly + mock_subprocess.assert_called_once() + call_args = mock_subprocess.call_args[0] + + assert call_args[0] == 'ffmpeg' + assert '-y' in call_args + assert '-framerate' in call_args + assert '30' in call_args + assert '-crf' in call_args + assert '23' in call_args # standard quality + + @pytest.mark.asyncio + @patch('asyncio.create_subprocess_exec') + async def test_encode_frames_ffmpeg_failure(self, mock_subprocess): + """Test FFmpeg failure handling.""" + # Setup mock subprocess that fails + mock_process = AsyncMock() + mock_process.communicate.return_value = (b"stdout", b"encoding failed") + mock_process.returncode = 1 + mock_subprocess.return_value = mock_process + + with patch.object(self.backend, '_get_ffmpeg_path', return_value='ffmpeg'): + with tempfile.TemporaryDirectory() as tmpdir: + frame_path = Path(tmpdir) / "frame_000000.png" + frame_path.write_bytes(b"fake") + + opts = RenderOpts() + output_path = Path(tmpdir) / "test.mp4" + + with pytest.raises(RuntimeError, match="FFmpeg failed"): + await self.backend._encode_frames_to_mp4([frame_path], output_path, opts) + + @pytest.mark.asyncio + @patch('asyncio.create_subprocess_exec') + async def test_encode_frames_timeout(self, mock_subprocess): + """Test FFmpeg timeout handling.""" + # Setup mock subprocess that times out + mock_process = AsyncMock() + mock_process.communicate.side_effect = asyncio.TimeoutError() + mock_process.kill = Mock() + mock_subprocess.return_value = mock_process + + with patch.object(self.backend, '_get_ffmpeg_path', return_value='ffmpeg'): + with tempfile.TemporaryDirectory() as tmpdir: + frame_path = Path(tmpdir) / "frame_000000.png" + frame_path.write_bytes(b"fake") + + opts = RenderOpts(timeout=1) # Short timeout + output_path = Path(tmpdir) / "test.mp4" + + with pytest.raises(RuntimeError, match="FFmpeg encoding timed out"): + await self.backend._encode_frames_to_mp4([frame_path], output_path, opts) \ No newline at end of file diff --git a/tests/unit/video/test_motion_graphics_agent.py b/tests/unit/video/test_motion_graphics_agent.py new file mode 100644 index 0000000..9e5acaf --- /dev/null +++ b/tests/unit/video/test_motion_graphics_agent.py @@ -0,0 +1,220 @@ +"""Unit tests for motion graphics agent factory.""" + +import pytest +import tempfile +from pathlib import Path +from unittest.mock import Mock, patch, AsyncMock + +from praisonai_tools.video.motion_graphics.agent import ( + create_motion_graphics_agent, + RenderTools, + _resolve_backend +) +from praisonai_tools.video.motion_graphics.protocols import RenderOpts, RenderResult, LintResult + + +class MockAgent: + """Mock agent for testing.""" + + def __init__(self, instructions="", tools=None, llm="", **kwargs): + self.instructions = instructions + self.tools = tools or [] + self.llm = llm + self.kwargs = kwargs + + +class MockFileTools: + """Mock FileTools for testing.""" + + def __init__(self, base_dir=""): + self.base_dir = base_dir + + +class MockBackend: + """Mock render backend for testing.""" + + async def lint(self, workspace, strict=False): + return LintResult(ok=True, messages=[]) + + async def render(self, workspace, opts): + output_name = getattr(opts, 'output_name', 'test.mp4') + return RenderResult( + ok=True, + output_path=workspace / output_name, + bytes_=b"test video data", + size_kb=1024 + ) + + +class TestRenderTools: + """Test RenderTools class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.backend = MockBackend() + self.workspace = Path("/tmp/test_workspace") + self.render_tools = RenderTools(self.backend, self.workspace) + + @pytest.mark.asyncio + async def test_lint_composition(self): + """Test linting composition.""" + result = await self.render_tools.lint_composition(strict=True) + + assert result["ok"] is True + assert result["messages"] == [] + assert "raw" in result + + @pytest.mark.asyncio + async def test_render_composition(self): + """Test rendering composition.""" + result = await self.render_tools.render_composition( + output_name="custom.mp4", + fps=60, + quality="high" + ) + + assert result["ok"] is True + assert "custom.mp4" in str(result["output_path"]) + assert result["size_kb"] == 1024 + assert result["bytes"] == b"test video data" + assert result["stderr"] == "" + + +class TestResolveBackend: + """Test backend resolution.""" + + def test_resolve_string_backend(self): + """Test resolving string backend specification.""" + backend = _resolve_backend("html") + + from praisonai_tools.video.motion_graphics.backend_html import HtmlRenderBackend + assert isinstance(backend, HtmlRenderBackend) + + def test_resolve_unknown_backend(self): + """Test resolving unknown backend raises error.""" + with pytest.raises(ValueError, match="Unknown backend"): + _resolve_backend("unknown") + + def test_resolve_backend_instance(self): + """Test resolving backend instance passes through.""" + mock_backend = MockBackend() + result = _resolve_backend(mock_backend) + + assert result is mock_backend + + +class TestCreateMotionGraphicsAgent: + """Test motion graphics agent factory.""" + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.agent.FileTools', MockFileTools) + def test_create_agent_defaults(self): + """Test creating agent with default parameters.""" + with tempfile.TemporaryDirectory() as tmpdir: + agent = create_motion_graphics_agent(workspace=tmpdir) + + assert isinstance(agent, MockAgent) + assert agent.llm == "claude-sonnet-4" + assert len(agent.tools) == 2 # FileTools and RenderTools + assert "motion graphics specialist" in agent.instructions.lower() + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.agent.FileTools', MockFileTools) + def test_create_agent_custom_params(self): + """Test creating agent with custom parameters.""" + with tempfile.TemporaryDirectory() as tmpdir: + agent = create_motion_graphics_agent( + workspace=tmpdir, + max_retries=5, + llm="gpt-4", + custom_param="test" + ) + + assert agent.llm == "gpt-4" + assert "5" in agent.instructions # max_retries mentioned + assert agent.kwargs == {"custom_param": "test"} + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', None) + def test_create_agent_missing_praisonaiagents(self): + """Test creating agent when praisonaiagents is not available.""" + with pytest.raises(ImportError, match="praisonaiagents not available"): + create_motion_graphics_agent() + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.agent.FileTools', MockFileTools) + def test_create_agent_auto_workspace(self): + """Test creating agent with automatic workspace creation.""" + agent = create_motion_graphics_agent() + + # Should create workspace automatically + assert hasattr(agent, '_motion_graphics_workspace') + assert agent._motion_graphics_workspace.exists() + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.agent.FileTools', MockFileTools) + def test_create_agent_custom_backend(self): + """Test creating agent with custom backend.""" + mock_backend = MockBackend() + + with tempfile.TemporaryDirectory() as tmpdir: + agent = create_motion_graphics_agent( + workspace=tmpdir, + backend=mock_backend + ) + + assert agent._motion_graphics_backend is mock_backend + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.agent.FileTools', MockFileTools) + def test_create_agent_workspace_creation(self): + """Test agent creates workspace directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + workspace_path = Path(tmpdir) / "custom_workspace" + + agent = create_motion_graphics_agent(workspace=workspace_path) + + assert workspace_path.exists() + assert agent._motion_graphics_workspace == workspace_path + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.agent.FileTools', MockFileTools) + def test_create_agent_skill_included(self): + """Test agent includes motion graphics skill.""" + with tempfile.TemporaryDirectory() as tmpdir: + agent = create_motion_graphics_agent(workspace=tmpdir) + + # Should include the skill content + assert "GSAP" in agent.instructions + assert "timeline" in agent.instructions + assert "window.__timelines" in agent.instructions + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.agent.FileTools', MockFileTools) + def test_create_agent_output_validation(self): + """Test agent includes output validation rules.""" + with tempfile.TemporaryDirectory() as tmpdir: + agent = create_motion_graphics_agent(workspace=tmpdir, max_retries=3) + + # Should include strict output validation + assert "CRITICAL OUTPUT VALIDATION" in agent.instructions + assert "Never fabricate file paths" in agent.instructions + assert "3 failed attempts" in agent.instructions + + @patch('praisonai_tools.video.motion_graphics.agent.Agent', MockAgent) + @patch('praisonai_tools.video.motion_graphics.agent.FileTools', MockFileTools) + def test_agent_tools_configuration(self): + """Test agent tools are configured correctly.""" + with tempfile.TemporaryDirectory() as tmpdir: + agent = create_motion_graphics_agent(workspace=tmpdir) + + assert len(agent.tools) == 2 + + # Check FileTools + file_tools = agent.tools[0] + assert isinstance(file_tools, MockFileTools) + assert file_tools.base_dir == str(tmpdir) + + # Check RenderTools + render_tools = agent.tools[1] + assert isinstance(render_tools, RenderTools) + assert render_tools.workspace == Path(tmpdir) \ No newline at end of file diff --git a/tests/unit/video/test_motion_graphics_protocols.py b/tests/unit/video/test_motion_graphics_protocols.py new file mode 100644 index 0000000..ac93f34 --- /dev/null +++ b/tests/unit/video/test_motion_graphics_protocols.py @@ -0,0 +1,190 @@ +"""Unit tests for motion graphics protocols.""" + +import pytest +from pathlib import Path +from praisonai_tools.video.motion_graphics.protocols import ( + RenderOpts, + LintResult, + RenderResult, + RenderBackendProtocol +) + + +class TestRenderOpts: + """Test RenderOpts dataclass.""" + + def test_default_values(self): + """Test default values are set correctly.""" + opts = RenderOpts() + + assert opts.output_name == "video.mp4" + assert opts.fps == 30 + assert opts.quality == "standard" + assert opts.format == "mp4" + assert opts.strict is False + assert opts.timeout == 300 + + def test_custom_values(self): + """Test custom values are set correctly.""" + opts = RenderOpts( + output_name="custom.webm", + fps=60, + quality="high", + format="webm", + strict=True, + timeout=600 + ) + + assert opts.output_name == "custom.webm" + assert opts.fps == 60 + assert opts.quality == "high" + assert opts.format == "webm" + assert opts.strict is True + assert opts.timeout == 600 + + +class TestLintResult: + """Test LintResult dataclass.""" + + def test_success_result(self): + """Test successful lint result.""" + result = LintResult(ok=True, messages=[]) + + assert result.ok is True + assert result.messages == [] + assert result.raw == "" + + def test_failure_result(self): + """Test failed lint result.""" + messages = ["Error 1", "Error 2"] + raw_content = "test" + + result = LintResult( + ok=False, + messages=messages, + raw=raw_content + ) + + assert result.ok is False + assert result.messages == messages + assert result.raw == raw_content + + +class TestRenderResult: + """Test RenderResult dataclass.""" + + def test_success_result(self): + """Test successful render result.""" + output_path = Path("/tmp/video.mp4") + video_bytes = b"fake video data" + + result = RenderResult( + ok=True, + output_path=output_path, + bytes_=video_bytes, + size_kb=1024 + ) + + assert result.ok is True + assert result.output_path == output_path + assert result.bytes_ == video_bytes + assert result.stderr == "" + assert result.size_kb == 1024 + + def test_failure_result(self): + """Test failed render result.""" + stderr = "Render failed: invalid timeline" + + result = RenderResult( + ok=False, + output_path=None, + bytes_=None, + stderr=stderr + ) + + assert result.ok is False + assert result.output_path is None + assert result.bytes_ is None + assert result.stderr == stderr + assert result.size_kb == 0 + + +class TestRenderBackendProtocol: + """Test RenderBackendProtocol protocol.""" + + def test_protocol_runtime_checkable(self): + """Test that protocol is runtime checkable.""" + from praisonai_tools.video.motion_graphics.backend_html import HtmlRenderBackend + + backend = HtmlRenderBackend() + assert isinstance(backend, RenderBackendProtocol) + + def test_protocol_methods_exist(self): + """Test that protocol methods are properly defined.""" + # This should not raise an error if protocol is properly defined + assert hasattr(RenderBackendProtocol, 'lint') + assert hasattr(RenderBackendProtocol, 'render') + + +class MockRenderBackend: + """Mock render backend for testing.""" + + def __init__(self, lint_result=None, render_result=None): + self.lint_result = lint_result or LintResult(ok=True, messages=[]) + self.render_result = render_result or RenderResult( + ok=True, + output_path=Path("/tmp/test.mp4"), + bytes_=b"test", + size_kb=1 + ) + self.lint_calls = [] + self.render_calls = [] + + async def lint(self, workspace: Path, strict: bool = False) -> LintResult: + """Mock lint implementation.""" + self.lint_calls.append((workspace, strict)) + return self.lint_result + + async def render(self, workspace: Path, opts: RenderOpts) -> RenderResult: + """Mock render implementation.""" + self.render_calls.append((workspace, opts)) + return self.render_result + + +class TestMockRenderBackend: + """Test mock render backend.""" + + def test_mock_backend_protocol_compliance(self): + """Test that mock backend implements protocol.""" + backend = MockRenderBackend() + assert isinstance(backend, RenderBackendProtocol) + + @pytest.mark.asyncio + async def test_mock_backend_lint(self): + """Test mock backend lint method.""" + lint_result = LintResult(ok=False, messages=["test error"]) + backend = MockRenderBackend(lint_result=lint_result) + + workspace = Path("/tmp/test") + result = await backend.lint(workspace, strict=True) + + assert result == lint_result + assert backend.lint_calls == [(workspace, True)] + + @pytest.mark.asyncio + async def test_mock_backend_render(self): + """Test mock backend render method.""" + render_result = RenderResult( + ok=False, + output_path=None, + bytes_=None, + stderr="test error" + ) + backend = MockRenderBackend(render_result=render_result) + + workspace = Path("/tmp/test") + opts = RenderOpts(output_name="test.mp4") + result = await backend.render(workspace, opts) + + assert result == render_result + assert backend.render_calls == [(workspace, opts)] \ No newline at end of file diff --git a/tests/unit/video/test_render_retries.py b/tests/unit/video/test_render_retries.py new file mode 100644 index 0000000..66211d1 --- /dev/null +++ b/tests/unit/video/test_render_retries.py @@ -0,0 +1,108 @@ +"""Tests for render retry functionality.""" + +import pytest +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock + +try: + from praisonai_tools.video.motion_graphics.agent import RenderTools + from praisonai_tools.video.motion_graphics.protocols import RenderResult, RenderOpts + AGENT_AVAILABLE = True +except ImportError: + AGENT_AVAILABLE = False + + +class FailingBackend: + """Mock backend that fails a specified number of times.""" + + def __init__(self, max_fails: int = 999): + self.max_fails = max_fails + self.call_count = 0 + + async def lint(self, workspace, strict=False): + from praisonai_tools.video.motion_graphics.protocols import LintResult + return LintResult(ok=True, messages=[], raw="") + + async def render(self, workspace, opts): + self.call_count += 1 + + if self.call_count <= self.max_fails: + return RenderResult( + ok=False, + output_path=None, + bytes_=None, + stderr=f"Simulated failure {self.call_count}", + size_kb=0 + ) + else: + return RenderResult( + ok=True, + output_path=workspace / opts.output_name, + bytes_=b"test video", + stderr="", + size_kb=1024 + ) + + +@pytest.mark.skipif(not AGENT_AVAILABLE, reason="Motion graphics agent not available") +class TestRenderRetries: + """Test bounded retry functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + self.temp_dir = Path(tempfile.mkdtemp()) + + def teardown_method(self): + """Clean up test fixtures.""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + @pytest.mark.asyncio + async def test_render_composition_bounded_retries(self): + """Test render_composition respects max_retries limit.""" + backend = FailingBackend(max_fails=999) # Always fail + tools = RenderTools(backend, self.temp_dir, max_retries=3) + + result = await tools.render_composition(output_name="test.mp4") + + assert backend.call_count == 3 # Should try exactly 3 times + assert not result["ok"] + assert "Simulated failure" in result["stderr"] + + @pytest.mark.asyncio + async def test_render_composition_succeeds_after_retries(self): + """Test render_composition succeeds after some failures.""" + backend = FailingBackend(max_fails=2) # Fail twice, then succeed + tools = RenderTools(backend, self.temp_dir, max_retries=3) + + result = await tools.render_composition(output_name="test.mp4") + + assert backend.call_count == 3 # Fail twice, succeed on third + assert result["ok"] + assert result["attempts"] == 3 + + @pytest.mark.asyncio + async def test_render_with_bounded_retries_integration(self): + """Test render_with_bounded_retries with write/patch functions.""" + backend = FailingBackend(max_fails=1) # Fail once, then succeed + tools = RenderTools(backend, self.temp_dir, max_retries=2) + + write_calls = [] + patch_calls = [] + + async def mock_write_fn(**kwargs): + write_calls.append(kwargs) + + async def mock_patch_fn(error): + patch_calls.append(error) + + result = await tools.render_with_bounded_retries( + write_fn=mock_write_fn, + patch_fn=mock_patch_fn, + output_name="test.mp4" + ) + + assert len(write_calls) == 1 # write_fn called once initially + assert len(patch_calls) == 1 # patch_fn called once for first failure + assert result["ok"] # Eventually succeeds \ No newline at end of file