diff --git a/backend/app/api/routes/v1/garmin_webhooks.py b/backend/app/api/routes/v1/garmin_webhooks.py
index 12b5addf..b9d5ad8d 100644
--- a/backend/app/api/routes/v1/garmin_webhooks.py
+++ b/backend/app/api/routes/v1/garmin_webhooks.py
@@ -30,6 +30,7 @@
 from app.services.providers.factory import ProviderFactory
 from app.services.providers.garmin.data_247 import Garmin247Data
 from app.services.providers.garmin.workouts import GarminWorkouts
+from app.services.raw_payload_storage import store_raw_payload
 from app.utils.structured_logging import log_structured
 
 router = APIRouter()
@@ -641,6 +642,13 @@
         garmin_user_ids=garmin_user_ids,
     )
 
+    store_raw_payload(
+        source="webhook",
+        provider="garmin",
+        payload=payload,
+        trace_id=request_trace_id,
+    )
+
     processed_count = 0
     saved_count = 0
     errors: list[str] = []
diff --git a/backend/app/api/routes/v1/sdk_sync.py b/backend/app/api/routes/v1/sdk_sync.py
index 8425227a..8896c677 100644
--- a/backend/app/api/routes/v1/sdk_sync.py
+++ b/backend/app/api/routes/v1/sdk_sync.py
@@ -5,6 +5,7 @@
 from app.integrations.celery.tasks.process_sdk_upload_task import process_sdk_upload
 from app.schemas import SDKSyncRequest, UploadDataResponse
+from app.services.raw_payload_storage import store_raw_payload
 from app.utils.auth import SDKAuthDep
 from app.utils.structured_logging import log_structured
 
 
@@ -87,6 +88,14 @@
 
     content_str = body.model_dump_json()
 
