docs: add Restate integration guide #2733
---
title: Trace Restate Workflows with Langfuse
sidebarTitle: Restate
logo: /images/integrations/restate_icon.png
description: Learn how to use Langfuse to monitor and evaluate resilient Restate agentic workflows via OpenTelemetry
category: Integrations
---

# Trace Restate Workflows with Langfuse

This guide shows how to integrate Langfuse into your [Restate](https://restate.dev/) agentic workflows for full observability — LLM calls, tool invocations, and durable workflow steps — all in a single unified trace.

> **What is Restate?** [Restate](https://restate.dev/) is a durable execution platform that makes agents and workflows resumable and resilient. Every non-deterministic action (LLM calls, tool API calls, MCP calls) is persisted in a durable journal. On failure, Restate replays the journal and resumes where it left off — with automatic retries, recovery, and idempotent execution.

> **What is Langfuse?** [Langfuse](https://langfuse.com/) is an open-source observability platform for AI agents. It helps you monitor LLM calls, tool usage, cost, and latency, and run automated evaluations.

## Versioning

Restate's [versioning model](https://docs.restate.dev/operate/versioning) ensures that new deployments route new requests to the latest version, while ongoing executions continue on the version they started with. This means each Langfuse trace is linked to a single immutable artifact — one code version, one prompt version, one execution history — making it straightforward to compare quality across versions and spot regressions.
## 1. Install Dependencies

```bash
pip install "restate-sdk[serde]" openai-agents langfuse openinference-instrumentation-openai-agents hypercorn
```

## 2. Configure Environment

Set up your API keys. You can get Langfuse keys from [Langfuse Cloud](https://langfuse.com/cloud) or by [self-hosting Langfuse](https://langfuse.com/self-hosting).

```bash filename=".env"
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_BASE_URL=https://cloud.langfuse.com
OPENAI_API_KEY=sk-proj-...
```
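A missing key tends to surface only later, as silently dropped traces or failed LLM calls, so it can help to fail fast at startup. A minimal sketch — the `missing_env` helper is illustrative, not part of any SDK:

```python
import os


def missing_env(required=("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "OPENAI_API_KEY")):
    """Return the names of required environment variables that are unset or empty."""
    return [name for name in required if not os.environ.get(name)]


# Call this at startup and raise if anything is missing:
# if missing := missing_env():
#     raise RuntimeError(f"Missing environment variables: {missing}")
```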
## 3. Define the Agent

Use [Restate's OpenAI Agents SDK integration](https://docs.restate.dev/ai/sdk-integrations/openai-agents-sdk) to make agent steps durable. `DurableRunner` persists each LLM call in Restate's journal, so failed executions resume where they left off instead of restarting from scratch.

```python filename="agent.py"
import restate
from agents import Agent
from restate.ext.openai import restate_context, DurableRunner, durable_function_tool


# Durable tool — executed exactly once, even across retries
@durable_function_tool
async def check_fraud_database(customer_id: str) -> dict[str, str]:
    """Check the claim against the fraud database."""
    return await restate_context().run_typed(
        "Query fraud DB", query_fraud_db, claim_id=customer_id
    )


# LLM agents
parse_agent = Agent(
    name="DocumentParser",
    instructions="Extract the customer ID, claim amount, currency, category, and description.",
    output_type=ClaimData,
)

analysis_agent = Agent(
    name="ClaimsAnalyst",
    instructions="Assess whether this claim is valid and provide detailed reasoning.",
    output_type=ClaimAssessment,
    tools=[check_fraud_database],
)


# Main orchestrator
claim_service = restate.Service("InsuranceClaimAgent")


@claim_service.handler()
async def run(ctx: restate.Context, req: ClaimDocument) -> str:
    # Step 1: Parse the claim document (LLM step)
    parsed = await DurableRunner.run(parse_agent, req.text)
    claim: ClaimData = parsed.final_output

    # Step 2: Analyze the claim (LLM step)
    response = await DurableRunner.run(analysis_agent, claim.model_dump_json())
    assessment: ClaimAssessment = response.final_output

    if not assessment.valid:
        return "Claim rejected"

    # Step 3: Convert currency (regular durable step, no LLM)
    converted = await ctx.run_typed(
        "Convert currency", convert_currency, amount=claim.amount
    )

    # Step 4: Process reimbursement (regular durable step, no LLM)
    await ctx.run_typed("Reimburse", reimburse, amount=converted)

    return "Claim reimbursed"
```
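The snippet references `ClaimDocument`, `ClaimData`, and `ClaimAssessment` without defining them (along with helpers like `query_fraud_db` and `convert_currency`). One plausible shape for the models, assuming Pydantic — which the `model_dump_json()` call and the agents SDK's structured outputs suggest — with field names inferred from the agent instructions:

```python
from pydantic import BaseModel


class ClaimDocument(BaseModel):
    """Raw claim document submitted to the workflow (assumed shape)."""
    text: str


class ClaimData(BaseModel):
    """Structured output of the DocumentParser agent (assumed fields)."""
    customer_id: str
    amount: float
    currency: str
    category: str
    description: str


class ClaimAssessment(BaseModel):
    """Structured output of the ClaimsAnalyst agent (assumed fields)."""
    valid: bool
    reasoning: str
```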
## 4. Enable Langfuse Tracing

Initialize the Langfuse client and set up the tracing processor. This connects the OpenAI Agents SDK spans to Restate's execution traces, so everything appears as a single unified trace in Langfuse.

```python filename="__main__.py"
import asyncio

import hypercorn
import hypercorn.asyncio  # `import hypercorn` alone does not load the asyncio submodule
import restate

from langfuse import get_client
from opentelemetry import trace as trace_api
from openinference.instrumentation import OITracer, TraceConfig
from agents import set_trace_processors

from utils.tracing import RestateTracingProcessor
from agent import claim_service


# Initialize Langfuse (sets up the global OTel tracer provider + exporter)
langfuse = get_client()
tracer = OITracer(
    trace_api.get_tracer("openinference.openai_agents"), config=TraceConfig()
)
set_trace_processors([RestateTracingProcessor(tracer)])


if __name__ == "__main__":
    app = restate.app(services=[claim_service])

    conf = hypercorn.Config()
    conf.bind = ["0.0.0.0:9080"]
    asyncio.run(hypercorn.asyncio.serve(app, conf))
```
Comment on lines +100 to +126

🔴 `import hypercorn` only loads `hypercorn/__init__.py`; Python never imports subpackages automatically, so evaluating `hypercorn.asyncio.serve(app, conf)` raises `AttributeError: module 'hypercorn' has no attribute 'asyncio'`. All imports succeed, so the example looks fine until the server is started — the final step of the guide — and then crashes. Any developer copying the snippet verbatim hits this. Fix it with an explicit submodule import:

```python
import hypercorn
import hypercorn.asyncio
```

or, more idiomatically (matching hypercorn's own documentation):

```python
from hypercorn.asyncio import serve
...
asyncio.run(serve(app, conf))
```
The `RestateTracingProcessor` (available in [Restate's example repo](https://github.com/restatedev/ai-examples/blob/main/openai-agents/examples/langfuse/utils/tracing.py)) flattens the OpenAI Agents SDK spans under Restate's parent span, so the trace hierarchy in Langfuse mirrors the actual execution flow.

Restate also exports its own execution traces (workflow steps, retries, recovery) as OpenTelemetry spans. By pointing Restate's tracing endpoint at Langfuse, both agentic and workflow spans appear in the same trace.
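Langfuse ingests OTLP traces at `/api/public/otel`, authenticated with HTTP Basic auth built from the public/secret key pair. A sketch of deriving the exporter settings from the environment (endpoint path and header format follow Langfuse's OpenTelemetry docs; verify against your deployment):

```python
import base64
import os

# Fall back through both variable names, then the cloud default
host = (
    os.environ.get("LANGFUSE_BASE_URL")
    or os.environ.get("LANGFUSE_HOST")
    or "https://cloud.langfuse.com"
)
public_key = os.environ.get("LANGFUSE_PUBLIC_KEY", "pk-lf-...")
secret_key = os.environ.get("LANGFUSE_SECRET_KEY", "sk-lf-...")

# Basic auth token: base64("<public_key>:<secret_key>")
token = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()

# Values for OTEL_EXPORTER_OTLP_TRACES_ENDPOINT / OTEL_EXPORTER_OTLP_TRACES_HEADERS
otlp_endpoint = f"{host}/api/public/otel"
otlp_headers = f"Authorization=Basic {token}"
```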
## 5. View Traces in Langfuse

After running the workflow, the trace in Langfuse shows both the agentic steps and the workflow steps. For LLM calls, you can inspect inputs, prompts, model configuration, and outputs.
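To produce a trace end to end, start `restate-server`, register the deployment, and invoke the handler through the Restate ingress. The commands below are a sketch assuming a local Restate server with default ports (ingress on 8080) and the service from step 4 listening on 9080; the request body shape depends on your `ClaimDocument` model:

```bash
# Register the service deployment with Restate
restate deployments register http://localhost:9080

# Invoke the handler via the ingress (path is /<ServiceName>/<handler>)
curl localhost:8080/InsuranceClaimAgent/run \
  --json '{"text": "Customer C-123 claims EUR 250 for a broken windshield."}'
```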
## Prompt Management with Restate

You can use [Langfuse Prompt Management](/docs/prompt-management/overview) with Restate. Each prompt fetch becomes a durable step — retries reuse the same prompt, while new executions pick up updated versions.

```python
import restate
from langfuse import get_client

from agent import claim_service, ClaimDocument

langfuse = get_client()


def fetch_prompt() -> str:
    prompt = langfuse.get_prompt("claim-agent", type="text")
    return prompt.compile()


@claim_service.handler()
async def run(ctx: restate.Context, req: ClaimDocument) -> str:
    # Durably journaled — the same prompt is reused on retries
    prompt = await ctx.run_typed("Fetch prompt", fetch_prompt)
    ...
```
Comment on lines +140 to +151

🟡 The Prompt Management snippet `await`s `ctx.run_typed(...)` at module top level: `ctx` is never defined anywhere in the snippet (`NameError`), and `await` outside an `async` function is a `SyntaxError`, so the file fails at import time. Section 3 shows the correct pattern — `ctx` only exists as a handler parameter inside a Restate service. Wrap the call accordingly:

```python
@claim_service.handler()
async def run(ctx: restate.Context, req: ClaimDocument) -> str:
    # Durably journaled — same prompt is used on retries
    prompt = await ctx.run_typed("Fetch prompt", fetch_prompt)
    ...
```
Comment on the `.env` block (line 34)

🔴 The `.env` example sets `LANGFUSE_HOST`, but the v3 Python SDK used in this guide (`from langfuse import get_client`) reads `LANGFUSE_BASE_URL`. The SDK emits no warning when only `LANGFUSE_HOST` is set; it silently defaults to `https://cloud.langfuse.com`. Self-hosting users and those on regional endpoints would set `LANGFUSE_HOST`, see the workflow run successfully, and then find their traces on `cloud.langfuse.com` — or nowhere — with no error surfaced. Change the variable to `LANGFUSE_BASE_URL`, consistent with the official SDK docs, the v2-to-v3 upgrade guide, and the other integration guides in this repo (openai-agents.mdx, claude-agent-sdk.mdx, temporal.mdx, etc.).