A reusable library for building intelligent agents with safe code execution capabilities using PydanticAI, Temporal workflows, and Docker sandboxes.
temporal.pydanticai.codeact is a Python library that combines three powerful technologies to create AI agents that can write and execute code safely in isolated environments with persistent state:
- PydanticAI (v1.27.0+) - Type-safe agent framework for building production-grade GenAI applications
- Temporal (v1.19.0+) - Workflow orchestration for reliable, durable, long-running agent processes
- Docker - Containerized execution environment for secure code isolation
- ✨ Crash-Resistant Persistence - Docker volumes ensure Python variables and files survive crashes and restarts
- 🔒 Sandboxed Security - All code runs in isolated Docker containers with resource limits
- 🔄 Durable Workflows - Temporal ensures reliable execution with automatic retries and recovery
- 🎯 Type-Safe - Full Pydantic validation for all inputs and outputs
- 🛠️ Flexible Tools - Agents can execute Python, run bash commands, manage files, and query state
- 📦 Dynamic Packages - Install Python and system packages on-demand during execution
- 🔌 MCP Integration - Native support for Model Context Protocol servers as agent tools
- ⚙️ Custom Functions - Serialize and inject reusable functions with automatic dependency detection
- 🌐 Multi-Host Support - Optional NFS volumes for shared state across multiple workers
- 🎨 Extensible Design - Easy to create custom agents with specialized capabilities
- Python 3.13+
- Docker (running daemon)
- Temporal Server (local dev server or cloud)
- uv package manager
Install the published library in your project:
# Install from PyPI (when published)
pip install temporal-pydanticai-codeact
# Or with uv
uv add temporal-pydanticai-codeact
# Or with poetry
poetry add temporal-pydanticai-codeact

Then use it in your code:
from temporal.pydanticai.codeact.agents.simple_agent import SimpleAgent
from temporal.pydanticai.codeact.workers.sandbox_worker import CodeActWorkerRunner
from temporal.pydanticai.codeact.datamodels.agent_builder import AgentBuilder
# Your code here...

# Clone and install for development
git clone https://github.com/scalabreseGD/temporal-pydanticai-codeact.git
cd temporal-pydanticai-codeact
# Install with uv (recommended)
uv sync
# Or install in editable mode with pip
pip install -e .
# Install with dev dependencies
pip install -e ".[dev]"

Create app_conf.yml in the project root:
temporal:
url: localhost:7233
namespace: default
llm:
gemini:
api_key: YOUR_GEMINI_API_KEY
model_name: gemini-2.5-pro

Create agent_prompts.yml:
simple_agent:
system_prompt: "You are a Python coding assistant with access to a sandboxed execution environment."
instructions: |
You have access to a Docker container (ID: {{ container_id }}) with these packages: {{ python_packages }}.
Use the execute_python tool to run code and solve the user's task.

import asyncio
from temporal.pydanticai.codeact.activities.common import get_temporal_client
from temporal.pydanticai.codeact.utils.common_utils import load_config
from temporal.pydanticai.codeact.agents.simple_agent import SimpleAgent
from temporal.pydanticai.codeact.datamodels.agent_builder import AgentBuilder
from temporal.pydanticai.codeact.workflows.simple_agent_workflow import SimpleAgentWorkflow
from pydantic_ai.durable_exec.temporal import PydanticAIPlugin
async def main():
# Connect to Temporal
config = load_config()
client = await get_temporal_client(
config['temporal'],
plugins=[PydanticAIPlugin()]
)
# Execute workflow
result = await client.execute_workflow(
SimpleAgentWorkflow.run,
id='my-agent-task',
task_queue='sample_queue'
)
print(result)
asyncio.run(main())

The project uses a three-layer architecture:
┌─────────────────────────────────────────────────────────────┐
│ Agent Layer │
│ BaseAgent → CodeActAgent → SimpleAgent │
│ (PydanticAI agents with tool definitions) │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ Workflow Layer │
│ CodeActAgentWorkflow + SimpleAgentWorkflow │
│ (Temporal workflows for orchestration) │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ Sandbox Layer │
│ PersistentContainerSandbox (core operations) │
│ DurablePersistentContainerSandbox (Temporal activities) │
│ StatelessPersistentSandbox (agent tool adapter) │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ Docker Container │
│ Python 3.11 + uv + persistent state storage │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ Docker Volume (Persistent Storage) │
│ workflow-{id} volume at /persistent-storage/ │
│ Stores Python state and files (survives crashes) │
└─────────────────────────────────────────────────────────────┘
- `BaseAgent` - Abstract base class with model configuration, MCP toolsets, and Temporal wrapping
- `CodeActAgent` - Code execution agent with Docker sandbox tools (blacklists container lifecycle ops)
- `SimpleAgent` - Minimal concrete implementation for basic code execution tasks
- `CodeActAgentWorkflow` - Mixin providing container lifecycle management (start/stop)
- `SimpleAgentWorkflow` - Complete example workflow: start container → run agent → cleanup
- `SandboxWorkflow` - Lightweight child workflow for individual sandbox operations
- `PersistentContainerSandbox` - Core implementation with container management and state persistence
- `DurablePersistentContainerSandbox` - Wraps all methods as Temporal activities
- `StatelessPersistentSandbox` - Converts activities into PydanticAI agent tools via child workflows
- `CodeActWorkerRunner` - Flexible worker builder for running agents with custom workflows and activities
- Supports multiple agents with `AgentPlugin`
- Automatically includes sandbox activities and utilities
- Extensible with custom workflows and activities
- `sandbox.py` - All sandbox task types and argument models (15+ operations)
- `codeact.py` - `CodeActAgentDeps` for runtime container context
- `agent_builder.py` - `AgentBuilder` for configuring agents with prompts and models
- `prompts.py` - `AgentPrompts` model for system prompts and instructions
code-act-pydanticai/
├── src/
│ └── temporal/
│ └── pydanticai/
│ └── codeact/ # Main library package
│ ├── __init__.py # Package exports
│ │
│ ├── agents/ # AI agent implementations
│ │ ├── base/
│ │ │ ├── base_agent.py # Abstract base agent
│ │ │ ├── code_act_agent.py # Code execution agent
│ │ │ └── default_settings.py # Temporal activity configs
│ │ └── simple_agent.py # Basic concrete agent
│ │
│ ├── workflows/ # Temporal workflow definitions
│ │ ├── base/
│ │ │ └── codeact_agent_workflow.py # Container lifecycle mixin
│ │ ├── simple_agent_workflow.py # Example agent workflow
│ │ └── sandbox_workflow.py # Sandbox operation workflow
│ │
│ ├── docker_sandbox/ # Docker execution sandbox
│ │ ├── container_sandbox.py # 3 sandbox implementations
│ │ └── sandbox/ # State management scripts
│ │ ├── init_state.py
│ │ ├── load_state.py
│ │ ├── save_state.py
│ │ ├── get_state.py
│ │ ├── list_variables.py
│ │ ├── read_variable.py
│ │ └── clear_state.py
│ │
│ ├── datamodels/ # Pydantic data models
│ │ ├── sandbox.py # Sandbox task models
│ │ ├── codeact.py # Agent dependencies
│ │ ├── agent_builder.py # Agent configuration
│ │ └── prompts.py # Prompt models
│ │
│ ├── activities/ # Temporal activity functions
│ │ └── common.py # Config loading, prompts, utilities
│ │
│ ├── workers/ # Temporal workers
│ │ └── sandbox_worker.py # CodeActWorkerRunner
│ │
│ └── api/ # FastAPI application
│ └── main.py
│
├── tests/ # Test suite (mirrors src structure)
│ └── temporal/
│ └── pydanticai/
│ └── codeact/
│ ├── test_activities_common.py
│ ├── test_datamodels_agent_builder.py
│ ├── test_datamodels_codeact.py
│ ├── test_datamodels_prompts.py
│ ├── test_datamodels_sandbox.py
│ └── test_sandbox_container.py
│
├── examples/ # Usage examples
│ ├── simple_agent_example.py
│ └── agent_with_sandbox_tools.py
│
├── docs/ # Documentation
│ ├── architecture.md
│ ├── getting-started.md
│ ├── api-reference.md
│ ├── sandbox-operations.md
│ ├── examples.md
│ └── sandbox_workflow_demo.md
│
├── app_conf.yml # Temporal and LLM configuration
├── agent_prompts.yml # Agent prompts and instructions
├── pyproject.toml # Project dependencies
├── CLAUDE.md # Claude Code instructions
└── README.md
# Install Temporal CLI
brew install temporal # macOS
# Start local dev server
temporal server start-dev

Create a worker script (e.g., my_worker.py):
import asyncio
import os
from temporal.pydanticai.codeact.workers.sandbox_worker import CodeActWorkerRunner
from temporal.pydanticai.codeact.activities.common import get_temporal_client
from temporal.pydanticai.codeact.utils.common_utils import load_config, read_prompts
from temporal.pydanticai.codeact.agents.simple_agent import SimpleAgent
from temporal.pydanticai.codeact.datamodels.agent_builder import AgentBuilder
from temporal.pydanticai.codeact.workflows.simple_agent_workflow import SimpleAgentWorkflow
from pydantic_ai.durable_exec.temporal import PydanticAIPlugin
async def main():
# Load configuration
config = load_config()
prompts = read_prompts()
# Create temporal client
client = await get_temporal_client(
config['temporal'],
plugins=[PydanticAIPlugin()]
)
# Build agents
agent = await SimpleAgent.from_agent_confs(
agent_builder=AgentBuilder(
prompts=prompts.agent_prompts,
model_configs=config['llm']['gemini']
)
)
# Create and run worker
worker = await CodeActWorkerRunner.from_args(
temporal_client=client,
task_queue=os.getenv('TASK_QUEUE', 'sample_queue'),
agents=[agent],
workflows=[SimpleAgentWorkflow]
)
await worker.run()
if __name__ == '__main__':
asyncio.run(main())

Run the worker:
python my_worker.py

Option A: Run demo script
cd src
python run_sandbox_workflow.py

Option B: Execute SimpleAgentWorkflow
# See Quick Start section above

📚 Comprehensive documentation is available in the /docs folder:
- Architecture Guide - System design, components, and patterns
- Getting Started - Step-by-step tutorial for beginners
- API Reference - Complete API documentation for all modules
- Custom Functions Guide - Serializing and injecting reusable functions
- Sandbox Operations - Detailed reference for all sandbox operations
- Examples - Practical examples with explanations
- Testing Guide - Comprehensive testing documentation
- Sandbox Workflow Demo - Complete demo walkthrough
The examples/ directory contains practical usage examples:
- `simple_agent_example.py` - Basic agent instrumentation patterns
- `agent_with_sandbox_tools.py` - Advanced tool configuration
- `data_analysis_agent.py` - Custom functions with automatic dependency detection
pytest
mypy src/
ruff check .
ruff check --fix .

The sandbox maintains Python variable state across executions using pickle serialization. Variables are stored in Docker volumes, ensuring they survive container crashes, worker restarts, and even Docker daemon restarts.
from temporal.pydanticai.codeact.docker_sandbox.container_sandbox import PersistentContainerSandbox
from temporal.pydanticai.codeact.datamodels.sandbox import ExecutePythonArgs
sandbox = PersistentContainerSandbox()
# First execution
result = await sandbox.execute_python(
ExecutePythonArgs(
container_id=container_id,
code="x = 42\nprint(x)"
)
)
# Second execution - x is still available!
result = await sandbox.execute_python(
ExecutePythonArgs(
container_id=container_id,
code="print(x * 2)" # Outputs: 84
)
)

For comprehensive documentation on persistence, volume management, multi-host deployments, and troubleshooting, see the Persistent Storage section.
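Under the hood, this persistence amounts to a pickle round-trip over the mounted volume. A minimal sketch of the idea (illustrative only — the library's actual save_state.py/load_state.py scripts and paths may differ; `/tmp/demo-state` stands in for the real `/persistent-storage/{workflow_id}/state/` mount):

```python
import pickle
from pathlib import Path

# Stand-in for the real volume mount point
STATE_FILE = Path("/tmp/demo-state/globals.pkl")

def save_state(namespace: dict) -> None:
    """Pickle every picklable, non-dunder variable in the namespace."""
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    picklable = {}
    for name, value in namespace.items():
        if name.startswith("__"):
            continue
        try:
            pickle.dumps(value)
            picklable[name] = value
        except Exception:
            pass  # skip unpicklable objects (open files, sockets, ...)
    STATE_FILE.write_bytes(pickle.dumps(picklable))

def load_state() -> dict:
    """Restore the saved namespace, or start empty on first run."""
    if STATE_FILE.exists():
        return pickle.loads(STATE_FILE.read_bytes())
    return {}

# First "execution": define a variable, then persist the namespace
ns = load_state()
exec("x = 42", ns)
save_state(ns)

# Second "execution": the variable survives because it was pickled to disk
ns2 = load_state()
exec("print(x * 2)", ns2)  # prints 84
```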
Workflows orchestrate long-running agent tasks with automatic retries:
from temporalio import workflow
from temporal.pydanticai.codeact.workflows.base.codeact_agent_workflow import CodeActAgentWorkflow
from temporal.pydanticai.codeact.agents.simple_agent import SimpleAgent
from temporal.pydanticai.codeact.datamodels.codeact import CodeActAgentDeps
@workflow.defn
class SimpleAgentWorkflow(CodeActAgentWorkflow):
@workflow.run
async def run(self, user_task: str) -> str:
await self._start_sandbox_container(python_packages=['numpy'])
try:
agent = await SimpleAgent.from_agent_confs(builder)
result = await agent.run(
user_prompt=user_task,
deps=CodeActAgentDeps(container_id=self.container_id)
)
return result.output
finally:
await self._stop_sandbox_container()  # Always cleanup

Agents automatically receive sandbox operations as tools:
from temporal.pydanticai.codeact.docker_sandbox.container_sandbox import StatelessPersistentSandbox
# StatelessPersistentSandbox converts activities to agent tools
sandbox = StatelessPersistentSandbox()
tools = await sandbox.code_sandbox_tools(
blacklist=['start_container', 'stop_container'] # Exclude lifecycle ops
)
# Agent can now call: execute_python, execute_bash, read_file, etc.

The library provides native support for Model Context Protocol (MCP) servers at two levels:
- Agent-Level Integration - MCP servers as tools available to CodeActAgent subclasses
- Sandbox-Level Integration - MCP tools callable from within sandboxed Python code execution
This enables powerful combinations like using time APIs, web scrapers, file systems, and other external tools seamlessly in agent workflows.
When you execute Python code with MCP servers, the sandbox:
- Starts MCP servers inside the container
- Extracts tools from each server
- Creates synchronous Python function wrappers for async MCP tools
- Injects these functions into the execution namespace
- Executes your code with all tools available as regular functions
- Handles event loop coordination automatically
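The wrapper step above — turning async MCP tools into plain functions callable from exec()'d code — can be sketched as follows. This is an illustrative pattern, not the library's actual MCPSandboxExecutor implementation; `get_current_time` here is a local stand-in for a real MCP tool call:

```python
import asyncio
import threading

# A background event loop that would own the async MCP client sessions
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def get_current_time(timezone: str = "UTC") -> str:
    """Stand-in for an async MCP tool invocation."""
    await asyncio.sleep(0)  # pretend to round-trip to the MCP server
    return f"12:00 in {timezone}"

def make_sync_wrapper(async_tool, timeout: float = 30.0):
    """Expose an async tool as a plain function for exec()'d user code."""
    def wrapper(**kwargs):
        # Schedule on the background loop; block only the calling (exec) thread
        future = asyncio.run_coroutine_threadsafe(async_tool(**kwargs), loop)
        return future.result(timeout=timeout)
    return wrapper

# Inject the wrapper into the namespace the user code runs in
namespace = {"get_current_time": make_sync_wrapper(get_current_time)}
exec('result = get_current_time(timezone="America/New_York")', namespace)
print(namespace["result"])  # 12:00 in America/New_York
```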
from temporal.pydanticai.codeact.docker_sandbox.container_sandbox import PersistentContainerSandbox
from temporal.pydanticai.codeact.datamodels.sandbox import StartContainerArgs, ExecutePythonArgs
from pydantic_ai.mcp import MCPServerStdio
sandbox = PersistentContainerSandbox()
# Start container
container_id = await sandbox.start_container(StartContainerArgs())
# Create MCP server(s)
time_server = MCPServerStdio("uvx", ["mcp-server-time"])
# Execute code that calls MCP tools as regular functions!
code = '''
# MCP tools are available as regular Python functions
current_time = get_current_time(timezone="America/New_York")
print(f"New York time: {current_time}")
'''
result = await sandbox.execute_python(
ExecutePythonArgs(container_id=container_id, code=code),
mcp_servers=[time_server]
)

You can use multiple MCP servers simultaneously:
from pydantic_ai.mcp import MCPServerStdio
# Create multiple servers
time_server = MCPServerStdio("uvx", ["mcp-server-time"])
fetch_server = MCPServerStdio("uvx", ["mcp-server-fetch"])
code = '''
# Tools from both servers available!
time = get_current_time(timezone="UTC")
content = fetch(url="https://example.com")
print(f"Fetched at {time}")
print(f"Content length: {len(content)}")
'''
result = await sandbox.execute_python(
ExecutePythonArgs(container_id=container_id, code=code),
mcp_servers=[time_server, fetch_server]
)

The MCP integration uses a file-based execution approach:
┌────────────────────────────────────────────────────────────────┐
│ execute_python() Call │
│ (user code + mcp_servers=[...]) │
└──────────────────────┬─────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ 1. Serialize MCP server configs to JSON │
│ 2. Write user code to /tmp/user_code.py in container │
│ 3. Set environment variables: │
│ - USER_CODE_PATH=/tmp/user_code.py │
│ - MCP_SERVERS_JSON=[...] │
└──────────────────────┬─────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ Execute: /app/sandbox/execute_with_mcp.py │
│ │
│ 1. Read config from environment │
│ 2. Start all MCP servers │
│ 3. Create MCPSandboxExecutor with servers │
│ 4. Get namespace with tool wrappers │
│ 5. Execute user code with tools available │
│ 6. Clean up servers │
└──────────────────────┬─────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ MCPSandboxExecutor Class │
│ │
│ - Coordinates async MCP tools with sync exec() context │
│ - Creates synchronous wrappers using │
│ asyncio.run_coroutine_threadsafe() │
│ - Provides namespace dict with all tools as functions │
│ - Handles multiple servers and tool name conflicts │
└─────────────────────────────────────────────────────────────────┘
The sandbox works with any MCP server that supports stdio transport:
- `mcp-server-time` - Time and timezone queries
- `mcp-server-fetch` - Web content fetching
- `mcp-server-filesystem` - File operations
- `mcp-server-git` - Git operations
- `mcp-server-sqlite` - SQLite database access
- Custom servers - Any stdio-based MCP server
In Container (/app/sandbox/):
- `execute_with_mcp.py` - Entry point script that reads config, starts servers, and executes code
- `mcp_executor.py` - `MCPSandboxExecutor` class for event loop coordination and tool wrapping
In Host:
- `container_sandbox.py` - `_serialize_mcp_servers()` converts `MCPServerStdio` to JSON
- `execute_python()` - File-based execution when `mcp_servers` is provided
- ✅ Automatic Tool Discovery - All tools from all servers become available functions
- ✅ Event Loop Coordination - Async MCP tools work in sync exec() context
- ✅ Multiple Servers - Use any number of MCP servers simultaneously
- ✅ Clean Architecture - File-based approach avoids complex string generation
- ✅ Error Handling - Proper server lifecycle management with AsyncExitStack
- ✅ Name Conflict Detection - Prevents tool name collisions across servers
- ⚠️ Only stdio-based MCP servers supported (no HTTP/SSE yet)
- ⚠️ Tool functions return strings (MCP response serialized)
- ⚠️ Tools execute with a 30-second timeout
- ⚠️ Container needs network access for `uvx` to install MCP servers
Run the MCP integration test:
pytest tests/test_mcp_integration.py -v

See tests/test_mcp_integration.py and src/example/dupa_test.py for complete examples.
CodeActAgent subclasses can integrate MCP servers directly, making tools available both as:
- Function signatures included in agent instructions (for awareness)
- Callable tools within sandbox Python executions (for actual usage)
This enables agents to understand what external tools are available and use them seamlessly in generated code.
How It Works:
When building a CodeActAgent, the system automatically:
- Calls `_get_mcp_toolsets()` to retrieve MCP server configurations
- Serializes server configs for container execution
- Extracts tool schemas as Python function signatures via `extract_mcp_tools_as_functions`
- Injects function signatures into the agent instructions template (via the `{{ tools_as_func }}` variable)
- Passes serialized servers to sandbox instrumentation for runtime execution
Creating an Agent with MCP Tools:
from temporal.pydanticai.codeact.agents.base.code_act_agent import CodeActAgent
from pydantic_ai import WrapperToolset
from pydantic_ai.mcp import MCPServerStdio
class DataAnalysisAgent(CodeActAgent):
"""Agent with access to time and fetch tools."""
@staticmethod
async def _get_mcp_toolsets(**kwargs):
"""Define MCP servers available to this agent."""
return {
'time': WrapperToolset(MCPServerStdio("uvx", ["mcp-server-time"])),
'fetch': WrapperToolset(MCPServerStdio("uvx", ["mcp-server-fetch"]))
}

Agent Prompt Template:
# agent_prompts.yml
data_analysis_agent:
system_prompt: "You are a data analysis assistant with code execution capabilities."
instructions: |
You have access to a Docker sandbox (container: {{ container_id }}).
Installed packages: {{ python_packages }}
Current variables: {{ sandbox_variable_names }}
Files available: {{ sandbox_files }}
{% if tools_as_func %}
## External Tools Available
The following external tools are available as Python functions in your sandbox:
{% for func in tools_as_func %}
```python
{{ func }}
```
{% endfor %}
Use these tools by calling them as regular Python functions in your execute_python code.
{% endif %}
Solve the user's task using the available tools and packages.

Usage Example:
from temporal.pydanticai.codeact.datamodels.agent_builder import AgentBuilder
from temporal.pydanticai.codeact.datamodels.codeact import CodeActAgentDeps
# Build agent with MCP integration
agent = await DataAnalysisAgent.from_agent_confs(
agent_builder=AgentBuilder(
prompts=prompts,
model_configs=model_configs
)
)
# Run agent - it can now use time and fetch tools in its code!
result = await agent.run(
user_prompt="Fetch the homepage of example.com and report the current time",
deps=CodeActAgentDeps(container_id=container_id)
)

What the Agent Sees:
The agent's instructions will include the MCP tool signatures:
def get_current_time(timezone: str | None = None) -> Any:
"""Get the current time in a specific timezone."""
def fetch(url: str, max_length: int | None = None) -> Any:
    """Fetches a URL from the internet and extracts its contents as markdown."""

The agent can then generate code like:
# Agent-generated code
time = get_current_time(timezone="UTC")
content = fetch(url="https://example.com", max_length=5000)
print(f"Fetched at {time}")
print(f"Content preview: {content[:200]}")

Key Benefits:
- ✅ Tool Awareness - Agent knows what external tools are available and their signatures
- ✅ Seamless Integration - Tools work like regular Python functions in sandbox code
- ✅ Type Safety - Function signatures extracted from MCP JSON schemas
- ✅ Automatic Management - No manual tool registration or wrapper code needed
- ✅ Multiple Servers - Support for any number of MCP servers per agent
Architecture Flow:
┌─────────────────────────────────────────────────────────┐
│ CodeActAgent Subclass │
│ └─ _get_mcp_toolsets() → {name: MCPServerStdio} │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ _build_agent() │
│ 1. Serialize MCP servers │
│ 2. Extract tool signatures (Temporal activity) │
│ 3. Pass signatures to instruction renderer │
│ 4. Pass serialized servers to sandbox instrumentation │
└──────────────────┬──────────────────────────────────────┘
│
├──────────────────┬──────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Instructions │ │ Sandbox Tools │ │ MCP Servers │
│ (with tool │ │ (execute_*, │ │ (serialized │
│ signatures) │ │ read_file, │ │ for │
│ │ │ etc.) │ │ container) │
└─────────────────┘ └─────────────────┘ └──────────────┘
│
▼
┌───────────────────┐
│ execute_python │
│ + mcp_servers │
│ → Tools callable │
└───────────────────┘
Supported MCP Server Types:
- MCPServerStdio - Standard input/output transport (most common)
- MCPServerSSE - Server-Sent Events transport
- MCPServerStreamableHTTP - HTTP streaming transport
Example with Multiple Agents:
class TimeAwareAgent(CodeActAgent):
@staticmethod
async def _get_mcp_toolsets(**kwargs):
return {
'time': WrapperToolset(MCPServerStdio("uvx", ["mcp-server-time"]))
}
class WebScrapingAgent(CodeActAgent):
@staticmethod
async def _get_mcp_toolsets(**kwargs):
return {
'fetch': WrapperToolset(MCPServerStdio("uvx", ["mcp-server-fetch"])),
'filesystem': WrapperToolset(MCPServerStdio("uvx", ["mcp-server-filesystem"]))
}
# Each agent has access to only its defined toolsets

Custom functions allow you to define reusable helper functions in your Python codebase that are automatically serialized, analyzed for dependencies, and injected into the sandbox execution environment.
⭐ Automatic Sync Wrappers - Async functions are automatically wrapped so they can be called synchronously (no await needed!)
📚 Full Docstrings in Signatures - Agents see complete function documentation for better understanding
📦 Dependency Detection - Python's AST module automatically detects required packages
🔧 Auto-Installation - Dependencies are automatically installed in Docker containers
♻️ Reusable - Define once, use across multiple agents and workflows
from temporal.pydanticai.codeact.agents.base.code_act_agent import CodeActAgent
class DataAnalysisAgent(CodeActAgent):
agent_name = 'data_analysis_agent'
@staticmethod
async def _get_custom_functions(**kwargs) -> list:
"""Define custom functions for this agent."""
async def analyze_data(data_json: str) -> dict:
"""Analyze data using pandas."""
import pandas as pd
import numpy as np
df = pd.read_json(data_json)
return {
'mean': df.mean().to_dict(),
'std': df.std().to_dict()
}
def format_output(data: dict) -> str:
"""Format dictionary as markdown."""
return "\\n".join(f"- **{k}**: {v}" for k, v in data.items())
return [analyze_data, format_output]

Agents can then use these functions in their generated code without using await:
# Agent generates code like this:
data = '[{"a": 1, "b": 2}, {"a": 3, "b": 4}]'
analysis = analyze_data(data) # NO await needed! Async functions are wrapped
output = format_output(analysis)
print(output)

- vs. MCP Tools: Custom functions are ideal for business logic and reusable utilities that you want to version control alongside your code
- vs. Inline Code: Provides reusability, better testing, automatic dependency management, and clear documentation
- Simplicity: Async functions work without `await` - the sync wrapper handles all event loop management
For comprehensive documentation, see Custom Functions Guide.
The sandbox provides automatic persistent storage using Docker volumes, ensuring that Python variables and files survive container crashes, worker restarts, and even Docker daemon restarts.
When you start a container with a workflow_id, the system automatically:
- Creates a Docker volume named `workflow-{workflow_id}` (or reuses an existing one)
- Mounts it at `/persistent-storage/` inside the container
- Saves Python state to `/persistent-storage/{workflow_id}/state/globals.pkl`
- Restores state automatically when the workflow restarts with the same ID
Persistence is enabled by default:
from temporal.pydanticai.codeact.docker_sandbox.container_sandbox import PersistentContainerSandbox
from temporal.pydanticai.codeact.datamodels.sandbox import StartContainerArgs, ExecutePythonArgs
# Persistence enabled by default
sandbox = PersistentContainerSandbox()
# Start container with workflow_id
container_id = await sandbox.start_container(
StartContainerArgs(container_name="data-pipeline-123")
)
# Execute code - variables are saved automatically
await sandbox.execute_python(ExecutePythonArgs(
container_id=container_id,
code="results = {'accuracy': 0.95, 'loss': 0.03}"
))
# If worker crashes here and restarts with same ID...
# State is automatically recovered!
await sandbox.execute_python(ExecutePythonArgs(
container_id=container_id,
code="print(results)" # Still works!
))

Via Constructor:
# Enable/disable persistence
sandbox = PersistentContainerSandbox(enable_persistence=True)
# Use NFS for multi-host deployments
sandbox = PersistentContainerSandbox(
volume_driver='nfs',
volume_driver_opts={
'type': 'nfs',
'o': 'addr=nfs-server.company.com,rw',
'device': ':/exports/workflows'
}
)
# Disable persistence for ephemeral workflows
sandbox = PersistentContainerSandbox(enable_persistence=False)

Via Environment Variables:
# .env
ENABLE_PERSISTENCE=true
VOLUME_DRIVER=local # or 'nfs'
NFS_SERVER=nfs-server.company.com
NFS_PATH=/exports/workflows

List all workflow volumes:
volumes = await sandbox.list_workflow_volumes()
for vol in volumes:
    print(f"Workflow: {vol['workflow_id']}, Created: {vol['created']}")

Cleanup completed workflows:
# When workflow completes and you don't need the data anymore
await sandbox.cleanup_workflow_volume("data-pipeline-123")

Manual cleanup (via Docker CLI):
# List workflow volumes
docker volume ls | grep workflow-
# Inspect specific volume
docker volume inspect workflow-data-pipeline-123
# Remove specific volume
docker volume rm workflow-data-pipeline-123
# Remove all workflow volumes (careful!)
docker volume rm $(docker volume ls -q | grep "^workflow-")

Inside containers:
- State: `/persistent-storage/{workflow_id}/state/globals.pkl`
- Output: `/persistent-storage/{workflow_id}/output/`
On host:
- Local: Docker-managed (`/var/lib/docker/volumes/workflow-{id}`)
- NFS: On the NFS server at the configured path
For production deployments across multiple hosts:
sandbox = PersistentContainerSandbox(
volume_driver='nfs',
volume_driver_opts={
'type': 'nfs',
'o': 'addr=nfs-server.company.com,rw',
'device': ':/exports/workflows'
}
)
# Volumes now accessible from any host in the cluster!

✅ Do:
- Use meaningful workflow IDs (`data-pipeline-2024-01-15-batch-001`)
- Clean up completed workflows with `cleanup_workflow_volume()`
- Monitor volume usage with `list_workflow_volumes()`
- Use NFS for multi-host production deployments
❌ Don't:
- Reuse workflow IDs (each workflow should have a unique ID)
- Delete volumes manually (use `cleanup_workflow_volume()` instead)
- Disable persistence in production unless the workflow is truly ephemeral
State not persisting?
# Check if persistence is enabled
print(f"Persistence: {sandbox.enable_persistence}")

# Check if volume was created
docker volume ls | grep workflow-{your-workflow-id}
# Check container has volume mounted
docker inspect {container-id} | grep Mounts -A 10

Volume already exists? This is normal! The sandbox reuses existing volumes. For a fresh start:
await sandbox.cleanup_workflow_volume("your-workflow-id")
container_id = await sandbox.start_container(...)

| Feature | Docker Volumes | SeaweedFS/Other Distributed FS |
|---|---|---|
| Setup | ✅ None (built-in) | ❌ Complex (4+ containers) |
| Complexity | ✅ Simple | ❌ High |
| Single-host | ✅ Yes | ✅ Yes |
| Multi-host | ➕ With NFS | ✅ Native |
| Performance | ✅ Local disk | ➖ Network-dependent |
| Maintenance | ✅ Low | ❌ High |
Recommendation: Start with Docker volumes. Upgrade to NFS if you need multi-host. Only consider distributed file systems like SeaweedFS if you need advanced features.
- `TASK_QUEUE` - Temporal task queue name (default: `sample_queue`)
- `APP_CONFIG_PATH` - Path to configuration file
- `APP_PROMPTS_PATH` - Path to agent prompts file
- `GEMINI_API_KEY` - Google Gemini API key (or in app_conf.yml)
- `ENABLE_PERSISTENCE` - Enable/disable persistent storage (default: `true`)
- `VOLUME_DRIVER` - Volume driver for persistence (`local` or `nfs`, default: `local`)
- `NFS_SERVER` - NFS server address (when using the NFS driver)
- `NFS_PATH` - NFS export path (when using the NFS driver)
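A worker script might resolve these variables with plain `os.getenv` and the documented defaults. This is an illustrative sketch (`load_sandbox_settings` is a hypothetical helper, not part of the library):

```python
import os

def load_sandbox_settings() -> dict:
    """Resolve sandbox settings from the environment, with documented defaults."""
    settings = {
        "task_queue": os.getenv("TASK_QUEUE", "sample_queue"),
        "enable_persistence": os.getenv("ENABLE_PERSISTENCE", "true").lower() == "true",
        "volume_driver": os.getenv("VOLUME_DRIVER", "local"),
    }
    if settings["volume_driver"] == "nfs":
        # NFS settings only matter when the nfs driver is selected
        settings["nfs_server"] = os.environ["NFS_SERVER"]
        settings["nfs_path"] = os.environ["NFS_PATH"]
    return settings

print(load_sandbox_settings())
```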
Comprehensive test suite using pytest with async support:
# Run all tests
pytest
# Run unit tests only (no Docker/Temporal required)
pytest -m unit
# Run with coverage
pytest --cov=src --cov-report=html
# Run specific test file
pytest tests/test_datamodels_sandbox.py

- Unit Tests (`-m unit`) - Fast tests with mocked dependencies
- Integration Tests (`-m integration`) - Require Docker and/or Temporal
- Docker Tests (`-m docker`) - Require Docker daemon
- Temporal Tests (`-m temporal`) - Require Temporal server
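Markers like these are typically registered in pytest configuration so that `-m` selections work without warnings. A plausible registration (the project's actual pyproject.toml may differ, and `asyncio_mode` assumes pytest-asyncio is used for the async support mentioned above):

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
markers = [
    "unit: fast tests with mocked dependencies",
    "integration: require Docker and/or Temporal",
    "docker: require a running Docker daemon",
    "temporal: require a Temporal server",
]
```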
See Testing Guide for comprehensive testing documentation.
The library is configured to package only the temporal namespace module. To build distribution packages:
# Build the package using uv (recommended for this project)
uv build
# Or using standard build tools
pip install build twine
python -m build
# This creates:
# - dist/temporal_pydanticai_codeact-0.1.0-py3-none-any.whl (wheel - this is what gets installed)
# - dist/temporal-pydanticai-codeact-0.1.0.tar.gz (source distribution)

Verify the build:
# Check what's in the wheel (what users will install)
unzip -l dist/temporal_pydanticai_codeact-0.1.0-py3-none-any.whl | grep temporal
# You should see only:
# temporal/__init__.py
# temporal/pydanticai/__init__.py
# temporal/pydanticai/codeact/...

The build process packages only the src/temporal/ directory, which contains:
- `temporal/__init__.py` (namespace package)
- `temporal/pydanticai/__init__.py` (namespace package)
- `temporal/pydanticai/codeact/` (the actual library code)
This means users installing the package get:
site-packages/
└── temporal/
└── pydanticai/
└── codeact/
├── __init__.py
├── activities/
├── agents/
├── datamodels/
├── docker_sandbox/
├── workflows/
└── workers/
# Check the built package
twine check dist/*
# Upload to Test PyPI first (recommended)
twine upload --repository testpypi dist/*
# Test installation from TestPyPI
pip install --index-url https://test.pypi.org/simple/ temporal-pydanticai-codeact
# If everything works, publish to PyPI
twine upload dist/*

For private use or internal projects:
# Configure your private registry
pip config set global.index-url https://your-registry.com/simple/
# Upload to private registry
twine upload --repository-url https://your-registry.com/legacy/ dist/*

After publishing, users can install and use the library:
Example Project Structure:
my-agent-project/
├── pyproject.toml
├── requirements.txt
└── main.py
requirements.txt:
temporal-pydanticai-codeact>=0.1.0

main.py:
import asyncio
import os
from temporal.pydanticai.codeact.workers.sandbox_worker import CodeActWorkerRunner
from temporal.pydanticai.codeact.activities.common import (
get_temporal_client
)
from temporal.pydanticai.codeact.utils.common_utils import load_config, read_prompts
from temporal.pydanticai.codeact.agents.simple_agent import SimpleAgent
from temporal.pydanticai.codeact.datamodels.agent_builder import AgentBuilder
from temporal.pydanticai.codeact.workflows.simple_agent_workflow import SimpleAgentWorkflow
from pydantic_ai.durable_exec.temporal import PydanticAIPlugin
async def main():
# Load configuration
config = load_config() # Looks for app_conf.yml in current directory
prompts = read_prompts() # Looks for agent_prompts.yml
# Connect to Temporal
client = await get_temporal_client(
config['temporal'],
plugins=[PydanticAIPlugin()]
)
# Build agent
agent = await SimpleAgent.from_agent_confs(
agent_builder=AgentBuilder(
prompts=prompts.agent_prompts,
model_configs=config['llm']['gemini']
)
)
# Create and run worker
worker = await CodeActWorkerRunner.from_args(
temporal_client=client,
task_queue=os.getenv('TASK_QUEUE', 'my-queue'),
agents=[agent],
workflows=[SimpleAgentWorkflow]
)
print("Worker started. Press Ctrl+C to stop.")
await worker.run()
if __name__ == '__main__':
asyncio.run(main())

Install and run:
# Install dependencies (includes temporal-pydanticai-codeact)
pip install -r requirements.txt
# Run your agent
python main.py

Contributions are welcome! Please ensure:
- Code follows existing patterns and style
- All tests pass (`pytest`)
- Unit tests pass without external services (`pytest -m unit`)
- Type checking passes (`mypy src/`)
- Linting passes (`ruff check .`)
- New features include tests, documentation, and examples
MIT
Built with:
- PydanticAI
- Temporal
- Docker
- uv package manager