Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions backend/app/api/routes/v1/garmin_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from app.services.providers.factory import ProviderFactory
from app.services.providers.garmin.data_247 import Garmin247Data
from app.services.providers.garmin.workouts import GarminWorkouts
from app.services.raw_payload_storage import store_raw_payload
from app.utils.structured_logging import log_structured

router = APIRouter()
Expand Down Expand Up @@ -641,6 +642,13 @@ async def garmin_push_notification(
garmin_user_ids=garmin_user_ids,
)

store_raw_payload(
source="webhook",
provider="garmin",
payload=payload,
trace_id=request_trace_id,
)

processed_count = 0
saved_count = 0
errors: list[str] = []
Expand Down
6 changes: 6 additions & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ class Settings(BaseSettings):

xml_chunk_size: int = 50_000

# RAW PAYLOAD STORAGE
raw_payload_storage: str = "disabled" # disabled | log | s3
raw_payload_max_size_bytes: int = 10 * 1024 * 1024 # 10 MB
raw_payload_s3_bucket: str | None = None # defaults to aws_bucket_name if not set
raw_payload_s3_prefix: str = "raw-payloads"

@field_validator("cors_origins", mode="after")
@classmethod
def assemble_cors_origins(cls, v: str | list[str]) -> list[str] | str:
Expand Down
7 changes: 7 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from app.integrations.celery import create_celery
from app.integrations.sentry import init_sentry
from app.middlewares import add_cors_middleware
from app.services import raw_payload_storage
from app.utils.exceptions import DatetimeParseError, handle_exception

# Configure logging to use stdout instead of stderr
Expand All @@ -25,6 +26,12 @@
api = FastAPI(title=settings.api_name)
celery_app = create_celery()
init_sentry()
raw_payload_storage.configure(
settings.raw_payload_storage,
settings.raw_payload_max_size_bytes,
s3_bucket=settings.raw_payload_s3_bucket or settings.aws_bucket_name,
s3_prefix=settings.raw_payload_s3_prefix,
)

add_cors_middleware(api)

Expand Down
187 changes: 187 additions & 0 deletions backend/app/services/raw_payload_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""Optional raw payload storage for debugging incoming data.

Stores raw payloads received from SDKs, webhooks, and API responses.
Disabled by default - enable via RAW_PAYLOAD_STORAGE env var.

Supported backends:
- "disabled" (default): no-op
- "log": prints JSON to stdout
- "s3": uploads to S3 bucket (configured via RAW_PAYLOAD_S3_BUCKET / AWS creds)

