Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/api/routes/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .summaries import router as summaries_router
from .suunto_debug import router as suunto_debug_router
from .sync_data import router as sync_data_router
from .sync_events import router as sync_events_router
from .timeseries import router as timeseries_router
from .token import router as token_router
from .user_invitation_code import router as user_invitation_code_router
Expand All @@ -39,6 +40,7 @@
# New unified vendor workouts endpoint
v1_router.include_router(vendor_workouts_router, prefix="/providers", tags=["providers workouts"])
v1_router.include_router(sync_data_router, prefix="/providers", tags=["sync data"])
v1_router.include_router(sync_events_router, tags=["sync events"])
# Suunto debug endpoints for raw API access
v1_router.include_router(suunto_debug_router, prefix="/debug", tags=["debug"])
v1_router.include_router(users_router, tags=["users"])
Expand Down
105 changes: 105 additions & 0 deletions backend/app/api/routes/v1/sync_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""SSE (Server-Sent Events) endpoint for real-time sync progress.

Subscribes to a per-user Redis Pub/Sub channel and streams every sync
event to the browser over an open HTTP connection.
"""

import asyncio
import json
from collections.abc import AsyncGenerator
from logging import getLogger
from typing import Annotated
from uuid import UUID

import redis.asyncio as aioredis
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from app.config import settings
from app.integrations.sync_events import SYNC_CHANNEL_PREFIX
from app.utils.auth import verify_query_token

logger = getLogger(__name__)

router = APIRouter()


async def _event_generator(user_id: str) -> AsyncGenerator[str, None]:
    """Yield SSE-formatted messages from the user's Redis Pub/Sub channel.

    The ``timeout=15.0`` parameter in ``pubsub.get_message()`` controls
    the keep-alive interval, NOT message latency:

    - If a sync event arrives (e.g., at t=0.1s), Redis delivers it **immediately**.
    - If NO event arrives for 15 seconds, ``get_message()`` returns None and
      we yield a ``: keepalive`` comment to prevent the browser/proxy from
      closing the idle connection.

    This ensures instant updates while maintaining a stable long-lived connection.

    Args:
        user_id: UUID of the subscribed user, as a string.

    Yields:
        SSE-framed strings: ``event:``/``data:`` records for published
        events, or ``: keepalive`` comments during idle periods.
    """
    channel = f"{SYNC_CHANNEL_PREFIX}:{user_id}"

    redis_client = aioredis.from_url(settings.redis_url, decode_responses=True)
    pubsub = redis_client.pubsub()

    try:
        await pubsub.subscribe(channel)

        while True:
            # Wait up to 15 s for a message, then send a keep-alive comment.
            # This does NOT delay messages - they are returned as soon as published.
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=15.0)

            if message and message["type"] == "message":
                raw = message["data"]

                # Parse to extract the event type for the SSE "event:" field.
                # A payload may be valid JSON without being an object (e.g. a
                # bare string or list); calling .get() on it would raise
                # AttributeError and kill the stream, so check isinstance.
                try:
                    payload = json.loads(raw)
                except (json.JSONDecodeError, TypeError):
                    payload = None

                if isinstance(payload, dict):
                    event_type = payload.get("type", "message")
                else:
                    event_type = "message"
                    raw = json.dumps({"type": "message", "data": raw})

                yield f"event: {event_type}\ndata: {raw}\n\n"

                # If this was a terminal event, close the stream
                if event_type in ("sync:completed", "sync:error"):
                    return
            else:
                # SSE keep-alive comment (ignored by EventSource)
                yield ": keepalive\n\n"

    except asyncio.CancelledError:
        # Client disconnected or server is shutting down. Re-raise so
        # cancellation propagates to the surrounding task instead of being
        # silently swallowed; cleanup still runs in the finally block.
        raise
    finally:
        # Close the Redis client even if unsubscribe/aclose on the pubsub
        # fails (e.g. the connection already dropped).
        try:
            await pubsub.unsubscribe(channel)
            # pubsub.close() is deprecated in redis-py >= 5.0.1; the file
            # already uses client.aclose(), so aclose() is available here too.
            await pubsub.aclose()
        finally:
            await redis_client.aclose()


@router.get("/users/{user_id}/sync/events")
async def sync_events_stream(
    user_id: UUID,
    _developer_id: Annotated[str, Depends(verify_query_token)],
) -> StreamingResponse:
    """Stream real-time sync progress events via Server-Sent Events (SSE).

    **Authentication:**
    Pass your JWT token as a ``token`` query parameter
    (``EventSource`` does not support custom headers).

    **Event types emitted:**
    (See ``SyncEventType`` in frontend types for full list)

    The stream closes automatically after a terminal event.
    """
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        # Disable proxy (nginx) response buffering so events flush immediately.
        "X-Accel-Buffering": "no",
    }
    event_stream = _event_generator(str(user_id))
    return StreamingResponse(event_stream, media_type="text/event-stream", headers=sse_headers)
107 changes: 105 additions & 2 deletions backend/app/integrations/celery/tasks/sync_vendor_data_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from uuid import UUID

from app.database import SessionLocal
from app.integrations.sync_events import publish_sync_event
from app.repositories.user_connection_repository import UserConnectionRepository
from app.schemas import ProviderSyncResult, SyncVendorDataResult
from app.services.providers.factory import ProviderFactory
Expand Down Expand Up @@ -56,6 +57,8 @@ def sync_vendor_data(
errors={"user_id": f"Invalid UUID format: {str(e)}"},
).model_dump()

task_id = sync_vendor_data.request.id or "unknown"

result = SyncVendorDataResult(
user_id=user_uuid,
start_date=start_date,
Expand All @@ -78,6 +81,12 @@ def sync_vendor_data(
task="sync_vendor_data",
)
result.message = "No active provider connections found"
publish_sync_event(
user_id,
"sync:completed",
task_id=task_id,
data={"message": "No active provider connections found", "providers_synced": []},
)
return result.model_dump()

log_structured(
Expand All @@ -88,7 +97,15 @@ def sync_vendor_data(
task="sync_vendor_data",
)

for connection in connections:
provider_names = [c.provider for c in connections]
publish_sync_event(
user_id,
"sync:started",
task_id=task_id,
data={"providers": provider_names, "total_providers": len(connections)},
)

for idx, connection in enumerate(connections):
provider_name = connection.provider
log_structured(
logger,
Expand All @@ -98,16 +115,37 @@ def sync_vendor_data(
task="sync_vendor_data",
)

publish_sync_event(
user_id,
"sync:provider:started",
task_id=task_id,
provider=provider_name,
data={"index": idx, "total": len(connections)},
)

try:
strategy = factory.get_provider(provider_name)
provider_result = ProviderSyncResult(success=True, params={})

# Sync workouts
if strategy.workouts:
publish_sync_event(
user_id,
"sync:provider:workouts:started",
task_id=task_id,
provider=provider_name,
)
params = _build_sync_params(provider_name, start_date, end_date)
try:
success = strategy.workouts.load_data(db, user_uuid, **params)
provider_result.params["workouts"] = {"success": success, **params}
publish_sync_event(
user_id,
"sync:provider:workouts:completed",
task_id=task_id,
provider=provider_name,
data={"success": success},
)
except Exception as e:
log_structured(
logger,
Expand All @@ -123,6 +161,13 @@ def sync_vendor_data(
extra={"user_id": user_id, "provider": provider_name, "task": "sync_vendor_data"},
)
provider_result.params["workouts"] = {"success": False, "error": str(e)}
publish_sync_event(
user_id,
"sync:provider:workouts:error",
task_id=task_id,
provider=provider_name,
data={"error": "An error occurred syncing workouts"},
)

# Sync 247 data (sleep, recovery, activity) and SAVE to database
if hasattr(strategy, "data_247") and strategy.data_247:
Expand All @@ -140,6 +185,12 @@ def sync_vendor_data(
with suppress(ValueError):
end_dt = datetime.fromisoformat(end_date.replace("Z", "+00:00"))

publish_sync_event(
user_id,
"sync:provider:247:started",
task_id=task_id,
provider=provider_name,
)
try:
# Use load_and_save_all if available (saves data to DB)
# Otherwise fallback to load_all_247_data (just returns data)
Expand Down Expand Up @@ -168,6 +219,13 @@ def sync_vendor_data(
provider="sync_vendor_data",
task="sync_vendor_data",
)
publish_sync_event(
user_id,
"sync:provider:247:completed",
task_id=task_id,
provider=provider_name,
data={"success": True},
)
except Exception as e:
log_structured(
logger,
Expand All @@ -183,19 +241,49 @@ def sync_vendor_data(
extra={"user_id": user_id, "provider": provider_name, "task": "sync_vendor_data"},
)
provider_result.params["data_247"] = {"success": False, "error": str(e)}
publish_sync_event(
user_id,
"sync:provider:247:error",
task_id=task_id,
provider=provider_name,
data={"error": "An error occurred syncing 24/7 data"},
)

user_connection_repo.update_last_synced_at(db, connection)

result.providers_synced[provider_name] = provider_result

# Compute success based on individual component results
sync_success = True
for component_result in provider_result.params.values():
# Each component result is a dict with a "success" key
if isinstance(component_result, dict) and not component_result.get("success", False):
sync_success = False
break

log_structured(
logger,
"info",
f"Successfully synced {provider_name} for user {user_id}",
f"Successfully synced {provider_name} for user {user_id} (success={sync_success})",
provider="sync_vendor_data",
task="sync_vendor_data",
)
publish_sync_event(
user_id,
"sync:provider:completed",
task_id=task_id,
provider=provider_name,
data={"success": sync_success, "index": idx, "total": len(connections)},
)

except Exception as e:
publish_sync_event(
user_id,
"sync:provider:error",
task_id=task_id,
provider=provider_name,
data={"error": "An error occurred syncing provider data"},
)
log_and_capture_error(
e,
logger,
Expand All @@ -205,6 +293,15 @@ def sync_vendor_data(
result.errors[provider_name] = str(e)
continue

publish_sync_event(
user_id,
"sync:completed",
task_id=task_id,
data={
"providers_synced": list(result.providers_synced.keys()),
"errors": result.errors if result.errors else None,
},
)
return result.model_dump()

except Exception as e:
Expand All @@ -215,6 +312,12 @@ def sync_vendor_data(
extra={"user_id": user_id, "task": "sync_vendor_data"},
)
result.errors["general"] = str(e)
publish_sync_event(
user_id,
"sync:error",
task_id=task_id,
data={"error": str(e)},
)
return result.model_dump()


Expand Down
58 changes: 58 additions & 0 deletions backend/app/integrations/sync_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Redis Pub/Sub sync event publisher for real-time SSE updates.

Celery tasks call publish_sync_event() to notify connected SSE clients
of sync progress. Events are published to a per-user Redis channel.
"""

import json
from datetime import datetime, timezone
from logging import getLogger
from typing import Any

from app.integrations.redis_client import get_redis_client

logger = getLogger(__name__)

SYNC_CHANNEL_PREFIX = "sync:events"


def _channel_for_user(user_id: str) -> str:
"""Return the Redis Pub/Sub channel name for a given user."""
return f"{SYNC_CHANNEL_PREFIX}:{user_id}"


def publish_sync_event(
    user_id: str,
    event_type: str,
    *,
    task_id: str | None = None,
    provider: str | None = None,
    data: dict[str, Any] | None = None,
) -> None:
    """Publish a sync progress event to the user's Redis Pub/Sub channel.

    Delivery is best-effort: any failure to reach Redis is logged at debug
    level and swallowed so the calling sync task is never interrupted.

    Args:
        user_id: UUID of the user (as string).
        event_type: Event type identifier (e.g. "sync:started").
        task_id: Celery task ID, if applicable.
        provider: Provider name being synced, if applicable.
        data: Additional event payload.
    """
    message: dict[str, Any] = {
        "type": event_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Only attach optional fields that carry a (truthy) value, in a fixed order.
    optional_fields = (("task_id", task_id), ("provider", provider), ("data", data))
    for key, value in optional_fields:
        if value:
            message[key] = value

    try:
        get_redis_client().publish(_channel_for_user(user_id), json.dumps(message))
    except Exception:
        # Best-effort publishing — never break the sync task over a missed event.
        logger.debug("Failed to publish sync event %s for user %s", event_type, user_id, exc_info=True)
Loading