+    store_raw_payload(
+        source="sdk",
+        provider=provider,
+        payload=content_str,
+        user_id=user_id,
+        trace_id=batch_id,
+    )
+
     process_sdk_upload.delay(
         content=content_str,
         content_type="application/json",
diff --git a/backend/app/config.py b/backend/app/config.py
index b8117954..ec3dfa45 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -127,6 +127,13 @@
 
     xml_chunk_size: int = 50_000
 
+    # RAW PAYLOAD STORAGE
+    raw_payload_storage: str = "disabled"  # disabled | log | s3
+    raw_payload_max_size_bytes: int = 10 * 1024 * 1024  # 10 MB
+    raw_payload_s3_bucket: str | None = None  # defaults to aws_bucket_name if not set
+    raw_payload_s3_prefix: str = "raw-payloads"
+    raw_payload_s3_endpoint_url: str | None = None  # for S3-compatible storage (e.g. Railway Object Storage)
+
     @field_validator("cors_origins", mode="after")
     @classmethod
     def assemble_cors_origins(cls, v: str | list[str]) -> list[str] | str:
diff --git a/backend/app/main.py b/backend/app/main.py
index 2dadbbdb..5260a4d3 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -11,6 +11,7 @@
 from app.integrations.celery import create_celery
 from app.integrations.sentry import init_sentry
 from app.middlewares import add_cors_middleware
+from app.services import raw_payload_storage
 from app.utils.exceptions import DatetimeParseError, handle_exception
 
 # Configure logging to use stdout instead of stderr
@@ -25,6 +26,13 @@
 api = FastAPI(title=settings.api_name)
 celery_app = create_celery()
 init_sentry()
+raw_payload_storage.configure(
+    settings.raw_payload_storage,
+    settings.raw_payload_max_size_bytes,
+    s3_bucket=settings.raw_payload_s3_bucket or settings.aws_bucket_name,
+    s3_prefix=settings.raw_payload_s3_prefix,
+    s3_endpoint_url=settings.raw_payload_s3_endpoint_url,
+)
 
 add_cors_middleware(api)
 
diff --git a/backend/app/services/raw_payload_storage.py b/backend/app/services/raw_payload_storage.py
new file mode 100644
index 00000000..d1037367
--- /dev/null
+++ b/backend/app/services/raw_payload_storage.py
@@ -0,0 +1,190 @@
+"""Optional raw payload storage for debugging incoming data.
+
+Stores raw payloads received from SDKs, webhooks, and API responses.
+Disabled by default - enable via RAW_PAYLOAD_STORAGE env var.
+
+Supported backends:
+ - "disabled" (default): no-op
+ - "log": prints JSON to stdout
+ - "s3": uploads to S3 bucket (configured via RAW_PAYLOAD_S3_BUCKET / AWS creds)
+
+Usage (one-liner at ingestion point):
+    store_raw_payload(source="webhook", provider="garmin", payload=data)
+"""
+
+import json
+import logging
+import sys
+from datetime import UTC, datetime
+from typing import Any
+from uuid import uuid4
+
+from app.utils.structured_logging import json_serial
+
+logger = logging.getLogger(__name__)
+
+_storage_backend: str = "disabled"
+_max_size_bytes: int = 10 * 1024 * 1024  # 10 MB
+_s3_bucket: str | None = None
+_s3_prefix: str = "raw-payloads"
+_s3_client: Any = None
+
+
+def configure(
+    storage_backend: str,
+    max_size_bytes: int,
+    s3_bucket: str | None = None,
+    s3_prefix: str = "raw-payloads",
+    s3_endpoint_url: str | None = None,
+) -> None:
+    """Called once at startup from settings."""
+    global _storage_backend, _max_size_bytes, _s3_bucket, _s3_prefix, _s3_client
+    _storage_backend = storage_backend
+    _max_size_bytes = max_size_bytes
+    _s3_prefix = s3_prefix
+
+    if storage_backend == "s3":
+        _s3_bucket = s3_bucket
+        if not _s3_bucket:
+            logger.error("RAW_PAYLOAD_STORAGE=s3 but no S3 bucket configured")
+            _storage_backend = "disabled"
+            return
+        _s3_client = _create_s3_client(endpoint_url=s3_endpoint_url)
+        if _s3_client is None:
+            logger.error("Failed to create S3 client - raw payload storage disabled")
+            _storage_backend = "disabled"
+
+
+def _create_s3_client(endpoint_url: str | None = None) -> Any:
+    """Create a boto3 S3 client using app AWS settings."""
+    try:
+        import boto3
+
+        from app.config import settings
+
+        kwargs: dict[str, Any] = {"region_name": settings.aws_region}
+        if settings.aws_access_key_id and settings.aws_secret_access_key:
+            kwargs["aws_access_key_id"] = settings.aws_access_key_id
+            kwargs["aws_secret_access_key"] = settings.aws_secret_access_key.get_secret_value()
+        if endpoint_url:
+            kwargs["endpoint_url"] = endpoint_url
+
+        return boto3.client("s3", **kwargs)
+    except Exception as e:  # any failure (missing boto3, bad creds/config) disables storage
+        logger.error("Cannot create S3 client for raw payload storage: %s", e)
+        return None
+
+
+def store_raw_payload(
+    *,
+    source: str,
+    provider: str,
+    payload: Any,
+    user_id: str | None = None,
+    trace_id: str | None = None,
+) -> None:
+    """Store a raw payload. No-op when disabled.
+
+    Args:
+        source: Origin type - "sdk", "webhook", or "api_response"
+        provider: Provider name (e.g. "garmin", "apple", "strava")
+        payload: Raw data (dict, list, or pre-serialized string)
+        user_id: Optional user identifier for correlation
+        trace_id: Optional trace/batch ID for correlation with processed data
+    """
+    if _storage_backend == "disabled":
+        return
+
+    payload_str = payload if isinstance(payload, str) else json.dumps(payload, default=json_serial)
+
+    # Skip payloads that exceed size limit
+    size = len(payload_str.encode("utf-8"))
+    if size > _max_size_bytes:
+        logger.warning(
+            "Raw payload skipped (size %d bytes exceeds limit %d)",
+            size,
+            _max_size_bytes,
+        )
+        return
+
+    if _storage_backend == "log":
+        _store_to_log(source, provider, payload_str, size, user_id, trace_id)
+    elif _storage_backend == "s3":
+        _store_to_s3(source, provider, payload_str, size, user_id, trace_id)
+
+
+def _store_to_log(
+    source: str,
+    provider: str,
+    payload_str: str,
+    size: int,
+    user_id: str | None,
+    trace_id: str | None,
+) -> None:
+    entry: dict[str, Any] = {
+        "level": "debug",
+        "message": "raw_payload",
+        "source": source,
+        "provider": provider,
+        "size_bytes": size,
+    }
+    if user_id:
+        entry["user_id"] = user_id
+    if trace_id:
+        entry["trace_id"] = trace_id
+    entry["payload"] = payload_str
+
+    print(json.dumps(entry), file=sys.stdout, flush=True)
+
+
+def _store_to_s3(
+    source: str,
+    provider: str,
+    payload_str: str,
+    size: int,
+    user_id: str | None,
+    trace_id: str | None,
+) -> None:
+    """Upload raw payload to S3.
+
+    Key format: {prefix}/{provider}/{source}/{YYYY-MM-DD}/{user_id|_unknown}/{uuid}.json
+    Metadata includes user_id, trace_id, and size for easy filtering.
+    """
+    if _s3_client is None or _s3_bucket is None:
+        logger.warning("S3 client or bucket not configured - skipping raw payload storage")
+        return
+
+    now = datetime.now(UTC)
+    date_part = now.strftime("%Y-%m-%d")
+    file_id = uuid4().hex[:12]
+    user_part = user_id if user_id else "_unknown"
+    key = f"{_s3_prefix}/{provider}/{source}/{date_part}/{user_part}/{file_id}.json"
+
+    metadata: dict[str, str] = {
+        "source": source,
+        "provider": provider,
+        "size_bytes": str(size),
+        "timestamp": now.isoformat(),
+    }
+    if user_id:
+        metadata["user_id"] = user_id
+    if trace_id:
+        metadata["trace_id"] = trace_id
+
+    # NOTE(review): put_object is a blocking boto3 call; async request handlers calling this block the event loop.
+    try:
+        _s3_client.put_object(
+            Bucket=_s3_bucket,
+            Key=key,
+            Body=payload_str.encode("utf-8"),
+            ContentType="application/json",
+            Metadata=metadata,
+        )
+        logger.debug(
+            "Stored raw payload to S3: s3://%s/%s (%d bytes)",
+            _s3_bucket,
+            key,
+            size,
+        )
+    except Exception:
+        logger.exception("Failed to store raw payload to S3: s3://%s/%s", _s3_bucket, key)
diff --git a/backend/tests/services/test_raw_payload_storage.py b/backend/tests/services/test_raw_payload_storage.py
new file mode 100644
index 00000000..6da25fd4
--- /dev/null
+++ b/backend/tests/services/test_raw_payload_storage.py
@@ -0,0 +1,142 @@
+"""Tests for raw payload storage backends."""
+
+import json
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from app.services import raw_payload_storage
+
+
+@pytest.fixture(autouse=True)
+def _reset_module_state() -> None:
+    """Reset module-level globals before each test."""
+    raw_payload_storage._storage_backend = "disabled"
+    raw_payload_storage._max_size_bytes = 10 * 1024 * 1024
+    raw_payload_storage._s3_bucket = None
+    raw_payload_storage._s3_prefix = "raw-payloads"
+    raw_payload_storage._s3_client = None
+
+
+class TestConfigure:
+    def test_configure_disabled(self) -> None:
+        raw_payload_storage.configure("disabled", 1024)
+        assert raw_payload_storage._storage_backend == "disabled"
+
+    def test_configure_log(self) -> None:
+        raw_payload_storage.configure("log", 2048)
+        assert raw_payload_storage._storage_backend == "log"
+        assert raw_payload_storage._max_size_bytes == 2048
+
+    def test_configure_s3_without_bucket_falls_back_to_disabled(self) -> None:
+        raw_payload_storage.configure("s3", 1024, s3_bucket=None)
+        assert raw_payload_storage._storage_backend == "disabled"
+
+    def test_configure_s3_with_bucket(self) -> None:
+        mock_client = MagicMock()
+        with patch.object(raw_payload_storage, "_create_s3_client", return_value=mock_client):
+            raw_payload_storage.configure("s3", 1024, s3_bucket="my-bucket", s3_prefix="payloads")
+
+        assert raw_payload_storage._storage_backend == "s3"
+        assert raw_payload_storage._s3_bucket == "my-bucket"
+        assert raw_payload_storage._s3_prefix == "payloads"
+        assert raw_payload_storage._s3_client is mock_client
+
+    def test_configure_s3_client_creation_fails(self) -> None:
+        with patch.object(raw_payload_storage, "_create_s3_client", return_value=None):
+            raw_payload_storage.configure("s3", 1024, s3_bucket="my-bucket")
+
+        assert raw_payload_storage._storage_backend == "disabled"
+
+
+class TestStoreRawPayload:
+    def test_disabled_is_noop(self) -> None:
+        raw_payload_storage.configure("disabled", 1024)
+        # Should not raise
+        raw_payload_storage.store_raw_payload(source="webhook", provider="garmin", payload={"key": "val"})
+
+    def test_payload_exceeding_max_size_is_skipped(self, capsys: pytest.CaptureFixture[str]) -> None:
+        raw_payload_storage.configure("log", 10)  # 10 bytes max
+        raw_payload_storage.store_raw_payload(source="webhook", provider="garmin", payload={"big": "payload"})
+        captured = capsys.readouterr()
+        assert "raw_payload" not in captured.out
+
+    def test_log_backend_outputs_json(self, capsys: pytest.CaptureFixture[str]) -> None:
+        raw_payload_storage.configure("log", 10 * 1024 * 1024)
+        raw_payload_storage.store_raw_payload(
+            source="webhook",
+            provider="garmin",
+            payload={"test": True},
+            user_id="user-123",
+            trace_id="trace-abc",
+        )
+        captured = capsys.readouterr()
+        entry = json.loads(captured.out.strip())
+        assert entry["message"] == "raw_payload"
+        assert entry["source"] == "webhook"
+        assert entry["provider"] == "garmin"
+        assert entry["user_id"] == "user-123"
+        assert entry["trace_id"] == "trace-abc"
+
+    def test_s3_backend_uploads_payload(self) -> None:
+        mock_client = MagicMock()
+        mock_client.put_object.return_value = {"ETag": "test-etag"}
+
+        with patch.object(raw_payload_storage, "_create_s3_client", return_value=mock_client):
+            raw_payload_storage.configure("s3", 10 * 1024 * 1024, s3_bucket="test-bucket", s3_prefix="raw")
+
+        raw_payload_storage.store_raw_payload(
+            source="webhook",
+            provider="garmin",
+            payload={"activity": "running"},
+            user_id="user-456",
+            trace_id="trace-xyz",
+        )
+
+        mock_client.put_object.assert_called_once()
+        call_kwargs = mock_client.put_object.call_args[1]
+        assert call_kwargs["Bucket"] == "test-bucket"
+        assert "garmin/webhook/" in call_kwargs["Key"]
+        assert "/user-456/" in call_kwargs["Key"]
+        assert call_kwargs["Key"].endswith(".json")
+        assert call_kwargs["ContentType"] == "application/json"
+        assert call_kwargs["Metadata"]["provider"] == "garmin"
+        assert call_kwargs["Metadata"]["user_id"] == "user-456"
+        assert call_kwargs["Metadata"]["trace_id"] == "trace-xyz"
+
+        body = call_kwargs["Body"].decode("utf-8")
+        assert json.loads(body) == {"activity": "running"}
+
+    def test_s3_backend_handles_upload_error(self) -> None:
+        mock_client = MagicMock()
+        mock_client.put_object.side_effect = Exception("S3 error")
+
+        with patch.object(raw_payload_storage, "_create_s3_client", return_value=mock_client):
+            raw_payload_storage.configure("s3", 10 * 1024 * 1024, s3_bucket="test-bucket")
+
+        # Should not raise - errors are logged
+        raw_payload_storage.store_raw_payload(source="sdk", provider="apple", payload="raw-xml-data")
+
+    def test_s3_backend_unknown_user_fallback(self) -> None:
+        mock_client = MagicMock()
+        mock_client.put_object.return_value = {"ETag": "test-etag"}
+
+        with patch.object(raw_payload_storage, "_create_s3_client", return_value=mock_client):
+            raw_payload_storage.configure("s3", 10 * 1024 * 1024, s3_bucket="test-bucket")
+
+        raw_payload_storage.store_raw_payload(source="webhook", provider="garmin", payload={"x": 1})
+
+        call_kwargs = mock_client.put_object.call_args[1]
+        assert "/_unknown/" in call_kwargs["Key"]
+
+    def test_s3_backend_pre_serialized_string(self) -> None:
+        mock_client = MagicMock()
+        mock_client.put_object.return_value = {"ETag": "test-etag"}
+
+        with patch.object(raw_payload_storage, "_create_s3_client", return_value=mock_client):
+            raw_payload_storage.configure("s3", 10 * 1024 * 1024, s3_bucket="test-bucket")
+
+        raw_payload_storage.store_raw_payload(source="sdk", provider="apple", payload='{"pre":"serialized"}')
+
+        call_kwargs = mock_client.put_object.call_args[1]
+        assert call_kwargs["Body"] == b'{"pre":"serialized"}'