Usage (one-liner at ingestion point):
store_raw_payload(source="webhook", provider="garmin", payload=data)
"""

import json
import logging
import sys
from datetime import UTC, datetime
from typing import Any
from uuid import uuid4

from app.utils.structured_logging import json_serial

logger = logging.getLogger(__name__)

# Module-level state. configure() overwrites these at startup; the defaults
# below leave storage switched off until it does.
# Active backend name; store_raw_payload() dispatches on "log" and "s3" and
# returns immediately for "disabled".
_storage_backend: str = "disabled"
# Payloads whose UTF-8 encoding is larger than this are skipped.
_max_size_bytes: int = 10 * 1024 * 1024 # 10 MB
# Destination bucket and key prefix used by the "s3" backend.
_s3_bucket: str | None = None
_s3_prefix: str = "raw-payloads"
# Client returned by _create_s3_client(); stays None unless the "s3" backend
# was configured successfully.
_s3_client: Any = None


def configure(
storage_backend: str,
max_size_bytes: int,
s3_bucket: str | None = None,
s3_prefix: str = "raw-payloads",
) -> None:
"""Called once at startup from settings."""
global _storage_backend, _max_size_bytes, _s3_bucket, _s3_prefix, _s3_client
_storage_backend = storage_backend
_max_size_bytes = max_size_bytes
_s3_prefix = s3_prefix

if storage_backend == "s3":
_s3_bucket = s3_bucket
if not _s3_bucket:
logger.error("RAW_PAYLOAD_STORAGE=s3 but no S3 bucket configured")
_storage_backend = "disabled"
return
_s3_client = _create_s3_client()
if _s3_client is None:
logger.error("Failed to create S3 client - raw payload storage disabled")
_storage_backend = "disabled"


def _create_s3_client() -> Any:
    """Create a boto3 S3 client from the app's AWS settings.

    Returns:
        The boto3 client, or None when it cannot be created (boto3 not
        installed, settings missing or unset, or client construction failed).
        The failure is logged rather than raised.
    """
    try:
        # Imported lazily so the module can be imported without boto3.
        import boto3

        from app.config import settings

        return boto3.client(
            "s3",
            region_name=settings.aws_region,
            aws_access_key_id=settings.aws_access_key_id,
            aws_secret_access_key=settings.aws_secret_access_key.get_secret_value(),
        )
    except Exception as e:
        # Catch Exception by name only: listing an exception class imported
        # inside the try block would leave that name unbound when the import
        # itself fails, and the handler would raise UnboundLocalError.
        logger.error("Cannot create S3 client for raw payload storage: %s", e)
        return None


def store_raw_payload(
    *,
    source: str,
    provider: str,
    payload: Any,
    user_id: str | None = None,
    trace_id: str | None = None,
) -> None:
    """Store a raw payload on a best-effort basis. No-op when disabled.

    A payload that cannot be serialized or that exceeds the configured size
    limit is logged and skipped; neither case raises into the caller.

    Args:
        source: Origin type - "sdk", "webhook", or "api_response"
        provider: Provider name (e.g. "garmin", "apple", "strava")
        payload: Raw data (dict, list, or pre-serialized string)
        user_id: Optional user identifier for correlation
        trace_id: Optional trace/batch ID for correlation with processed data
    """
    if _storage_backend == "disabled":
        return

    if isinstance(payload, str):
        payload_str = payload
    else:
        try:
            payload_str = json.dumps(payload, default=json_serial)
        except (TypeError, ValueError):
            # json.dumps raises TypeError for unsupported objects and
            # ValueError for circular references. This is a debugging aid,
            # so a bad payload must not break the ingestion path.
            logger.exception(
                "Raw payload skipped (not JSON-serializable): source=%s provider=%s",
                source,
                provider,
            )
            return

    # Skip payloads that exceed size limit
    size = len(payload_str.encode("utf-8"))
    if size > _max_size_bytes:
        logger.warning(
            "Raw payload skipped (size %d bytes exceeds limit %d)",
            size,
            _max_size_bytes,
        )
        return

    if _storage_backend == "log":
        _store_to_log(source, provider, payload_str, size, user_id, trace_id)
    elif _storage_backend == "s3":
        _store_to_s3(source, provider, payload_str, size, user_id, trace_id)


def _store_to_log(
source: str,
provider: str,
payload_str: str,
size: int,
user_id: str | None,
trace_id: str | None,
) -> None:
entry: dict[str, Any] = {
"level": "debug",
"message": "raw_payload",
"source": source,
"provider": provider,
"size_bytes": size,
}
if user_id:
entry["user_id"] = user_id
if trace_id:
entry["trace_id"] = trace_id
entry["payload"] = payload_str

print(json.dumps(entry), file=sys.stdout, flush=True)


def _store_to_s3(
    source: str,
    provider: str,
    payload_str: str,
    size: int,
    user_id: str | None,
    trace_id: str | None,
) -> None:
    """Upload a raw payload to S3; failures are logged, never raised.

    Key format:
        {prefix}/{provider}/{source}/{YYYY-MM-DD}/{user_id or "_unknown"}/{id}.json
    where ``id`` is the first 12 hex characters of a random UUID4. Leading and
    trailing slashes on the configured prefix are ignored, and the prefix
    segment is left out entirely when the prefix is empty.

    Object metadata carries source, provider, size_bytes and timestamp, plus
    user_id and trace_id when they are provided.
    """
    if _s3_client is None or _s3_bucket is None:
        logger.warning("S3 client or bucket not configured - skipping raw payload storage")
        return

    now = datetime.now(UTC)
    date_part = now.strftime("%Y-%m-%d")
    file_id = uuid4().hex[:12]
    user_part = user_id if user_id else "_unknown"
    key = f"{provider}/{source}/{date_part}/{user_part}/{file_id}.json"
    # A prefix such as "raw-payloads/" or "" would otherwise yield keys with
    # "//" or a leading "/", which show up as empty-named folders in S3.
    prefix = _s3_prefix.strip("/")
    if prefix:
        key = f"{prefix}/{key}"

    metadata: dict[str, str] = {
        "source": source,
        "provider": provider,
        "size_bytes": str(size),
        "timestamp": now.isoformat(),
    }
    if user_id:
        metadata["user_id"] = user_id
    if trace_id:
        metadata["trace_id"] = trace_id

    try:
        _s3_client.put_object(
            Bucket=_s3_bucket,
            Key=key,
            Body=payload_str.encode("utf-8"),
            ContentType="application/json",
            Metadata=metadata,
        )
        logger.debug(
            "Stored raw payload to S3: s3://%s/%s (%d bytes)",
            _s3_bucket,
            key,
            size,
        )
    except Exception:
        # Best effort: an upload failure must not propagate into ingestion.
        logger.exception("Failed to store raw payload to S3: s3://%s/%s", _s3_bucket, key)
142 changes: 142 additions & 0 deletions backend/tests/services/test_raw_payload_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Tests for raw payload storage backends."""

import json
from unittest.mock import MagicMock, patch

import pytest

from app.services import raw_payload_storage


@pytest.fixture(autouse=True)
def _reset_module_state(monkeypatch: pytest.MonkeyPatch) -> None:
    """Give every test pristine module globals and restore them afterwards.

    monkeypatch puts the previous values back at teardown, so whatever a test
    configures (for example an "s3" backend backed by a MagicMock) is not left
    in place for other test modules.
    """
    defaults = {
        "_storage_backend": "disabled",
        "_max_size_bytes": 10 * 1024 * 1024,
        "_s3_bucket": None,
        "_s3_prefix": "raw-payloads",
        "_s3_client": None,
    }
    for name, value in defaults.items():
        monkeypatch.setattr(raw_payload_storage, name, value)


class TestConfigure:
    """configure() stores the given settings and falls back to "disabled" on a bad S3 setup."""

    def test_configure_disabled(self) -> None:
        """The "disabled" backend is stored as-is."""
        raw_payload_storage.configure("disabled", 1024)

        assert raw_payload_storage._storage_backend == "disabled"

    def test_configure_log(self) -> None:
        """The "log" backend and the size limit are both recorded."""
        raw_payload_storage.configure("log", 2048)

        assert raw_payload_storage._storage_backend == "log"
        assert raw_payload_storage._max_size_bytes == 2048

    def test_configure_s3_without_bucket_falls_back_to_disabled(self) -> None:
        """Selecting "s3" without a bucket disables storage."""
        raw_payload_storage.configure("s3", 1024, s3_bucket=None)

        assert raw_payload_storage._storage_backend == "disabled"

    def test_configure_s3_with_bucket(self) -> None:
        """A bucket plus a working client factory enables the "s3" backend."""
        fake_client = MagicMock()
        with patch.object(raw_payload_storage, "_create_s3_client") as client_factory:
            client_factory.return_value = fake_client
            raw_payload_storage.configure("s3", 1024, s3_bucket="my-bucket", s3_prefix="payloads")

        assert raw_payload_storage._storage_backend == "s3"
        assert raw_payload_storage._s3_bucket == "my-bucket"
        assert raw_payload_storage._s3_prefix == "payloads"
        assert raw_payload_storage._s3_client is fake_client

    def test_configure_s3_client_creation_fails(self) -> None:
        """A client factory that returns None disables storage."""
        with patch.object(raw_payload_storage, "_create_s3_client") as client_factory:
            client_factory.return_value = None
            raw_payload_storage.configure("s3", 1024, s3_bucket="my-bucket")

        assert raw_payload_storage._storage_backend == "disabled"


class TestStoreRawPayload:
    """Behaviour of store_raw_payload() for each backend."""

    @staticmethod
    def _configure_s3(s3_prefix: str = "raw-payloads") -> MagicMock:
        """Configure the "s3" backend around a mock client and return that client."""
        client = MagicMock()
        client.put_object.return_value = {"ETag": "test-etag"}
        with patch.object(raw_payload_storage, "_create_s3_client", return_value=client):
            raw_payload_storage.configure(
                "s3",
                10 * 1024 * 1024,
                s3_bucket="test-bucket",
                s3_prefix=s3_prefix,
            )
        return client

    def test_disabled_is_noop(self, capsys: pytest.CaptureFixture[str]) -> None:
        raw_payload_storage.configure("disabled", 1024)

        raw_payload_storage.store_raw_payload(source="webhook", provider="garmin", payload={"key": "val"})

        # Nothing may reach stdout while storage is disabled.
        assert capsys.readouterr().out == ""

    def test_payload_exceeding_max_size_is_skipped(self, capsys: pytest.CaptureFixture[str]) -> None:
        raw_payload_storage.configure("log", 10)  # 10 bytes max

        raw_payload_storage.store_raw_payload(source="webhook", provider="garmin", payload={"big": "payload"})

        captured = capsys.readouterr()
        assert "raw_payload" not in captured.out

    def test_log_backend_outputs_json(self, capsys: pytest.CaptureFixture[str]) -> None:
        raw_payload_storage.configure("log", 10 * 1024 * 1024)

        raw_payload_storage.store_raw_payload(
            source="webhook",
            provider="garmin",
            payload={"test": True},
            user_id="user-123",
            trace_id="trace-abc",
        )

        captured = capsys.readouterr()
        entry = json.loads(captured.out.strip())
        assert entry["message"] == "raw_payload"
        assert entry["source"] == "webhook"
        assert entry["provider"] == "garmin"
        assert entry["user_id"] == "user-123"
        assert entry["trace_id"] == "trace-abc"
        # The payload travels as its serialized string, with its byte size.
        assert json.loads(entry["payload"]) == {"test": True}
        assert entry["size_bytes"] == len(entry["payload"].encode("utf-8"))

    def test_s3_backend_uploads_payload(self) -> None:
        client = self._configure_s3(s3_prefix="raw")

        raw_payload_storage.store_raw_payload(
            source="webhook",
            provider="garmin",
            payload={"activity": "running"},
            user_id="user-456",
            trace_id="trace-xyz",
        )

        client.put_object.assert_called_once()
        call_kwargs = client.put_object.call_args.kwargs
        assert call_kwargs["Bucket"] == "test-bucket"
        assert "garmin/webhook/" in call_kwargs["Key"]
        assert "/user-456/" in call_kwargs["Key"]
        assert call_kwargs["Key"].endswith(".json")
        assert call_kwargs["ContentType"] == "application/json"
        assert call_kwargs["Metadata"]["provider"] == "garmin"
        assert call_kwargs["Metadata"]["user_id"] == "user-456"
        assert call_kwargs["Metadata"]["trace_id"] == "trace-xyz"

        body = call_kwargs["Body"].decode("utf-8")
        assert json.loads(body) == {"activity": "running"}

    def test_s3_backend_handles_upload_error(self) -> None:
        client = self._configure_s3()
        client.put_object.side_effect = Exception("S3 error")

        # Should not raise - errors are logged
        raw_payload_storage.store_raw_payload(source="sdk", provider="apple", payload="raw-xml-data")

        # Prove the error path ran: the upload was attempted, not skipped.
        client.put_object.assert_called_once()

    def test_s3_backend_unknown_user_fallback(self) -> None:
        client = self._configure_s3()

        raw_payload_storage.store_raw_payload(source="webhook", provider="garmin", payload={"x": 1})

        client.put_object.assert_called_once()
        call_kwargs = client.put_object.call_args.kwargs
        assert "/_unknown/" in call_kwargs["Key"]

    def test_s3_backend_pre_serialized_string(self) -> None:
        client = self._configure_s3()

        raw_payload_storage.store_raw_payload(source="sdk", provider="apple", payload='{"pre":"serialized"}')

        client.put_object.assert_called_once()
        call_kwargs = client.put_object.call_args.kwargs
        assert call_kwargs["Body"] == b'{"pre":"serialized"}'
Loading