Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency.
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [expense](expense) - Human-in-the-loop processing and asynchronous activity completion.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
Expand Down
63 changes: 63 additions & 0 deletions expense/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Expense

This sample workflow processes an expense request. It demonstrates human-in-the loop processing and asynchronous activity completion.

## Overview

This sample demonstrates the following workflow:

1. **Create Expense**: The workflow executes the `create_expense_activity` to initialize a new expense report in the external system.

2. **Wait for Decision**: The workflow calls `wait_for_decision_activity`, which demonstrates asynchronous activity completion. The activity registers itself for external completion using its task token, then calls `activity.raise_complete_async()` to signal that it will complete later without blocking the worker.

3. **Async Completion**: When a human approves or rejects the expense, an external process uses the stored task token to call `workflow_client.get_async_activity_handle(task_token).complete()`, notifying Temporal that the waiting activity has finished and providing the decision result.

4. **Process Payment**: Once the workflow receives the approval decision, it executes the `payment_activity` to complete the simulated expense processing.

This pattern enables human-in-the-loop workflows where activities can wait as long as necessary for external decisions without consuming worker resources or timing out.

## Steps To Run Sample

* You need a Temporal service running. See the main [README.md](../README.md) for more details.
* Start the sample expense system UI:
```bash
uv run -m expense.ui
```
* Start workflow and activity workers:
```bash
uv run -m expense.worker
```
* Start expense workflow execution:
```bash
uv run -m expense.starter
```
* When you see the console print out that the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense.
* You should see the workflow complete after you approve the expense. You can also reject the expense.

## Running Tests

```bash
# Run all tests
uv run pytest expense/test_workflow.py -v

# Run a specific test
uv run pytest expense/test_workflow.py::TestSampleExpenseWorkflow::test_workflow_with_mock_activities -v
```

## Key Concepts Demonstrated

* **Human-in-the-Loop Workflows**: Long-running workflows that wait for human interaction
* **Async Activity Completion**: Using `activity.raise_complete_async()` to indicate an activity will complete asynchronously, then calling `complete()` on a handle to the async activity.
* **External System Integration**: Communication between workflows and external systems via web services.

## Troubleshooting

If you see the workflow failed, the cause may be a port conflict. You can try to change to a different port number in `__init__.py`. Then rerun everything.

## Files

* `workflow.py` - The main expense processing workflow
* `activities.py` - Three activities: create expense, wait for decision, process payment
* `ui.py` - A demonstration expense approval system web UI
* `worker.py` - Worker to run workflows
* `starter.py` - Client to start workflow executions by submitting an expense report
3 changes: 3 additions & 0 deletions expense/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
EXPENSE_SERVER_HOST = "localhost"
EXPENSE_SERVER_PORT = 8099
EXPENSE_SERVER_HOST_PORT = f"http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}"
89 changes: 89 additions & 0 deletions expense/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import httpx
from temporalio import activity

from expense import EXPENSE_SERVER_HOST_PORT


@activity.defn
async def create_expense_activity(expense_id: str) -> None:
if not expense_id:
raise ValueError("expense id is empty")

async with httpx.AsyncClient() as client:
response = await client.get(
f"{EXPENSE_SERVER_HOST_PORT}/create",
params={"is_api_call": "true", "id": expense_id},
)
response.raise_for_status()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically 4xx is probably a ApplicationError with non_retryable=True, but that's a bit pedantic. But with this setup, an activity that, say, has invalid auth will retry forever

body = response.text

if body == "SUCCEED":
activity.logger.info(f"Expense created. ExpenseID: {expense_id}")
return

raise Exception(body)


@activity.defn
async def wait_for_decision_activity(expense_id: str) -> str:
"""
Wait for the expense decision. This activity will complete asynchronously. When this function
calls activity.raise_complete_async(), the Temporal Python SDK recognizes this and won't mark this activity
as failed or completed. The Temporal server will wait until Client.complete_activity() is called or timeout happened
whichever happen first. In this sample case, the complete_activity() method is called by our sample expense system when
the expense is approved.
"""
if not expense_id:
raise ValueError("expense id is empty")

logger = activity.logger

# Save current activity info so it can be completed asynchronously when expense is approved/rejected
activity_info = activity.info()
task_token = activity_info.task_token

register_callback_url = f"{EXPENSE_SERVER_HOST_PORT}/registerCallback"

async with httpx.AsyncClient() as client:
response = await client.post(
register_callback_url,
params={"id": expense_id},
data={"task_token": task_token.hex()},
)
response.raise_for_status()
body = response.text

status = body
if status == "SUCCEED":
# register callback succeed
logger.info(f"Successfully registered callback. ExpenseID: {expense_id}")

# Raise the complete-async error which will return from this function but
# does not mark the activity as complete from the workflow perspective.
#
# Activity completion is signaled in the `notify_expense_state_change`
# function in `ui.py`.
activity.raise_complete_async()

logger.warning(f"Register callback failed. ExpenseStatus: {status}")
raise Exception(f"register callback failed status: {status}")


@activity.defn
async def payment_activity(expense_id: str) -> None:
if not expense_id:
raise ValueError("expense id is empty")

async with httpx.AsyncClient() as client:
response = await client.get(
f"{EXPENSE_SERVER_HOST_PORT}/action",
params={"is_api_call": "true", "type": "payment", "id": expense_id},
)
response.raise_for_status()
body = response.text

if body == "SUCCEED":
activity.logger.info(f"payment_activity succeed ExpenseID: {expense_id}")
return

raise Exception(body)
27 changes: 27 additions & 0 deletions expense/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio
import uuid

from temporalio.client import Client

from .workflow import SampleExpenseWorkflow


async def main():
# The client is a heavyweight object that should be created once per process.
client = await Client.connect("localhost:7233")

expense_id = str(uuid.uuid4())

# Start the workflow (don't wait for completion)
handle = await client.start_workflow(
SampleExpenseWorkflow.run,
expense_id,
id=f"expense_{expense_id}",
task_queue="expense",
)

print(f"Started workflow WorkflowID {handle.id} RunID {handle.result_run_id}")


if __name__ == "__main__":
asyncio.run(main())
193 changes: 193 additions & 0 deletions expense/ui.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import asyncio
from enum import Enum
from typing import Dict, Optional

import uvicorn
from fastapi import FastAPI, Form, Query
from fastapi.responses import HTMLResponse, PlainTextResponse
from temporalio.client import Client

from expense import EXPENSE_SERVER_HOST, EXPENSE_SERVER_PORT


class ExpenseState(str, Enum):
CREATED = "CREATED"
APPROVED = "APPROVED"
REJECTED = "REJECTED"
COMPLETED = "COMPLETED"


# Use memory store for this sample expense system
all_expenses: Dict[str, ExpenseState] = {}
token_map: Dict[str, bytes] = {}

app = FastAPI()

# Global client - will be initialized when starting the server
workflow_client: Optional[Client] = None


@app.get("/", response_class=HTMLResponse)
@app.get("/list", response_class=HTMLResponse)
async def list_handler():
html = """
<h1>SAMPLE EXPENSE SYSTEM</h1>
<a href="/list">HOME</a>
<h3>All expense requests:</h3>
<table border=1>
<tr><th>Expense ID</th><th>Status</th><th>Action</th></tr>
"""

# Sort keys for consistent display
for expense_id in sorted(all_expenses.keys()):
state = all_expenses[expense_id]
action_link = ""
if state == ExpenseState.CREATED:
action_link = f"""
<a href="/action?type=approve&id={expense_id}">
<button style="background-color:#4CAF50;">APPROVE</button>
</a>
&nbsp;&nbsp;
<a href="/action?type=reject&id={expense_id}">
<button style="background-color:#f44336;">REJECT</button>
</a>
"""
html += f"<tr><td>{expense_id}</td><td>{state}</td><td>{action_link}</td></tr>"

