Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,5 @@ CLAUDE.local.md
*.local.mdc

# Exception: AI tool configs that should be tracked
!.gemini/settings.json
!.gemini/settings.json
logs
36 changes: 36 additions & 0 deletions backend/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,42 @@ uv run pytest -v --cov=app
Use `uv add <package-name>` to add new dependencies (automatically updates pyproject.toml, lockfile, and venv).
Run `uv run ruff check . --fix && uv run ruff format .` after making changes.

## Observability

OpenTelemetry-based tracing and metrics. Auto-instruments HTTP requests, database queries, Redis, httpx, and Celery tasks.

### Configuration

```bash
OTEL_ENABLED=true # Enable/disable all observability
OTEL_EXPORTER_ENDPOINT=localhost:4317 # OTLP collector endpoint
OTEL_SERVICE_NAME=open-wearables-api # Service name in traces
```

### Running the Stack

```bash
# Start observability services (Grafana, Tempo, Prometheus, Loki, OTEL Collector)
docker compose -f docker-compose.yml -f docker-compose.observability.yml up -d

# View traces, metrics, and logs at http://localhost:3001 (Grafana)
```

### Recording Metrics

```python
from app.integrations.observability import record_metric, record_histogram

# Counter metrics (increment by 1 or custom value)
record_metric("oauth_attempts", labels={"provider": "garmin"})
record_metric("workouts_synced", 5, {"provider": "polar"})

# Histogram metrics (durations, sizes, etc.)
record_histogram("provider_sync_duration", 2.5, {"provider": "garmin"})
```

All helpers are no-ops when `OTEL_ENABLED=false`.

## Detailed Layer Rules

### Models Layer (`app/models/`)
Expand Down
59 changes: 39 additions & 20 deletions backend/app/api/routes/v1/oauth.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from logging import getLogger
from typing import Annotated
from uuid import UUID

Expand All @@ -6,6 +7,7 @@

from app.database import DbSession
from app.integrations.celery.tasks import sync_vendor_data
from app.integrations.observability import record_metric
from app.schemas import (
AuthorizationURLResponse,
BulkProviderSettingsUpdate,
Expand All @@ -18,6 +20,8 @@
from app.services.providers.base_strategy import BaseProviderStrategy
from app.services.providers.factory import ProviderFactory

logger = getLogger(__name__)

router = APIRouter()
factory = ProviderFactory()
settings_service = ProviderSettingsService()
Expand Down Expand Up @@ -46,6 +50,8 @@ async def authorize_provider(

Returns authorization URL where user should be redirected to log in.
"""
record_metric("oauth_attempts", labels={"provider": provider.value})

strategy = get_oauth_strategy(provider)

assert strategy.oauth
Expand All @@ -68,39 +74,52 @@ async def oauth_callback(
Provider redirects here after user authorizes. Exchanges code for tokens.
"""
if error:
record_metric("oauth_failures", labels={"provider": provider.value, "error": error})
return RedirectResponse(
url=f"/api/v1/oauth/error?message={error}:+{error_description or 'Unknown+error'}",
status_code=303,
)

if not code or not state:
record_metric("oauth_failures", labels={"provider": provider.value, "error": "missing_params"})
return RedirectResponse(
url="/api/v1/oauth/error?message=Missing+OAuth+parameters",
status_code=303,
)

strategy = get_oauth_strategy(provider)

assert strategy.oauth
oauth_state = strategy.oauth.handle_callback(db, code, state)

# schedule sync task
sync_vendor_data.delay(
user_id=str(oauth_state.user_id),
start_date=None,
end_date=None,
providers=[provider.value],
)

# If a specific redirect_uri was requested (e.g. by frontend), redirect there
if oauth_state.redirect_uri:
return RedirectResponse(url=oauth_state.redirect_uri, status_code=303)

# Otherwise, redirect to internal success page
return RedirectResponse(
url=f"/api/v1/oauth/success?provider={provider.value}&user_id={oauth_state.user_id}",
status_code=303,
)
try:
assert strategy.oauth
oauth_state = strategy.oauth.handle_callback(db, code, state)

record_metric("oauth_successes", labels={"provider": provider.value})
record_metric("provider_connections", labels={"provider": provider.value})

# schedule sync task
sync_vendor_data.delay(
user_id=str(oauth_state.user_id),
start_date=None,
end_date=None,
providers=[provider.value],
)

# If a specific redirect_uri was requested (e.g. by frontend), redirect there
if oauth_state.redirect_uri:
return RedirectResponse(url=oauth_state.redirect_uri, status_code=303)

# Otherwise, redirect to internal success page
return RedirectResponse(
url=f"/api/v1/oauth/success?provider={provider.value}&user_id={oauth_state.user_id}",
status_code=303,
)
except Exception as e:
logger.exception("OAuth callback failed", extra={"provider": provider.value})
record_metric("oauth_failures", labels={"provider": provider.value, "error": type(e).__name__})
return RedirectResponse(
url=f"/api/v1/oauth/error?message=OAuth+callback+failed:+{type(e).__name__}",
status_code=303,
)


@router.get("/success")
Expand Down
7 changes: 7 additions & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ class Settings(BaseSettings):
SENTRY_SAMPLES_RATE: float = 0.5
SENTRY_ENV: str | None = None

# OpenTelemetry
otel_enabled: bool = False
otel_service_name: str = "open-wearables-api"
otel_service_version: str = "1.0.0"
otel_exporter_endpoint: str = "otel-collector:4317"
otel_log_level: str = "INFO"

# AUTH SETTINGS
secret_key: str
algorithm: str = "HS256"
Expand Down
74 changes: 44 additions & 30 deletions backend/app/integrations/celery/core.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,55 @@
import logging
import sys
from logging import Formatter, StreamHandler, getLogger
import time
from logging import getLogger
from typing import Any

from app.config import settings
from app.integrations.observability import (
configure_logging,
record_task_completed,
record_task_failed,
record_task_started,
)
from app.integrations.observability.tracing import init_celery_tracing
from celery import Celery, signals
from celery import current_app as current_celery_app

logger = getLogger(__name__)

_task_start_times: dict[str, float] = {}


@signals.setup_logging.connect
def setup_celery_logging(**kwargs) -> None:
"""
Configure Celery logging to use stdout instead of stderr.

Some platforms convert stderr logs to level.error automatically, so we must use stdout
to ensure platforms correctly identify log levels from JSON structured logs.

This signal is called when Celery sets up its logging configuration.
"""
# Get Celery's logger
celery_logger = getLogger("celery")

# Remove existing handlers that might use stderr
celery_logger.handlers.clear()

# Create a handler that uses stdout
stdout_handler = StreamHandler(sys.stdout)
stdout_handler.setFormatter(
Formatter(
"[%(asctime)s - %(name)s] (%(levelname)s) %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
def setup_celery_logging(**kwargs: Any) -> None:
"""Configure Celery to use the application's structured logging."""
configure_logging()


@signals.celeryd_after_setup.connect
def init_worker_tracing(sender: Any, instance: Any, **kwargs: Any) -> None:
    """Initialize OpenTelemetry tracing in Celery workers.

    Hooked to ``celeryd_after_setup`` so instrumentation is installed after
    the worker has finished its setup.
    NOTE(review): assumes this signal fires in each worker process (not only
    the parent) so every worker gets instrumented — confirm for the
    configured worker pool.
    """
    init_celery_tracing()


@signals.task_prerun.connect
def task_prerun_handler(task_id: str, task: Any, **kwargs: Any) -> None:
    """Remember when a task started so task_postrun can compute its duration.

    The timestamp is keyed by ``task_id`` in the module-level
    ``_task_start_times`` dict and consumed (popped) by the postrun/failure
    handlers.
    """
    task_name = getattr(task, "name", "unknown")
    _task_start_times[task_id] = time.time()
    record_task_started(task_name)


@signals.task_postrun.connect
def task_postrun_handler(task_id: str, task: Any, retval: Any, state: str, **kwargs: Any) -> None:
    """Emit completion metrics (terminal state + wall-clock duration) for a task.

    Only records when a start timestamp was captured by the prerun handler;
    the entry is removed so the bookkeeping dict does not grow unboundedly.
    """
    started_at = _task_start_times.pop(task_id, None)
    if started_at is None:
        return
    elapsed = time.time() - started_at
    record_task_completed(getattr(task, "name", "unknown"), state, elapsed)


# Add stdout handler to Celery logger
celery_logger.addHandler(stdout_handler)
celery_logger.setLevel(logging.INFO)
celery_logger.propagate = False
@signals.task_failure.connect
def task_failure_handler(sender: Any = None, task_id: str | None = None, exception: Exception | None = None, **kwargs: Any) -> None:
    """Record task failures in metrics.

    Bug fix: Celery's ``task_failure`` signal sends ``sender`` (the task
    instance), ``task_id``, ``exception``, ``args``, ``kwargs``,
    ``traceback`` and ``einfo`` — it does NOT send a ``task`` keyword.
    The previous signature required a positional ``task`` argument, so the
    dispatch raised ``TypeError`` on every failure and the failure metric
    was never recorded. Parameters now match the signal's contract and
    default to ``None`` per receiver convention.
    """
    # Drop the start timestamp so a failed task doesn't leak an entry
    # (task_postrun may not have consumed it).
    _task_start_times.pop(task_id, None)
    record_task_failed(
        getattr(sender, "name", "unknown"),
        type(exception).__name__ if exception is not None else "unknown",
    )


def create_celery() -> Celery:
Expand Down
Loading
Loading