This document defines the architecture, design principles, and responsibility boundaries for udspy. Use this as the authoritative reference when deciding where to place new code or how to extend the library.
- Core Philosophy
- Layered Architecture
- Core Abstractions
- Module System
- LM Abstraction (Language Model Layer)
- Signatures
- Tools
- History
- Streaming
- Confirmation & Suspend/Resume
- Callbacks
- How to Extend
- Design Patterns
- Decision Tree
udspy is a minimal, async-first framework for building LLM applications with clear abstractions and separation of concerns.
Simplicity Over Completeness
- Provide core primitives, not every possible feature
- Make common cases easy, complex cases possible
Async-First
- All core operations are async
- Sync wrappers (`forward()`, `__call__()`) use `asyncio.run()` internally
- Natural support for streaming and concurrent operations
Clear Responsibility Boundaries
- Each layer has ONE well-defined purpose
- Minimal coupling between layers
- Easy to test and modify independently
Type Safety
- Pydantic models for runtime validation
- Type hints throughout
- Fail fast with clear errors
Native Tool Calling
- Use OpenAI's native function calling API
- No custom prompt hacking for structured outputs
- Leverages provider optimizations
udspy is organized into clear layers with well-defined responsibilities:
┌─────────────────────────────────────────────────────────┐
│ User Application │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Module Layer │
│ (Predict, ChainOfThought, ReAct, Custom Modules) │
│ - Business logic and orchestration │
│ - Compose other modules │
│ - Handle tool execution loops │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ LM Layer (Provider Abstraction) │
│ - Abstract interface to LLM providers │
│ - Currently: OpenAI via settings.lm │
│ - Extensible: Anthropic, local models, etc. │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Adapter Layer │
│ - Format signatures → messages │
│ - Parse LLM outputs → structured data │
│ - Convert tools → provider schemas │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Supporting Infrastructure │
│ - History: Conversation state │
│ - Tools: Function calling │
│ - Streaming: Event queue and chunks │
│ - Confirmation: Human-in-the-loop │
│ - Callbacks: Telemetry and monitoring │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Settings │
│ - Global and context-specific configuration │
│ - Thread-safe via ContextVar │
└─────────────────────────────────────────────────────────┘
What it does: Orchestrates LLM calls and business logic
Responsibilities:
- Implements business logic (Predict, ChainOfThought, ReAct)
- Composes other modules
- Manages execution flow (tool loops, retry logic)
- Emits streaming events
- Returns final Prediction results
Key Files:
- `base.py` - Module base class with `aexecute`/`aforward`/`astream`
- `predict.py` - Core LLM prediction with tool calling
- `chain_of_thought.py` - Reasoning wrapper
- `react.py` - Agent with tool iteration
DO: Business logic, orchestration, composition
DON'T: Direct LLM API calls (use LM layer), message formatting (use Adapter)
What it does: Provides abstract interface to LLM providers
Current State:
- Direct usage of `settings.aclient` (AsyncOpenAI)
- No abstraction yet - coupled to OpenAI
Future Design:
```python
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator

from openai import AsyncOpenAI
from openai.types.chat import ChatCompletion


class LM(ABC):
    """Abstract language model interface."""

    @abstractmethod
    async def acomplete(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
        stream: bool = False,
        **kwargs,
    ) -> AsyncGenerator[ChatCompletion, None] | ChatCompletion:
        """Complete a prompt with optional tools."""
        ...


class OpenAILM(LM):
    """OpenAI implementation."""

    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model

    async def acomplete(self, messages, tools=None, stream=False, **kwargs):
        return await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=tools,
            stream=stream,
            **kwargs,
        )
```

Responsibilities:
- Normalize provider-specific APIs
- Handle retries and rate limiting
- Abstract away provider differences
- Provide unified interface for modules
Key Files (when implemented):
- `lm.py` - Base LM class and interface
- `lm/openai.py` - OpenAI implementation
- `lm/anthropic.py` - Anthropic implementation (future)
DO: Provider API calls, retry logic, rate limiting
DON'T: Message formatting (use Adapter), business logic (use Module)
What it does: Translates between udspy concepts and provider formats
Responsibilities:
- Format Signature → system prompts
- Format inputs → user messages
- Parse LLM outputs → structured Prediction
- Convert Tool → OpenAI tool schema
- Type coercion and validation
Key Methods:
- `format_instructions(signature)` - Signature → system message
- `format_inputs(signature, inputs)` - Inputs → user message
- `parse_outputs(signature, completion)` - Completion → structured dict
- `tools_to_openai_format(tools)` - Tools → OpenAI schemas
DO: Format translation, schema conversion, parsing
DON'T: LLM API calls (use LM), orchestration (use Module)
History (src/udspy/history.py)
- Stores conversation messages
- Simple list wrapper with convenience methods
- No LLM coupling - pure data structure
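Because History is a pure data structure, a minimal sketch needs only the standard library. Method names follow this document; the implementation details are assumed:

```python
from typing import Any


class History:
    """Sketch of a pure-data conversation store (no LLM coupling)."""

    def __init__(self) -> None:
        self.messages: list[dict[str, Any]] = []

    def add_user_message(self, content: str) -> None:
        self.messages.append({"role": "user", "content": content})

    def add_assistant_message(self, content: str) -> None:
        self.messages.append({"role": "assistant", "content": content})

    def set_system_message(self, content: str) -> None:
        # Replace the system message at position 0, or insert one.
        if self.messages and self.messages[0]["role"] == "system":
            self.messages[0] = {"role": "system", "content": content}
        else:
            self.messages.insert(0, {"role": "system", "content": content})


h = History()
h.add_user_message("hi")
h.set_system_message("be brief")
```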
Tools (src/udspy/tool.py)
- Wraps functions as tools
- Extracts schemas from type hints
- Handles async/sync execution
- Integrates with confirmation system
Streaming (src/udspy/streaming.py)
- Event queue via ContextVar
- StreamEvent base class
- emit_event() for custom events
- Prediction as final event
Confirmation (src/udspy/confirmation.py)
- ConfirmationRequired exception
- ResumeState for continuation
- Context-based approval tracking
- @confirm_first decorator
Callbacks (src/udspy/callback.py)
- BaseCallback interface
- @with_callbacks decorator
- Telemetry and monitoring hooks
- Compatible with Opik, MLflow, etc.
What: A composable unit that encapsulates LLM operations
Interface:
```python
class Module:
    async def aexecute(self, *, stream: bool = False, **inputs) -> Prediction:
        """Core execution - implements business logic."""

    async def aforward(self, **inputs) -> Prediction:
        """Non-streaming execution."""
        return await self.aexecute(stream=False, **inputs)

    async def astream(self, **inputs) -> AsyncGenerator[StreamEvent]:
        """Streaming execution - sets up queue and yields events."""

    def forward(self, **inputs) -> Prediction:
        """Sync wrapper."""
        return asyncio.run(self.aforward(**inputs))

    def __call__(self, **inputs) -> Prediction:
        """Sync convenience."""
        return self.forward(**inputs)
```

Key Insight: `aexecute()` is the single source of truth. Both `aforward()` and `astream()` call it with different parameters.
What: Defines input/output contract for an LLM task using Pydantic
Purpose:
- Specify expected inputs and outputs
- Provide descriptions for prompt construction
- Enable runtime validation
- Generate tool schemas
Example:
```python
class QA(Signature):
    """Answer questions concisely."""

    question: str = InputField(description="User's question")
    answer: str = OutputField(description="Concise answer")
```

Where Used:
- Modules use signatures to define their I/O contract
- Adapter formats signatures into prompts
- Validation ensures type safety
What: Result of a module execution (dict-like with attribute access)
```python
pred = Prediction(answer="Paris", reasoning="France's capital")
print(pred.answer)     # "Paris"
print(pred["answer"])  # "Paris"
print(pred.is_final)   # True if no pending tool calls
```

Key Properties:
- `native_tool_calls` - Pending tool calls (if any)
- `is_final` - True if execution is complete
- Inherits from `dict` and `StreamEvent`
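The dict-with-attribute-access behavior can be sketched as a small `dict` subclass. This is a rough approximation for illustration, not udspy's actual implementation:

```python
class StreamEvent:
    """Base class for stream events (sketch)."""


class Prediction(dict, StreamEvent):
    """Sketch: a dict whose keys are also readable as attributes."""

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        try:
            return self[name]
        except KeyError as exc:
            raise AttributeError(name) from exc

    @property
    def is_final(self) -> bool:
        # Final when no tool calls are pending (key name taken from the doc).
        return not self.get("native_tool_calls")


pred = Prediction(answer="Paris", reasoning="France's capital")
```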
What: Wrapper for a callable function that can be invoked by LLM
Creation:
```python
@tool(name="search", description="Search the web")
def search(query: str = Field(description="Search query")) -> str:
    return search_web(query)

# Or manually
search_tool = Tool(
    func=search_fn,
    name="search",
    description="Search the web",
    require_confirmation=True,
)
```

Schema Generation:
- Extracts type hints from function signature
- Uses Pydantic Field for parameter descriptions
- Converts to OpenAI function calling format
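udspy's Tool reads Pydantic `Field` metadata; as a dependency-free illustration of the same idea, the sketch below derives a JSON-Schema-style parameters object from plain type hints (`parameters_schema` and its type mapping are assumptions, not udspy API):

```python
import inspect


def parameters_schema(func) -> dict:
    """Sketch: build an OpenAI-style parameters schema from type hints."""
    type_map = {str: "string", float: "number", int: "integer", bool: "boolean"}
    props, required = {}, []
    for name, param in inspect.signature(func).parameters.items():
        props[name] = {"type": type_map.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": props, "required": required}


def add(a: float, b: float) -> float:
    return a + b


schema = parameters_schema(add)
```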
What: Conversation message storage
Usage:
```python
history = History()

# Automatically managed by Predict
result = predictor(question="What is Python?", history=history)
# History now contains: [system, user, assistant]

result = predictor(question="What are its features?", history=history)
# LLM has context from previous turn
```

Storage Format: OpenAI message format (`{"role": "...", "content": "..."}`)
Core Concept: Every module has ONE implementation in aexecute(), which powers BOTH streaming and non-streaming interfaces.
```python
class MyModule(Module):
    async def aexecute(self, *, stream: bool = False, **inputs) -> Prediction:
        """Single source of truth for execution logic."""
        # 1. Do the work (call LLM, process data, etc.)
        result = await self.do_work(inputs, stream=stream)

        # 2. Optionally emit streaming events
        if self.should_emit_events():
            emit_event(OutputStreamChunk(...))

        # 3. Always return final Prediction
        return Prediction(answer=result)
```

Event Queue:
- `astream()` sets up an `asyncio.Queue` via ContextVar
- Modules emit events using `emit_event(event)`
- Queue is automatically available to nested modules
Flow:
User calls module.astream()
↓
astream() creates queue, sets in ContextVar
↓
astream() calls aexecute(stream=True)
↓
aexecute() does work, emits events via emit_event()
↓
astream() yields events from queue
↓
Final Prediction is yielded
Example:

```python
async for event in predictor.astream(question="What is AI?"):
    if isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)  # Real-time output
    elif isinstance(event, Prediction):
        result = event  # Final result
```

Simple:
- `aforward()` calls `aexecute(stream=False)`
- No queue is set up
- Events are not emitted (or silently ignored)
- Only final Prediction is returned

```python
result = await predictor.aforward(question="What is AI?")
print(result.answer)  # Just the final answer
```

Modules can contain other modules:
```python
class Pipeline(Module):
    def __init__(self):
        self.step1 = Predict(Signature1)
        self.step2 = ChainOfThought(Signature2)

    async def aexecute(self, *, stream: bool = False, **inputs):
        # Get result from first module (don't stream intermediate steps)
        result1 = await self.step1.aforward(**inputs)

        # Stream final module if requested (aforward always forces
        # stream=False, so pass the flag through aexecute instead)
        result2 = await self.step2.aexecute(
            input=result1.output,
            stream=stream,  # Pass down stream parameter
        )
        return Prediction(final=result2.answer)
```

Key Pattern: Nested modules automatically emit to the active queue if one exists.
The LM (Language Model) abstraction provides a provider-agnostic interface for interacting with LLMs. This allows udspy to work with different providers (OpenAI, Anthropic, local models, etc.) through a common interface.
Location: src/udspy/lm/
Key Files:
- `lm/base.py` - Abstract LM interface
- `lm/openai.py` - OpenAI implementation
- `lm/__init__.py` - Public API exports
```python
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from typing import Any


class LM(ABC):
    """Abstract language model interface."""

    @abstractmethod
    async def acomplete(
        self,
        messages: list[dict[str, Any]],
        *,
        model: str,
        tools: list[dict[str, Any]] | None = None,
        stream: bool = False,
        **kwargs: Any,
    ) -> Any | AsyncGenerator[Any, None]:
        """Generate completion from the language model.

        Args:
            messages: List of messages in OpenAI format
            model: Model identifier (e.g., "gpt-4o")
            tools: Optional list of tool schemas in OpenAI format
            stream: If True, return an async generator of chunks
            **kwargs: Provider-specific parameters

        Returns:
            If stream=False: Completion response object
            If stream=True: AsyncGenerator yielding chunks
        """
        ...
```

Design Decisions:
- Single method interface - simple and focused
- OpenAI message format as standard (widely adopted)
- Generic return types to support any provider
- Provider implementations handle format conversion internally
```python
from openai import AsyncOpenAI
from udspy.lm import OpenAILM

# Create instance
client = AsyncOpenAI(api_key="sk-...")
lm = OpenAILM(client, default_model="gpt-4o")

# Use directly
response = await lm.acomplete(
    messages=[{"role": "user", "content": "Hello"}],
    temperature=0.7,
)
```

Features:
- Wraps AsyncOpenAI client
- Supports default model (optional, can override per call)
- Passes through all OpenAI parameters
- Handles both streaming and non-streaming
The LM abstraction integrates with udspy's settings system:
```python
import udspy
from udspy import LM

# Configure from environment variables (creates OpenAILM automatically)
# Set: UDSPY_LM_MODEL=gpt-4o, UDSPY_LM_API_KEY=sk-...
udspy.settings.configure()

# Or provide explicit LM instance
lm = LM(model="gpt-4o", api_key="sk-...")
udspy.settings.configure(lm=lm)

# Access the configured LM
lm = udspy.settings.lm  # Returns OpenAILM instance
```

Backward Compatibility: `settings.aclient` still works but is deprecated. Use `settings.lm` for new code.
LM instances can be overridden per-context:
```python
from udspy import LM

# Global settings
global_lm = LM(model="gpt-4o-mini", api_key="global-key")
udspy.settings.configure(lm=global_lm)

# Temporary override
custom_lm = LM(model="gpt-4", api_key="custom-key")
with udspy.settings.context(lm=custom_lm):
    result = predictor(question="...")  # Uses custom_lm

# Back to global LM
result = predictor(question="...")  # Uses global LM
```

Priority:
1. Explicit `lm` parameter (highest)
2. `aclient` parameter (creates OpenAILM wrapper)
3. `api_key` parameter (creates new client + LM)
4. Global settings (fallback)
The Predict module accesses LMs via settings.lm:
```python
# Non-streaming
response = await settings.lm.acomplete(
    messages=messages,
    model=model or settings.default_model,
    tools=tool_schemas,
    stream=False,
    **kwargs,
)

# Streaming
stream = await settings.lm.acomplete(
    messages=messages,
    model=model or settings.default_model,
    tools=tool_schemas,
    stream=True,
    **kwargs,
)
```

This centralizes all LLM calls and makes provider swapping trivial.
To add a new provider, implement the LM interface:
```python
from collections.abc import AsyncGenerator
from typing import Any

from udspy.lm import LM


class AnthropicLM(LM):
    """Anthropic Claude implementation."""

    def __init__(self, api_key: str, default_model: str | None = None):
        from anthropic import AsyncAnthropic

        self.client = AsyncAnthropic(api_key=api_key)
        self.default_model = default_model

    async def acomplete(
        self,
        messages: list[dict[str, Any]],
        *,
        model: str | None = None,
        tools: list[dict[str, Any]] | None = None,
        stream: bool = False,
        **kwargs: Any,
    ) -> Any | AsyncGenerator[Any, None]:
        actual_model = model or self.default_model
        if not actual_model:
            raise ValueError("No model specified")

        # Convert OpenAI format to Anthropic format
        anthropic_messages = self._convert_messages(messages)
        anthropic_tools = self._convert_tools(tools) if tools else None

        # Call Anthropic API
        return await self.client.messages.create(
            model=actual_model,
            messages=anthropic_messages,
            tools=anthropic_tools,
            stream=stream,
            **kwargs,
        )

    def _convert_messages(self, messages):
        """Convert OpenAI → Anthropic format."""
        # Implementation details...
```

Usage:

```python
from my_providers import AnthropicLM

lm = AnthropicLM(api_key="sk-ant-...", default_model="claude-3-5-sonnet")
udspy.settings.configure(lm=lm)

# All udspy features work with your custom provider!
```

LM implementations should accept/return OpenAI message format:
```python
[
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Hello!"},
    {"role": "assistant", "content": "Hi!"},
]
```

Why OpenAI format?
- Industry standard
- Simple and flexible
- Easy to convert to other formats
- Well-documented
Custom providers convert internally.
LM Layer Owns:
- Making API calls to providers
- Handling streaming vs non-streaming responses
- Provider-specific parameter passing
- Format conversion (provider ↔ OpenAI format)
LM Layer Does NOT Own:
- Prompt formatting (Adapter Layer)
- Output parsing (Adapter Layer)
- Tool execution (Module Layer)
- Retry/error handling (Module Layer)
- Orchestration logic (Module Layer)
See LM Abstraction for comprehensive documentation including:
- Detailed API reference
- Custom provider implementation guide
- Context manager examples
- Type handling
- Best practices
Signatures define the contract between user inputs and LLM outputs.
```python
class QA(Signature):
    """Answer questions concisely."""  # ← Instructions (docstring)

    question: str = InputField(description="User's question")  # ← Input
    answer: str = OutputField(description="Concise answer")  # ← Output
```

- Definition: User defines Signature with InputField/OutputField
- Validation: SignatureMeta validates all fields are marked
- Formatting: Adapter converts to system prompt:

```text
Answer questions concisely.

Inputs:
- question (str): User's question

Outputs:
- answer (str): Concise answer
```

- Parsing: Adapter parses LLM output into structured dict
- Return: Module returns Prediction with typed attributes

```python
# Get inputs
QA.get_input_fields()  # {"question": FieldInfo(...)}

# Get outputs
QA.get_output_fields()  # {"answer": FieldInfo(...)}

# Get instructions
QA.get_instructions()  # "Answer questions concisely."
```

```python
# String format (all fields are str)
QA = Signature.from_string("question -> answer", "Answer questions")

# Programmatic (custom types)
QA = make_signature(
    input_fields={"question": str},
    output_fields={"answer": str},
    instructions="Answer questions",
)
```

Module Layer: Modules accept signatures to define I/O
Adapter Layer: Formats signatures into prompts
NOT in LM Layer: LM layer only sees formatted messages
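The `"question -> answer"` convention behind `from_string` can be illustrated with a tiny parser. This is a sketch only; udspy's real factory also attaches types, descriptions, and validation:

```python
def parse_signature_string(spec: str) -> tuple[list[str], list[str]]:
    """Sketch of the 'inputs -> outputs' convention used by from_string."""
    inputs, outputs = spec.split("->")

    def names(side: str) -> list[str]:
        # Split on commas and drop surrounding whitespace.
        return [name.strip() for name in side.split(",") if name.strip()]

    return names(inputs), names(outputs)


fields = parse_signature_string("question, context -> answer")
```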
1. Definition (by user)
↓
2. Schema Extraction (Tool.__init__)
↓
3. Schema Conversion (Adapter.tools_to_openai_format)
↓
4. LLM Call (Module → LM)
↓
5. LLM Returns Tool Calls
↓
6. Tool Execution (Module calls Tool.acall)
↓
7. Result Formatting (back to messages)
↓
8. Loop until final answer
```python
from pydantic import Field
from udspy import tool

@tool(name="calculator", description="Perform arithmetic")
def calculator(
    operation: str = Field(description="add, subtract, multiply, divide"),
    a: float = Field(description="First number"),
    b: float = Field(description="Second number"),
) -> float:
    """Perform arithmetic operations."""
    ops = {"add": a + b, "subtract": a - b, "multiply": a * b, "divide": a / b}
    return ops[operation]
```

Tool class extracts schema from function signature:
```python
# Automatic schema generation
tool.parameters →
{
    "type": "object",
    "properties": {
        "operation": {"type": "string", "description": "add, subtract, ..."},
        "a": {"type": "number", "description": "First number"},
        "b": {"type": "number", "description": "Second number"}
    },
    "required": ["operation", "a", "b"]
}
```

Adapter converts Tool → OpenAI schema:
```python
# Tool provides the parameters schema
from udspy.adapter import ChatAdapter

adapter = ChatAdapter()
adapter.format_tool_schema(tool) →
{
    "type": "function",
    "function": {
        "name": "calculator",
        "description": "Perform arithmetic",
        "parameters": tool.parameters  # Tool provides this
    }
}
```

```python
# In Predict module:
async def _execute_tool_calls(self, tool_calls, tools):
    results = []
    for tc in tool_calls:
        tool = self._find_tool(tc.function.name, tools)
        result = await tool.acall(**tc.arguments)
        results.append({"role": "tool", "tool_call_id": tc.id, "content": result})
    return results
```

```python
@tool(require_confirmation=True)
def delete_file(path: str) -> str:
    os.remove(path)
    return f"Deleted {path}"

# Raises ConfirmationRequired when called
# Module catches, saves state, waits for user approval
# Resumes execution after approval
```

Store conversation context for multi-turn interactions.
Simple wrapper around `list[dict[str, Any]]` with convenience methods:

```python
class History:
    messages: list[dict[str, Any]]

    def add_user_message(self, content: str): ...
    def add_assistant_message(self, content: str, tool_calls: list | None = None): ...
    def add_tool_result(self, tool_call_id: str, content: str): ...
    def add_system_message(self, content: str): ...  # Appends to end
    def set_system_message(self, content: str): ...  # Always at position 0 (recommended)
```

Note: Use `set_system_message()` instead of `add_system_message()` to ensure the system prompt is always at position 0. When using Predict, the system prompt is automatically managed.
```python
history = History()

# First turn
result = predictor(question="What is Python?", history=history)
# history.messages = [
#     {"role": "system", "content": "...instructions..."},
#     {"role": "user", "content": "question: What is Python?"},
#     {"role": "assistant", "content": "answer: A programming language"}
# ]

# Second turn (context preserved)
result = predictor(question="What are its features?", history=history)
# LLM sees full conversation history
```

Predict automatically manages the system prompt in history:
```python
def _build_initial_messages(self, signature, inputs, history):
    # Always set system message at position 0 (replaces if exists)
    history.set_system_message(
        self.adapter.format_instructions(signature)
    )
    # Add current user input
    history.add_user_message(
        self.adapter.format_inputs(signature, inputs)
    )
```

Key behaviors:
- System prompt is always at position 0 - Managed automatically from signature
- User message added at the end - Current input appended to history
- After generation - Assistant response added to history
- Tool calls recorded - Tool interactions preserved in history
This means you can pre-populate history with only user/assistant messages, and the system prompt will be automatically managed.
- Multi-turn conversations: Chatbots, assistants
- Context-dependent tasks: "It" and "that" references
- Iterative refinement: Follow-up questions
- Stateless tasks: One-off questions
- Independent requests: No cross-request context needed
Event Queue: ContextVar-based queue for thread-safe event emission
```python
_stream_queue: ContextVar[asyncio.Queue | None] = ContextVar("_stream_queue", default=None)

async def emit_event(event: StreamEvent):
    """Emit event to active stream (if any)."""
    queue = _stream_queue.get()
    if queue is not None:
        await queue.put(event)
```

Base Class:

```python
class StreamEvent:
    """Base for all events."""
    pass
```

Built-in Events:
- `OutputStreamChunk` - LLM output for a field
- `ThoughtStreamChunk` - Reasoning/thought output
- `Prediction` - Final result
Custom Events:
```python
from dataclasses import dataclass

@dataclass
class ToolProgress(StreamEvent):
    tool_name: str
    progress: float
    message: str

# Emit from anywhere
emit_event(ToolProgress("search", 0.5, "Searching..."))
```

1. User calls module.astream()
↓
2. astream() creates Queue, sets in _stream_queue ContextVar
↓
3. astream() spawns task: aexecute(stream=True)
↓
4. aexecute() does work, calls emit_event() for chunks
↓
5. emit_event() puts events in queue
↓
6. astream() yields events from queue
↓
7. Final Prediction is yielded
```python
@dataclass
class OutputStreamChunk(StreamEvent):
    module: Module      # Which module emitted this
    field_name: str     # Which output field
    delta: str          # New content since last chunk
    content: str        # Full accumulated content so far
    is_complete: bool   # Is this field done?
```

```python
async for event in predictor.astream(question="Explain AI"):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "answer":
            print(event.delta, end="", flush=True)
    elif isinstance(event, Prediction):
        result = event
        print(f"\n\nFinal: {result.answer}")
```

Events from nested modules bubble up automatically:
```python
class Pipeline(Module):
    async def aexecute(self, *, stream: bool = False, **inputs):
        # The nested module emits its chunks to the same active queue;
        # aexecute() returns the final Prediction directly.
        return await self.predictor.aexecute(stream=stream, **inputs)
```

Enable human-in-the-loop patterns where execution pauses for user input.
1. Tool/Module raises ConfirmationRequired
↓
2. Exception propagates to user code
↓
3. User sees question, responds
↓
4. User creates ResumeState(exception, response)
↓
5. User calls module.aforward(resume_state=resume_state)
↓
6. Module resumes execution with user response
↓
7. Execution completes
```python
class ConfirmationRequired(Exception):
    question: str               # What to ask user
    confirmation_id: str        # Unique ID
    tool_call: ToolCall | None  # If raised by tool
    context: dict[str, Any]     # Module-specific state

class ConfirmationRejected(Exception):
    message: str                # Why rejected
    confirmation_id: str        # Which confirmation
    tool_call: ToolCall | None  # If raised by tool
```

```python
class ResumeState:
    exception: ConfirmationRequired  # Original exception
    user_response: str               # User's answer ("yes", "no", JSON, etc.)
```

```python
from udspy import ResumeState

resume_state = None
while True:
    try:
        result = agent(
            question="Delete all files",
            resume_state=resume_state,
        )
        break  # Success
    except ConfirmationRequired as e:
        print(f"\n{e.question}")
        user_input = input("Approve? (yes/no): ")
        resume_state = ResumeState(e, user_input)
```

```python
async for event in agent.astream(question="Delete files"):
    if isinstance(event, Prediction):
        if not event.is_final:
            # Has pending tool calls requiring confirmation
            for tc in event.native_tool_calls:
                # Show confirmation UI
                approved = await ask_user(tc)

            # Resume with response
            resume_state = ResumeState(exception, "yes" if approved else "no")

            # Continue streaming
            async for event2 in agent.astream(resume_state=resume_state):
                yield event2
```

Modules that support suspend/resume must implement:
```python
class MyModule(Module):
    async def aexecute(self, *, stream: bool = False, resume_state=None, **inputs):
        # Check for resume
        if resume_state:
            return await self.aresume(
                user_response=resume_state.user_response,
                saved_state=resume_state.exception.context,
            )

        # Normal execution
        try:
            result = await self.do_work()
            return Prediction(**result)
        except ConfirmationRequired as e:
            # Save state in exception context
            e.context["saved_data"] = self.state
            raise  # Let user handle

    async def aresume(self, user_response: str, saved_state: dict):
        # Restore state
        self.state = saved_state["saved_data"]

        # Process user response
        if user_response == "yes":
            # Continue
            return await self.do_work()
        else:
            # Abort
            raise ConfirmationRejected("User rejected")
```

```python
from udspy import tool, confirm_first

@tool(require_confirmation=True)
def delete_file(path: str) -> str:
    os.remove(path)
    return f"Deleted {path}"

# Or manually
@confirm_first
def delete_file(path: str) -> str:
    os.remove(path)
    return f"Deleted {path}"
```

Provide hooks for telemetry, monitoring, and observability.
```python
class BaseCallback:
    def on_module_start(self, call_id: str, instance: Module, inputs: dict):
        """Called when module execution starts."""

    def on_module_end(self, call_id: str, outputs: Any, exception: Exception | None):
        """Called when module execution ends."""

    def on_lm_start(self, call_id: str, instance: Any, inputs: dict):
        """Called when LLM call starts."""

    def on_lm_end(self, call_id: str, outputs: Any, exception: Exception | None):
        """Called when LLM call ends."""

    def on_tool_start(self, call_id: str, instance: Tool, inputs: dict):
        """Called when tool execution starts."""

    def on_tool_end(self, call_id: str, outputs: Any, exception: Exception | None):
        """Called when tool execution ends."""
```

```python
class LoggingCallback(BaseCallback):
    def on_lm_start(self, call_id, instance, inputs):
        print(f"[{call_id}] LLM called")
        print(f"  Model: {inputs.get('model')}")
        print(f"  Messages: {len(inputs.get('messages', []))}")

    def on_lm_end(self, call_id, outputs, exception):
        if exception:
            print(f"[{call_id}] LLM failed: {exception}")
        else:
            print(f"[{call_id}] LLM completed")

# Configure globally
udspy.settings.configure(callbacks=[LoggingCallback()])

# Or per-context
with udspy.settings.context(callbacks=[LoggingCallback()]):
    result = predictor(question="...")
```

Callbacks are triggered by the `@with_callbacks` decorator:

```python
@with_callbacks
async def aexecute(self, *, stream: bool = False, **inputs):
    # Callbacks automatically called before/after
    ...
```

Compatible with:
- Opik: MLOps platform for LLM applications
- MLflow: ML experiment tracking
- Custom: Any monitoring system
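A minimal version of the `@with_callbacks` mechanism might look like the sketch below. The global `_callbacks` registry is an assumption for the sake of a self-contained example; udspy resolves callbacks through settings:

```python
import asyncio
import functools
import uuid


class BaseCallback:
    def on_module_start(self, call_id, instance, inputs): ...
    def on_module_end(self, call_id, outputs, exception): ...


_callbacks: list[BaseCallback] = []  # stand-in for settings-managed callbacks


def with_callbacks(func):
    """Sketch: bracket an async method with start/end hooks."""

    @functools.wraps(func)
    async def wrapper(self, **inputs):
        call_id = str(uuid.uuid4())
        for cb in _callbacks:
            cb.on_module_start(call_id, self, inputs)
        outputs, exception = None, None
        try:
            outputs = await func(self, **inputs)
            return outputs
        except Exception as exc:
            exception = exc
            raise
        finally:
            # end hook fires on both success and failure
            for cb in _callbacks:
                cb.on_module_end(call_id, outputs, exception)

    return wrapper


class Recorder(BaseCallback):
    def __init__(self):
        self.events = []

    def on_module_start(self, call_id, instance, inputs):
        self.events.append(("start", inputs))

    def on_module_end(self, call_id, outputs, exception):
        self.events.append(("end", outputs))


class Echo:
    @with_callbacks
    async def aexecute(self, **inputs):
        return inputs["x"]


recorder = Recorder()
_callbacks.append(recorder)
result = asyncio.run(Echo().aexecute(x=42))
```

The `try/finally` shape guarantees the end hook always fires, which is what makes decorator-based telemetry reliable even when a module raises.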
- Subclass Module
- Implement aexecute()
- Emit events (if streaming)
- Return Prediction
```python
from udspy import Module, Prediction, OutputStreamChunk, emit_event

class MyModule(Module):
    def __init__(self, signature):
        self.signature = signature
        self.predictor = Predict(signature)

    async def aexecute(self, *, stream: bool = False, **inputs):
        # Custom pre-processing
        processed = self.preprocess(inputs)

        # Call nested module
        result = await self.predictor.aforward(**processed)

        # Custom post-processing
        final = self.postprocess(result)

        # Anything listening to the stream gets this chunk
        emit_event(OutputStreamChunk(
            module=self,
            field_name="answer",
            delta=final["answer"],
            content=final["answer"],
            is_complete=True,
        ))

        # Return final prediction
        return Prediction(**final)
```

- Create src/udspy/lm/ package
- Define LM base class
- Implement provider-specific class
- Update settings to use LM
```python
# src/udspy/lm/base.py
from abc import ABC, abstractmethod

class LM(ABC):
    @abstractmethod
    async def acomplete(self, messages, *, tools=None, stream=False, **kwargs):
        ...

# src/udspy/lm/anthropic.py
class AnthropicLM(LM):
    def __init__(self, client, model):
        self.client = client
        self.model = model

    async def acomplete(self, messages, *, tools=None, stream=False, **kwargs):
        # Convert formats
        # Call Anthropic API
        # Convert response
        ...
```

```python
from dataclasses import dataclass
from udspy.streaming import StreamEvent, emit_event

@dataclass
class CustomEvent(StreamEvent):
    message: str
    progress: float

# Emit from anywhere
async def my_function():
    emit_event(CustomEvent("Processing...", 0.5))
```

```python
from pydantic import Field
from udspy import tool

@tool(name="custom_tool")
async def custom_tool(param: str = Field(...)) -> str:
    """Custom tool with async logic."""
    result = await async_operation(param)
    return result
```

Pattern: All core logic is async, sync is a wrapper
```python
async def aforward(self, **inputs) -> Prediction:
    """Async implementation."""
    return await self.do_async_work(inputs)

def forward(self, **inputs) -> Prediction:
    """Sync wrapper."""
    ensure_sync_context("forward")
    return asyncio.run(self.aforward(**inputs))
```

Why: Async is more flexible; you can't go async→sync→async
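A runnable toy version of the pattern (`Greeter` is illustrative only, not a udspy class):

```python
import asyncio


class Greeter:
    """Sketch of the async-first pattern: sync call is a thin asyncio.run wrapper."""

    async def aforward(self, name: str) -> str:
        await asyncio.sleep(0)  # stand-in for real async work (LLM call, I/O)
        return f"hello, {name}"

    def forward(self, name: str) -> str:
        # asyncio.run raises RuntimeError if called from inside a running
        # event loop, which is exactly the async→sync→async trap to avoid.
        return asyncio.run(self.aforward(name))


greeting = Greeter().forward("ada")
```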
Pattern: One aexecute() implementation for both streaming and non-streaming
```python
async def aexecute(self, *, stream: bool = False, **inputs):
    # Check if should stream
    emit_event(chunk)
    # Always return final result
    return Prediction(...)
```

Why: DRY, easier to maintain, composable
Pattern: Thread-safe, async-safe event queue
```python
_stream_queue: ContextVar[Queue | None] = ContextVar("_stream_queue", default=None)

async def astream(self, **inputs):
    queue = asyncio.Queue()
    token = _stream_queue.set(queue)
    try:
        # Execute and yield from queue
        async for event in self._yield_from_queue(queue):
            yield event
    finally:
        _stream_queue.reset(token)
```

Why: Works across async tasks, no global state
Pattern: Thread-safe configuration overrides
```python
from udspy import LM

# Global
global_lm = LM(model="gpt-4o-mini", api_key="sk-...")
udspy.settings.configure(lm=global_lm)

# Context-specific
gpt4_lm = LM(model="gpt-4", api_key="sk-...")
with udspy.settings.context(lm=gpt4_lm):
    result = predictor(...)  # Uses gpt-4

# Back to global
result = predictor(...)  # Uses gpt-4o-mini
```

Why: Multi-tenant safe, no parameter drilling
Pattern: Use exceptions for suspend/resume
```python
try:
    result = agent(question="...")
except ConfirmationRequired as e:
    response = input(e.question)
    result = agent(resume_state=ResumeState(e, response))
```

Why: Clean interrupt, preserves state, composable
Is it about LLM provider API calls?
├─ YES → LM Layer (future: src/udspy/lm/)
└─ NO
├─ Is it about message formatting or parsing?
│ └─ YES → Adapter Layer (src/udspy/adapter.py)
└─ NO
├─ Is it about business logic or orchestration?
│ └─ YES → Module Layer (src/udspy/module/)
└─ NO
├─ Is it about conversation storage?
│ └─ YES → History (src/udspy/history.py)
├─ Is it about tool definition or execution?
│ └─ YES → Tool (src/udspy/tool.py)
├─ Is it about streaming events?
│ └─ YES → Streaming (src/udspy/streaming.py)
├─ Is it about human-in-the-loop?
│ └─ YES → Confirmation (src/udspy/confirmation.py)
├─ Is it about telemetry?
│ └─ YES → Callbacks (src/udspy/callback.py)
└─ Is it a utility used everywhere?
└─ YES → Utils (src/udspy/utils.py)
Does it encapsulate LLM call logic?
├─ NO → Not a module (maybe a helper/utility)
└─ YES
├─ Is it a variant of Predict?
│ ├─ YES → Probably wrapper (like ChainOfThought)
│ └─ NO → Custom module
└─ Does it need custom orchestration?
├─ YES → Create new module (like ReAct)
└─ NO → Compose existing modules
Does it talk to an LLM provider API?
├─ NO → Not LM layer
└─ YES
├─ Is it provider-specific (OpenAI, Anthropic)?
│ └─ YES → LM implementation (src/udspy/lm/openai.py)
└─ Is it provider-agnostic (retry, rate limiting)?
└─ YES → LM base (src/udspy/lm/base.py)
| Layer | Responsibilities | Key Files |
|---|---|---|
| Module | Business logic, orchestration, composition | module/predict.py, module/react.py |
| LM | Provider API calls, retries, format conversion | (Future) lm/openai.py, lm/anthropic.py |
| Adapter | Message formatting, output parsing, schema conversion | adapter.py |
| Signature | I/O contracts, validation | signature.py |
| Tool | Function wrapping, schema extraction, execution | tool.py |
| History | Conversation storage | history.py |
| Streaming | Event queue, chunks, emission | streaming.py |
| Confirmation | Suspend/resume, human-in-the-loop | confirmation.py |
| Callbacks | Telemetry, monitoring hooks | callback.py |
| Settings | Configuration, client management | settings.py |
- Async-first - All core operations are async
- Single execution path - `aexecute()` powers everything
- Event queue - ContextVar for streaming
- Exception-based flow - ConfirmationRequired for suspend/resume
- Context-based config - Thread-safe overrides
For implementation details, see: