diff --git a/content/integrations/frameworks/meta.json b/content/integrations/frameworks/meta.json
index d4480b0d9..bdca0b984 100644
--- a/content/integrations/frameworks/meta.json
+++ b/content/integrations/frameworks/meta.json
@@ -28,6 +28,7 @@
   "pydantic-ai",
   "quarkus-langchain4j",
   "ragas",
+  "restate",
   "semantic-kernel",
   "smolagents",
   "spring-ai",
diff --git a/content/integrations/frameworks/restate.mdx b/content/integrations/frameworks/restate.mdx
new file mode 100644
index 000000000..da15ca614
--- /dev/null
+++ b/content/integrations/frameworks/restate.mdx
@@ -0,0 +1,151 @@
---
title: Trace Restate Workflows with Langfuse
sidebarTitle: Restate
logo: /images/integrations/restate_icon.png
description: Learn how to use Langfuse to monitor and evaluate resilient Restate agentic workflows via OpenTelemetry
category: Integrations
---

# Trace Restate Workflows with Langfuse

This guide shows how to integrate Langfuse into your [Restate](https://restate.dev/) agentic workflows for full observability — LLM calls, tool invocations, and durable workflow steps — all in a single unified trace.

> **What is Restate?** [Restate](https://restate.dev/) is a durable execution platform that makes agents and workflows resumable and resilient. Every non-deterministic action (LLM calls, tool API calls, MCP calls) is persisted in a durable journal. On failure, Restate replays the journal and resumes where it left off — with automatic retries, recovery, and idempotent execution.

> **What is Langfuse?** [Langfuse](https://langfuse.com/) is an open-source observability platform for AI agents. It helps you monitor LLM calls, tool usage, cost, and latency, and run automated evaluations.

## Versioning

Restate's [versioning model](https://docs.restate.dev/operate/versioning) ensures that new deployments route new requests to the latest version, while ongoing executions continue on the version they started with.
This means each Langfuse trace is linked to a single immutable artifact — one code version, one prompt version, one execution history — making it straightforward to compare quality across versions and spot regressions.

## 1. Install Dependencies

```bash
pip install "restate-sdk[serde]" openai-agents langfuse openinference-instrumentation-openai-agents hypercorn
```

## 2. Configure Environment

Set up your API keys. You can get Langfuse keys from [Langfuse Cloud](https://langfuse.com/cloud) or by [self-hosting Langfuse](https://langfuse.com/self-hosting).

```bash filename=".env"
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_HOST=https://cloud.langfuse.com
OPENAI_API_KEY=sk-proj-...
```

## 3. Define the Agent

Use [Restate's OpenAI Agents SDK integration](https://docs.restate.dev/ai/sdk-integrations/openai-agents-sdk) to make agent steps durable. `DurableRunner` persists each LLM call in Restate's journal, so failed executions resume where they left off instead of restarting from scratch.
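The agent code that follows references several names (`ClaimDocument`, `ClaimData`, `ClaimAssessment`, and the helpers `query_fraud_db`, `convert_currency`, and `reimburse`) whose definitions live elsewhere in Restate's example repo. Below is a minimal sketch of what they might look like; the field names and conversion logic are assumptions, not the repo's actual definitions:

```python
# Hypothetical stand-ins for the models and helpers used by agent.py.
# Field names are inferred from the agent instructions in this guide.
from pydantic import BaseModel

class ClaimDocument(BaseModel):
    text: str

class ClaimData(BaseModel):
    customer_id: str
    amount: float
    currency: str
    category: str
    description: str

class ClaimAssessment(BaseModel):
    valid: bool
    reasoning: str

def query_fraud_db(customer_id: str) -> dict[str, str]:
    # Placeholder lookup; a real implementation would call your fraud service.
    return {"customer_id": customer_id, "status": "clear"}

def convert_currency(amount: float) -> float:
    # Placeholder conversion at a fixed rate (assumption for illustration).
    return round(amount * 0.92, 2)

def reimburse(amount: float) -> str:
    # Placeholder payout call.
    return f"paid {amount}"
```

`ClaimData` and `ClaimAssessment` are Pydantic models so the Agents SDK can produce structured outputs, which is also what provides the `model_dump_json()` call used by the orchestrator below.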
```python filename="agent.py"
import restate
from agents import Agent
from restate.ext.openai import restate_context, DurableRunner, durable_function_tool

# Durable tool — executed exactly once, even across retries
@durable_function_tool
async def check_fraud_database(customer_id: str) -> dict[str, str]:
    """Check the claim against the fraud database."""
    return await restate_context().run_typed(
        "Query fraud DB", query_fraud_db, customer_id=customer_id
    )

# LLM agents
parse_agent = Agent(
    name="DocumentParser",
    instructions="Extract the customer ID, claim amount, currency, category, and description.",
    output_type=ClaimData,
)

analysis_agent = Agent(
    name="ClaimsAnalyst",
    instructions="Assess whether this claim is valid and provide detailed reasoning.",
    output_type=ClaimAssessment,
    tools=[check_fraud_database],
)

# Main orchestrator
claim_service = restate.Service("InsuranceClaimAgent")

@claim_service.handler()
async def run(ctx: restate.Context, req: ClaimDocument) -> str:
    # Step 1: Parse the claim document (LLM step)
    parsed = await DurableRunner.run(parse_agent, req.text)
    claim: ClaimData = parsed.final_output

    # Step 2: Analyze the claim (LLM step)
    response = await DurableRunner.run(analysis_agent, claim.model_dump_json())
    assessment: ClaimAssessment = response.final_output

    if not assessment.valid:
        return "Claim rejected"

    # Step 3: Convert currency (regular durable step, no LLM)
    converted = await ctx.run_typed(
        "Convert currency", convert_currency, amount=claim.amount
    )

    # Step 4: Process reimbursement (regular durable step, no LLM)
    await ctx.run_typed("Reimburse", reimburse, amount=converted)

    return "Claim reimbursed"
```

## 4. Enable Langfuse Tracing

Initialize the Langfuse client and set up the tracing processor. This connects the OpenAI Agents SDK spans to Restate's execution traces, so everything appears as a single unified trace in Langfuse.
```python filename="__main__.py"
import asyncio

import hypercorn
import hypercorn.asyncio  # submodule import needed for hypercorn.asyncio.serve
import restate

from langfuse import get_client
from opentelemetry import trace as trace_api
from openinference.instrumentation import OITracer, TraceConfig
from agents import set_trace_processors

from utils.tracing import RestateTracingProcessor
from agent import claim_service

# Initialize Langfuse (sets up the global OTel tracer provider + exporter)
langfuse = get_client()
tracer = OITracer(
    trace_api.get_tracer("openinference.openai_agents"), config=TraceConfig()
)
set_trace_processors([RestateTracingProcessor(tracer)])

if __name__ == "__main__":
    app = restate.app(services=[claim_service])

    conf = hypercorn.Config()
    conf.bind = ["0.0.0.0:9080"]
    asyncio.run(hypercorn.asyncio.serve(app, conf))
```

The `RestateTracingProcessor` (available in [Restate's example repo](https://github.com/restatedev/ai-examples/blob/main/openai-agents/examples/langfuse/utils/tracing.py)) flattens the OpenAI Agents SDK spans under Restate's parent span, so the trace hierarchy in Langfuse mirrors the actual execution flow.

Restate also exports its own execution traces (workflow steps, retries, recovery) as OpenTelemetry spans. By pointing Restate's tracing endpoint at Langfuse, both agentic and workflow spans appear in the same trace.

## 5. View Traces in Langfuse

After running the workflow, the trace in Langfuse shows both the agentic steps and the workflow steps. For LLM calls, you can inspect inputs, prompts, model configuration, and outputs.

## Prompt Management with Restate

You can use [Langfuse Prompt Management](/docs/prompt-management/overview) with Restate. Each prompt fetch becomes a durable step — retries reuse the same prompt, while new executions pick up updated versions.
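The reason to route the fetch through `ctx.run_typed` is journaling: Restate records each named step's result and, on a retry, replays the recorded value instead of re-running the closure. The toy model below illustrates that behavior; it is a conceptual sketch, not Restate's actual implementation:

```python
# Toy journal: each named step runs once; replays return the recorded result.
# Conceptual sketch only, not Restate's real API.
from typing import Any, Callable

class ToyJournal:
    def __init__(self) -> None:
        self._entries: dict[str, Any] = {}

    def run(self, name: str, fn: Callable[[], Any]) -> Any:
        if name not in self._entries:   # first execution: run and record
            self._entries[name] = fn()
        return self._entries[name]      # replay: reuse the recorded value

journal = ToyJournal()
versions = iter(["v1: Assess the claim.", "v2: Assess the claim carefully."])

first = journal.run("Fetch prompt", lambda: next(versions))
retry = journal.run("Fetch prompt", lambda: next(versions))  # replayed, not re-fetched

assert first == retry == "v1: Assess the claim."
```

A fresh execution starts with an empty journal, so it fetches the prompt again and picks up whatever version is current at that point.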
```python
from langfuse import get_client

langfuse = get_client()

def fetch_prompt() -> str:
    prompt = langfuse.get_prompt("claim-agent", type="text")
    return prompt.compile()

# Inside a Restate handler, where ctx is the restate.Context:
# durably journaled — the same prompt is used on retries
prompt = await ctx.run_typed("Fetch prompt", fetch_prompt)
```
diff --git a/public/images/integrations/restate_icon.png b/public/images/integrations/restate_icon.png
new file mode 100644
index 000000000..13e06c19a
Binary files /dev/null and b/public/images/integrations/restate_icon.png differ