Complete reference for all public APIs in Cap'n Web Python.
Configuration for the RPC client.
@dataclass(frozen=True)
class ClientConfig:
url: str # Server URL (http:// or ws://)
timeout: float = 30.0 # Request timeout in seconds
max_retries: int = 3 # Max retry attempts
retry_delay: float = 1.0 # Delay between retries
Example:
config = ClientConfig(
url="http://localhost:8080/rpc/batch",
timeout=10.0,
max_retries=5
)
Main client class for making RPC calls.
class Client(RpcSession):
def __init__(self, config: ClientConfig)
Make a single RPC call.
Parameters:
- capability_id: ID of the capability to call (usually 0 for root)
- method: Name of the method to call
- args: List of arguments to pass
Returns: The method's return value
Raises: RpcError on failure
Example:
async with Client(config) as client:
result = await client.call(0, "add", [5, 3])
print(result) # 8
Close the client and clean up resources.
Example:
client = Client(config)
try:
result = await client.call(0, "method", [])
finally:
await client.close()
async with Client(config) as client:
# Client automatically closed on exit
result = await client.call(0, "method", [])
Configuration for the RPC server.
@dataclass(frozen=True)
class ServerConfig:
host: str = "127.0.0.1" # Listen address
port: int = 8080 # Listen port
max_batch_size: int = 100 # Max messages per batch
include_stack_traces: bool = False # Include stack traces (dev only!)
resume_token_ttl: float = 3600.0 # Token TTL in seconds
Security Note: Never set include_stack_traces=True in production!
Example:
config = ServerConfig(
host="0.0.0.0", # Listen on all interfaces
port=8080,
max_batch_size=200
)
Main server class for hosting RPC services.
class Server(RpcSession):
def __init__(self, config: ServerConfig)
Register a local object as a capability.
Parameters:
- export_id: ID to assign (usually 0 for root capability)
- target: Object implementing the RpcTarget protocol
Example:
server = Server(config)
server.register_capability(0, MyService())
Start the server.
Example:
server = Server(config)
server.register_capability(0, service)
await server.start()
Stop the server gracefully.
async with Server(config) as server:
server.register_capability(0, service)
# Server automatically started and stopped
await asyncio.Event().wait()
Protocol for objects that can be exposed as RPC capabilities.
class RpcTarget(Protocol):
async def call(self, method: str, args: list) -> Any:
"""Handle method calls."""
async def get_property(self, property: str) -> Any:
"""Handle property access."""
class Calculator(RpcTarget):
async def call(self, method: str, args: list) -> int:
match method:
case "add":
return args[0] + args[1]
case "multiply":
return args[0] * args[1]
case _:
raise RpcError.not_found(f"Method {method} not found")
async def get_property(self, property: str):
match property:
case "name":
return "Calculator"
case _:
raise RpcError.not_found(f"Property {property} not found")
If your target needs cleanup, implement a dispose() method:
class DatabaseService(RpcTarget):
def __init__(self, connection):
self.conn = connection
async def call(self, method: str, args: list):
# ... implementation
async def get_property(self, property: str):
# ... implementation
def dispose(self):
"""Called when capability is released."""
self.conn.close()
Represents a remote capability (returned from RPC calls).
class RpcStub:
def __getattr__(self, name: str) -> RpcStub:
"""Access properties."""
def __call__(self, *args) -> RpcPromise:
"""Call as a function."""
Example:
user = await client.call(0, "getUser", ["alice"])
name = await user.name # Property access
greeting = await user.greet() # Method call
Release the capability and clean up resources.
Example:
stub = await client.call(0, "getResource", [])
try:
result = await stub.process()
finally:
stub.dispose()
Represents a future value from an RPC call.
class RpcPromise:
def __await__(self):
"""Make the promise awaitable."""
def __getattr__(self, name: str) -> RpcPromise:
"""Access properties on the future value."""
Example:
# Create promise (doesn't block)
promise = stub.getUser("alice")
# Chain operations (still doesn't block)
name_promise = promise.name
# Await to get final value (blocks)
name = await name_promise
Standard error codes.
class ErrorCode(Enum):
BAD_REQUEST = "bad_request" # Invalid request
NOT_FOUND = "not_found" # Resource not found
CAP_REVOKED = "cap_revoked" # Capability revoked
PERMISSION_DENIED = "permission_denied" # Access denied
CANCELED = "canceled" # Operation canceled
INTERNAL = "internal" # Internal server error
Structured RPC error with error code and optional data.
class RpcError(Exception):
code: ErrorCode
message: str
data: Any | None
# Create errors using factory methods
raise RpcError.bad_request("Invalid input")
raise RpcError.not_found("User not found")
raise RpcError.permission_denied("Access denied")
raise RpcError.internal("Database connection failed")
# With custom data
raise RpcError.bad_request(
"Validation failed",
data={"field": "email", "reason": "invalid format"}
)
try:
result = await client.call(0, "method", [args])
except RpcError as e:
match e.code:
case ErrorCode.NOT_FOUND:
print(f"Not found: {e.message}")
case ErrorCode.PERMISSION_DENIED:
print(f"Access denied: {e.message}")
case _:
print(f"Error {e.code}: {e.message}")
if e.data:
print(f"Additional data: {e.data}")
Batch multiple RPC calls into a single network round-trip.
class PipelineBatch:
def __init__(self, client: Client, capability_id: int)
Queue a method call (doesn't execute yet).
Parameters:
- method: Method name to call
- args: Arguments to pass
Returns: A PipelinePromise that can be awaited later
Example:
batch = PipelineBatch(client, capability_id=0)
call1 = batch.call("method1", [arg1])
call2 = batch.call("method2", [arg2])
Execute all queued calls in a single network request.
Example:
batch = PipelineBatch(client, capability_id=0)
# Queue calls
promise1 = batch.call("add", [10, 20])
promise2 = batch.call("multiply", [5, 6])
# Execute batch (single network request)
await batch.execute()
# Get results
result1 = await promise1 # 30
result2 = await promise2 # 30
Promise returned by PipelineBatch.call().
class PipelinePromise:
def __await__(self):
"""Await to get the final result."""
def __getattr__(self, name: str) -> PipelinePromise:
"""Chain property access."""
Example:
batch = PipelineBatch(client, 0)
# Queue call and chain property access
user_promise = batch.call("getUser", ["alice"])
name_promise = user_promise.name # Chained access
# Execute
await batch.execute()
# Get result
name = await name_promise # "alice"
HTTP-based batch transport (default).
Features:
- Simple request/response
- Automatic batching
- Works with any HTTP server
URL format: http://host:port/path or https://host:port/path
Example:
config = ClientConfig(url="http://localhost:8080/rpc/batch")
WebSocket-based transport for persistent connections.
Features:
- Persistent connection
- Lower latency for multiple calls
- Server can push updates
URL format: ws://host:port/path or wss://host:port/path
Example:
config = ClientConfig(url="ws://localhost:8080/rpc/ws")
Wraps data with ownership semantics (internal use).
class RpcPayload:
@classmethod
def from_app_params(cls, value: Any) -> RpcPayload:
"""Create from application parameters (will be copied)."""
@classmethod
def from_app_return(cls, value: Any) -> RpcPayload:
"""Create from application return value (ownership transferred)."""
@classmethod
def owned(cls, value: Any) -> RpcPayload:
"""Create from already-owned data (already copied)."""
Note: Usually you don't need to work with RpcPayload directly - the framework handles it.
All public APIs are fully type-hinted. For best results, use a type checker:
# With pyrefly (recommended)
pyrefly check
# With mypy
mypy src/
Example with type hints:
from typing import Any
from capnweb.client import Client, ClientConfig
from capnweb.error import RpcError
async def get_user(client: Client, user_id: str) -> dict[str, Any]:
"""Fetch user data from the server."""
try:
user: dict[str, Any] = await client.call(0, "getUser", [user_id])
return user
except RpcError as e:
print(f"Error fetching user: {e}")
raise
- Quickstart Guide - Get started quickly
- Architecture Guide - Understand the internals
- Advanced Topics - Resume tokens, bidirectional RPC