html += "</table>"
return html


@app.get("/action", response_class=HTMLResponse)
async def action_handler(
type: str = Query(...), id: str = Query(...), is_api_call: str = Query("false")
):
if id not in all_expenses:
if is_api_call == "true":
return PlainTextResponse("ERROR:INVALID_ID")
else:
return PlainTextResponse("Invalid ID")

old_state = all_expenses[id]

if type == "approve":
all_expenses[id] = ExpenseState.APPROVED
elif type == "reject":
all_expenses[id] = ExpenseState.REJECTED
elif type == "payment":
all_expenses[id] = ExpenseState.COMPLETED
else:
if is_api_call == "true":
return PlainTextResponse("ERROR:INVALID_TYPE")
else:
return PlainTextResponse("Invalid action type")

if is_api_call == "true" or type == "payment":
# For API calls and payment, just return success
if old_state == ExpenseState.CREATED and all_expenses[id] in [
ExpenseState.APPROVED,
ExpenseState.REJECTED,
]:
# Report state change
await notify_expense_state_change(id, all_expenses[id])

print(f"Set state for {id} from {old_state} to {all_expenses[id]}")
return PlainTextResponse("SUCCEED")
else:
# For UI calls, notify and redirect to list
if old_state == ExpenseState.CREATED and all_expenses[id] in [
ExpenseState.APPROVED,
ExpenseState.REJECTED,
]:
await notify_expense_state_change(id, all_expenses[id])

print(f"Set state for {id} from {old_state} to {all_expenses[id]}")
return await list_handler()


@app.get("/create")
async def create_handler(id: str = Query(...), is_api_call: str = Query("false")):
if id in all_expenses:
if is_api_call == "true":
return PlainTextResponse("ERROR:ID_ALREADY_EXISTS")
else:
return PlainTextResponse("ID already exists")

all_expenses[id] = ExpenseState.CREATED

if is_api_call == "true":
print(f"Created new expense id: {id}")
return PlainTextResponse("SUCCEED")
else:
print(f"Created new expense id: {id}")
return await list_handler()


@app.get("/status")
async def status_handler(id: str = Query(...)):
if id not in all_expenses:
return PlainTextResponse("ERROR:INVALID_ID")

state = all_expenses[id]
print(f"Checking status for {id}: {state}")
return PlainTextResponse(state.value)


@app.post("/registerCallback")
async def callback_handler(id: str = Query(...), task_token: str = Form(...)):
if id not in all_expenses:
return PlainTextResponse("ERROR:INVALID_ID")

curr_state = all_expenses[id]
if curr_state != ExpenseState.CREATED:
return PlainTextResponse("ERROR:INVALID_STATE")

# Convert hex string back to bytes
try:
task_token_bytes = bytes.fromhex(task_token)
except ValueError:
return PlainTextResponse("ERROR:INVALID_FORM_DATA")

print(f"Registered callback for ID={id}, token={task_token}")
token_map[id] = task_token_bytes
return PlainTextResponse("SUCCEED")


async def notify_expense_state_change(expense_id: str, state: str):
if expense_id not in token_map:
print(f"Invalid id: {expense_id}")
return

if workflow_client is None:
print("Workflow client not initialized")
return

token = token_map[expense_id]
try:
handle = workflow_client.get_async_activity_handle(task_token=token)
await handle.complete(state)
print(f"Successfully complete activity: {token.hex()}")
except Exception as err:
print(f"Failed to complete activity with error: {err}")


async def main():
global workflow_client

# Initialize the workflow client
workflow_client = await Client.connect("localhost:7233")

print(
f"Expense system UI available at http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}"
)

# Start the FastAPI server
config = uvicorn.Config(
app, host="0.0.0.0", port=EXPENSE_SERVER_PORT, log_level="info"
)
server = uvicorn.Server(config)
await server.serve()


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading