Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,28 +1,40 @@
ACCESS_TOKEN_EXPIRE_MINUTES=30
ALGORITHM=HS256
API_PREFIX=/api/v1
CORS_ORIGINS=["http://192.168.2.21:3000","http://192.168.2.21:8000"]
ENVIRONMENT=dev
LOG_LEVEL=INFO
OLTP_STD_LOGGING_ENABLED=True
OLTP_LOG_METHOD=none
PROJECT_NAME="FastAPI Template"
RELOAD=False
SECRET_KEY=CHANGE_ME_IN_PRODUCTION
HATCHET_CLIENT_TOKEN=CHANGE_ME_IN_PRODUCTION

# Worker Configuration
# FASTAPI Reload
WATCHFILES_IGNORE_PERMISSION_DENIED=1

## Worker Configuration
WORKER_COUNT=5
ML_WORKER_COUNT=3

# TODO: Are these needed?
## OLTP
OLTP_STD_LOGGING_ENABLED=True
### TODO: Are these needed?
# OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=True
# OTEL_LOGS_EXPORTER=otlp
# OTEL_METRIC_EXPORT_INTERVAL=5000 # So we don't have to wait 60s for metrics

WATCHFILES_IGNORE_PERMISSION_DENIED=1

## LANGFUSE (Optional)
### Option 1: Langfuse
OLTP_LOG_METHOD=langfuse
LANGFUSE_BASE_URL=FILL_IT
LANGFUSE_PUBLIC_KEY=FILL_IT
LANGFUSE_SECRET_KEY=FILL_IT

### Option 2: Logfire
OLTP_LOG_METHOD=logfire
LOGFIRE_TOKEN=FILL_IT

### Option 3: Custom OTLP Endpoint
OLTP_LOG_METHOD=manual
OTLP_ENDPOINT=http://localhost:4317

### Option 4: No logging
OLTP_LOG_METHOD=none
51 changes: 27 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,6 @@

FastAPI Template with Industry Standard Observability

## Logfire

- [Logfire](https://github.com/pydantic/logfire) is Uncomplicated Observability for Python by Pydantic Team.
- Create a new project in [Logfire](https://logfire-us.pydantic.dev) with name `api-template`

```bash
logfire auth
logfire projects use api-template
```

Set in `.env`: `OLTP_LOG_METHOD=logfire`

## Langfuse

Set in `.env`: `OLTP_LOG_METHOD=langfuse`

Also set the following in `.env`:

```bash
LANGFUSE_BASE_URL=FILL_IT
LANGFUSE_PUBLIC_KEY=FILL_IT
LANGFUSE_SECRET_KEY=FILL_IT
```

## Hatchet

Background workflows are powered by [Hatchet](https://hatchet.run).
Expand All @@ -43,6 +19,33 @@ cd api-workers-general && make run-worker
cd api-workers-ml && make run-worker
```

## OpenTelemetry

You can choose where to send OpenTelemetry traces: any OpenTelemetry-compatible backend works, and the template has built-in support for `Logfire` and `Langfuse`.

### Logfire

- [Logfire](https://github.com/pydantic/logfire) is uncomplicated observability for Python, built by the Pydantic team.
- Create a new project in [Logfire](https://logfire-us.pydantic.dev) with name `fastapi-template`

Set in `.env`:

```bash
OLTP_LOG_METHOD=logfire
LOGFIRE_TOKEN=FILL_IT
```

### Langfuse

Set in `.env`:

```bash
OLTP_LOG_METHOD=langfuse
LANGFUSE_BASE_URL=FILL_IT
LANGFUSE_PUBLIC_KEY=FILL_IT
LANGFUSE_SECRET_KEY=FILL_IT
```

## Migrations

```bash
Expand Down
2 changes: 1 addition & 1 deletion api-shared/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "Shared dependencies for both api and the worker services."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"hatchet-sdk[otel]>=1.24",
"hatchet-sdk[otel]>=1.27",
"httpx>=0.28.1",
"loguru>=0.7.3",
"opentelemetry-distro[otlp]>=0.52b0",
Expand Down
8 changes: 4 additions & 4 deletions api-shared/src/api_shared/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ class Environment(StrEnum):
"""Possible environments."""

DEV = "dev"
TEST = "test"
PROD = "prod"
TEST = "test"


class OLTPLogMethod(StrEnum):
NONE = "none"
MANUAL = "manual"
LOGFIRE = "logfire"
LANGFUSE = "langfuse"
LOGFIRE = "logfire"
MANUAL = "manual"
NONE = "none"


class SharedBaseSettings(BaseSettings):
Expand Down
29 changes: 26 additions & 3 deletions api-shared/src/api_shared/hatchet_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from hatchet_sdk import Hatchet
from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor
from loguru import logger
from opentelemetry.trace import get_tracer_provider

from api_shared.core.settings import OLTPLogMethod, settings
from api_shared.core.settings import Environment, OLTPLogMethod, settings


@cache
def get_hatchet() -> Hatchet:
    """Return the process-wide Hatchet client.

    DI wires higher-level objects (e.g., `ExternalRunner`), while this cache
    avoids re-creating the underlying Hatchet client and its connections.
    """
    # Compare against the Environment enum, not a raw "dev" string, so the
    # check stays in sync with the settings type.
    hatchet = Hatchet(debug=settings.ENVIRONMENT == Environment.DEV)
    if settings.OLTP_LOG_METHOD != OLTPLogMethod.NONE:
        # Hook Hatchet spans into the application's existing tracer provider.
        HatchetInstrumentor(tracer_provider=get_tracer_provider()).instrument()
    return hatchet


__all__ = ["get_hatchet"]
async def ensure_hatchet_connection() -> None:
    """Validate Hatchet connectivity for application startup."""
    # Test runs have no Hatchet server available; skip the probe.
    if settings.ENVIRONMENT == Environment.TEST:
        return

    client = get_hatchet()
    try:
        server_version = await client.dispatcher.get_version()
        logger.info(
            "Successfully connected to Hatchet host:{} version:{}",
            client.config.host_port,
            server_version,
        )
    except Exception as exc:
        # gRPC exceptions expose a callable details(); fall back to str(exc)
        # for anything else.
        details_attr = getattr(exc, "details", None)
        detail_text = details_attr() if callable(details_attr) else None
        raise ConnectionError(
            f"Failed to connect to Hatchet at {client.config.host_port}: {detail_text or exc}"
        ) from exc


__all__ = ["ensure_hatchet_connection", "get_hatchet"]
9 changes: 5 additions & 4 deletions api-workers-general/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions api-workers-ml/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions app/api/lifespan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from api_shared.hatchet_client import ensure_hatchet_connection
from fastapi import FastAPI

from app.core.telemetry import setup_opentelemetry, setup_prometheus, stop_opentelemetry
Expand All @@ -15,6 +16,8 @@ async def lifespan_setup(

app.middleware_stack = None

await ensure_hatchet_connection()

setup_db(app)
setup_opentelemetry(app)
setup_prometheus(app)
Expand Down
4 changes: 2 additions & 2 deletions app/api/tasks/deps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from app.external.runner import ExternalRunner, get_external_runner
from app.core.hatchet import ExternalRunner


def get_runner() -> ExternalRunner:
    """FastAPI dependency provider for the external task runner.

    Builds a fresh ExternalRunner per call; the runner itself reuses the
    cached Hatchet client, so this is cheap.
    """
    return ExternalRunner()
2 changes: 1 addition & 1 deletion app/api/tasks/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)

from app.api.tasks.deps import get_runner
from app.external.runner import ExternalRunner
from app.core.hatchet import ExternalRunner

router = APIRouter(prefix="/tasks/general", tags=["tasks"])

Expand Down
2 changes: 1 addition & 1 deletion app/api/tasks/ml.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)

from app.api.tasks.deps import get_runner
from app.external.runner import ExternalRunner
from app.core.hatchet import ExternalRunner

router = APIRouter(prefix="/tasks/ml", tags=["ml-tasks"])

Expand Down
33 changes: 32 additions & 1 deletion app/core/hatchet.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,34 @@
from typing import Any

from api_shared.hatchet_client import get_hatchet
from hatchet_sdk import Hatchet
from hatchet_sdk.clients.rest.models.v1_workflow_run import V1WorkflowRun
from hatchet_sdk.clients.rest.models.v1_workflow_run_details import (
V1WorkflowRunDetails,
)


class ExternalRunner:
    """Abstraction over external task execution/read operations."""

    def __init__(self, hatchet: Hatchet | None = None):
        # Default to the process-wide cached client so gRPC connections
        # are shared; an explicit client can be injected for tests.
        self.hatchet = hatchet or get_hatchet()

    async def trigger_task(
        self,
        *,
        name: str,
        input: Any,
        input_validator: type[Any] | None = None,
        output_validator: type[Any] | None = None,
    ) -> V1WorkflowRun:
        """Trigger the named task without waiting for completion.

        Returns the persisted workflow-run record (re-fetched after the
        fire-and-forget submission so callers see server-side state).
        """
        stub = self.hatchet.stubs.task(
            name=name,
            input_validator=input_validator,
            output_validator=output_validator,
        )
        run_ref = await stub.aio_run_no_wait(input=input)
        return (await self.hatchet.runs.aio_get(run_ref.workflow_run_id)).run

    async def get_task(self, task_id: str) -> V1WorkflowRunDetails:
        """Fetch full run details for a previously triggered task."""
        return await self.hatchet.runs.aio_get(task_id)
3 changes: 0 additions & 3 deletions app/external/__init__.py

This file was deleted.

40 changes: 0 additions & 40 deletions app/external/runner.py

This file was deleted.

Loading