feat: sync status visible in user page w/ fastapi sse (2nd attempt)#543
feat: sync status visible in user page w/ fastapi sse (2nd attempt)#543
Conversation
|
🚅 Deployed to the open-wearables-pr-543 environment in Open Wearables
|
📝 WalkthroughWalkthroughAdds real-time synchronization progress via Server-Sent Events: backend publishes per-user sync events to Redis, exposes an SSE endpoint, and emits events from the Celery sync task; frontend consumes SSE with a new hook and renders live progress in UI components. Changes
Sequence DiagramsequenceDiagram
participant User as Client/User
participant Frontend
participant SSE as Frontend SSE (EventSource)
participant Backend as API (SSE endpoint)
participant Celery as Worker/Sync Task
participant Redis
User->>Frontend: Click "Sync Now"
Frontend->>Frontend: useSyncEvents.startListening()
Frontend->>SSE: open EventSource /api/v1/users/{id}/sync/events?token=...
SSE->>Backend: GET /users/{id}/sync/events
Backend->>Redis: SUBSCRIBE sync:events:{id}
Frontend->>Backend: Trigger sync (POST /sync or via UI)
Backend->>Celery: enqueue sync task
Celery->>Redis: PUBLISH sync:started (task_id, providers...)
Redis->>Backend: deliver message to SUBSCRIBER
Backend->>SSE: stream SSE message
SSE->>Frontend: Event received
Frontend->>Frontend: update SyncProgress state
Celery->>Redis: PUBLISH provider-level events (started, workouts:completed, 247:completed, errors...)
Redis->>Backend: deliver messages
Backend->>SSE: stream SSE messages
SSE->>Frontend: Events received -> UI updates
Celery->>Redis: PUBLISH sync:completed / sync:error
Redis->>Backend: deliver final message
Backend->>SSE: stream terminal event
SSE->>Frontend: receives terminal -> closes EventSource
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (1)
backend/app/api/routes/v1/sync_events.py (1)
81-81: Add explicitstatus_codeto the route decorator for consistency.Set
status_code=status.HTTP_200_OKon the SSE endpoint decorator.As per coding guidelines "API routes must include
response_model(Pydantic schema) andstatus_code(from fastapi.status) decorators."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/api/routes/v1/sync_events.py` at line 81, The SSE route decorator on router.get("/users/{user_id}/sync/events") is missing an explicit status_code; update the decorator to include status_code=status.HTTP_200_OK (i.e., router.get("/users/{user_id}/sync/events", status_code=status.HTTP_200_OK")) and ensure status is imported from fastapi (add from fastapi import status if not already present) so the endpoint follows the project's requirement to declare status codes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/api/routes/v1/sync_events.py`:
- Around line 81-85: The endpoint sync_events_stream authenticates the caller
via verify_query_token but does not authorize access to the requested user_id;
update sync_events_stream to extract the authenticated caller identity from the
token (replace or augment the _developer_id dependency so it returns the
caller's user/developer id or permissions), compare that identity/permissions
against the requested user_id (and/or check a "can_subscribe(user_id)"
permission), and if they don't match or lack permission raise an
HTTPException(403) to prevent unauthorized subscriptions to another user's
stream.
In `@backend/app/integrations/celery/tasks/sync_vendor_data_task.py`:
- Around line 164-170: The SSE payloads currently include raw exception text via
str(e) when calling publish_sync_event in sync_vendor_data_task; replace the raw
exception string with a sanitized user-facing message or an error code (e.g.,
"An error occurred syncing provider data" or {"code":"provider_sync_error"}) and
ensure the full exception is sent to internal logs/Sentry instead (use the
existing logger or capture_exception). Update every publish_sync_event call in
this module (references: publish_sync_event and the sync_vendor_data_task
handler where str(e) is used) so client-facing events never contain
internal/provider details, while preserving full error details in
logger.exception or sentry.capture_exception nearby.
- Around line 262-268: The completion event currently hardcodes "success": True
in the publish_sync_event call; change it to reflect whether any errors occurred
earlier by computing a success boolean (e.g., success = (error_count == 0) or
success = not errors_emitted or success = len(errors) == 0 depending on which
variable exists in this scope) and pass that variable into publish_sync_event
for the "sync:provider:completed" event (keep task_id, provider and index/total
fields unchanged). Locate the call to publish_sync_event in
sync_vendor_data_task.py and replace the literal True with the computed success
flag so frontend aggregation is consistent.
In `@frontend/src/components/user/sync-progress-overlay.tsx`:
- Around line 57-64: The progress bar color logic lets terminal states override
error styling; adjust the conditional so error state always wins: in the JSX
where className is built with cn (the ternary using isError and isTerminal),
ensure you check isError first and only apply 'bg-green-500' when isTerminal is
true AND isError is false (or rewrite as an explicit if/else: if (isError) use
'bg-destructive' else if (isTerminal) use 'bg-green-500' else 'bg-primary'),
referencing the same className/cn expression and the isError/isTerminal
booleans.
In `@frontend/src/hooks/api/use-sync-events.ts`:
- Around line 1-196: Prettier formatting failed for this file; run Prettier (or
your repo's format script) on the file containing useSyncEvents
(frontend/src/hooks/api/use-sync-events.ts) to reformat INITIAL_PROGRESS,
imports, and the startListening/stopListening hook implementations, then stage
the updated file and push; ensure the formatted output matches the project's
Prettier settings so CI passes.
- Line 59: The EventSource error handler in useSyncEvents currently only clears
the UI-active flag when readyState === CLOSED, which lets reconnecting errors
leave the UI stuck; update the 'error' handler (the EventSource error callback
in use-sync-events.ts) to explicitly reset progress to inactive in non-open
states by calling setProgress({ ...INITIAL_PROGRESS, active: false }) when
eventSource.readyState !== EventSource.OPEN (or !== 0/1 as appropriate), while
preserving existing logic that closes the connection on terminal events
('sync:completed'/'sync:error'); this ensures the UI is re-enabled during
reconnect/error loops without changing terminal-close behavior.
- Around line 133-148: The cache invalidations are inside the setProgress state
updater (in use-sync-events.ts) which makes the updater impure; extract the
boolean/sync-complete flag (compute whether
connections/workouts/activitySummaries/sleepSessions/bodySummary need
invalidation) before calling setProgress, call setProgress with a pure updater,
then after setProgress returns perform the five queryClient.invalidateQueries
calls using queryClient and the existing queryKeys:
queryKeys.connections.all(userId), queryKeys.health.workouts(userId),
queryKeys.health.activitySummaries(userId),
queryKeys.health.sleepSessions(userId), and queryKeys.health.bodySummary(userId)
so invalidations happen once and outside the updater.
---
Nitpick comments:
In `@backend/app/api/routes/v1/sync_events.py`:
- Line 81: The SSE route decorator on router.get("/users/{user_id}/sync/events")
is missing an explicit status_code; update the decorator to include
status_code=status.HTTP_200_OK (i.e., router.get("/users/{user_id}/sync/events",
status_code=status.HTTP_200_OK")) and ensure status is imported from fastapi
(add from fastapi import status if not already present) so the endpoint follows
the project's requirement to declare status codes.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
backend/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (15)
backend/app/api/routes/v1/__init__.pybackend/app/api/routes/v1/sync_events.pybackend/app/integrations/celery/tasks/sync_vendor_data_task.pybackend/app/integrations/sync_events.pybackend/app/utils/auth.pybackend/pyproject.tomlbackend/tests/api/v1/test_auth.pybackend/tests/conftest.pyfrontend/src/components/user/connection-card.tsxfrontend/src/components/user/profile-section.tsxfrontend/src/components/user/sync-progress-overlay.tsxfrontend/src/hooks/api/use-sync-events.tsfrontend/src/lib/api/config.tsfrontend/src/lib/api/types.tsfrontend/src/lib/utils/sync-messages.ts
There was a problem hiding this comment.
♻️ Duplicate comments (1)
backend/app/integrations/celery/tasks/sync_vendor_data_task.py (1)
315-320:⚠️ Potential issue | 🟠 MajorRaw exception text still exposed in
sync:errorevent.Line 319 sends
str(e)directly in the SSE payload, which can leak internal/provider details to clients. This is the same issue flagged in a previous review that was addressed in other error events (lines 169, 249, 285) but missed here.🔒 Proposed fix to sanitize error message
publish_sync_event( user_id, "sync:error", task_id=task_id, - data={"error": str(e)}, + data={"error": "An unexpected error occurred during sync"}, )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/integrations/celery/tasks/sync_vendor_data_task.py` around lines 315 - 320, The sync:error SSE currently exposes raw exception text by passing str(e) into publish_sync_event; change this to send a sanitized, non-sensitive message (e.g., "An error occurred while syncing vendor data") and a minimal error code/flag if needed, while ensuring the full exception is recorded internally via the existing logger.exception or error reporting (so you still capture details). Update the publish_sync_event call in sync_vendor_data_task.py to replace str(e) with the sanitized message/metadata and add a logger.exception(e) or send to the centralized error reporter near the exception handler so diagnostics are retained but not leaked to clients.
🧹 Nitpick comments (5)
frontend/src/hooks/api/use-sync-events.ts (4)
128-144: Usevoidto explicitly mark fire-and-forget promises.The
invalidateQueriescalls return Promises that are currently ignored. Usingvoidmakes the intent explicit and satisfies linters that flag floating promises.♻️ Suggested fix
if (event.type === 'sync:completed') { - queryClient.invalidateQueries({ + void queryClient.invalidateQueries({ queryKey: queryKeys.connections.all(userId), }); - queryClient.invalidateQueries({ + void queryClient.invalidateQueries({ queryKey: queryKeys.health.workouts(userId), }); - queryClient.invalidateQueries({ + void queryClient.invalidateQueries({ queryKey: queryKeys.health.activitySummaries(userId), }); - queryClient.invalidateQueries({ + void queryClient.invalidateQueries({ queryKey: queryKeys.health.sleepSessions(userId), }); - queryClient.invalidateQueries({ + void queryClient.invalidateQueries({ queryKey: queryKeys.health.bodySummary(userId), }); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/hooks/api/use-sync-events.ts` around lines 128 - 144, In the sync:completed handler, the queryClient.invalidateQueries calls are returning promises that are ignored; update the block handling event.type === 'sync:completed' so each invalidate call is explicitly fire-and-forget by prefixing them with void (e.g. void queryClient.invalidateQueries({ queryKey: queryKeys.connections.all(userId) }) and similarly for queryKeys.health.workouts(userId), queryKeys.health.activitySummaries(userId), queryKeys.health.sleepSessions(userId), and queryKeys.health.bodySummary(userId)), ensuring linters won’t flag floating promises while preserving current behavior.
151-153: Consider logging malformed messages in development for easier debugging.The empty catch block silently discards all parsing errors. While ignoring malformed SSE messages is reasonable in production, logging them during development can help diagnose backend issues or protocol mismatches.
♻️ Suggested fix
- } catch { - // Ignore malformed messages + } catch (err) { + if (import.meta.env.DEV) { + console.warn('[useSyncEvents] Failed to parse SSE message:', err); + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/hooks/api/use-sync-events.ts` around lines 151 - 153, The empty catch in use-sync-events.ts is swallowing JSON parse errors; update the catch in the SSE message handler (the try/catch around parsing inside the useSyncEvents or onmessage callback) to log the malformed message and the error when in development only (e.g., check process.env.NODE_ENV !== 'production' or a isDev flag) while preserving the current behavior in production; include the raw message payload and the error stack in the log to aid debugging.
47-52: Silent return on missing token provides no feedback.When
getToken()returnsnull, the function exits silently without starting the connection or notifying the caller. Consider either logging a warning in development or returning a status so calling components can react appropriately (e.g., prompt re-authentication).♻️ Suggested fix
const token = getToken(); - if (!token) return; + if (!token) { + if (import.meta.env.DEV) { + console.warn('[useSyncEvents] No auth token available'); + } + return; + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/hooks/api/use-sync-events.ts` around lines 47 - 52, startListening currently exits silently when getToken() is null; change it to provide feedback by returning a status and emitting a warning: have startListening return a boolean (or Promise<boolean> if async) and when !token call stopListening(), log a dev-only warning (e.g., if process.env.NODE_ENV !== 'production' console.warn(...)) or invoke an optional onAuthFailure/onError callback, then return false; on success continue opening the connection and return true. Update callers to handle the boolean result. Reference: startListening, getToken, stopListening (and any consumer hooks that call startListening).
175-186: Error handler resets to empty message—consider providing user feedback.When the connection encounters an error in a non-OPEN state, progress resets to
INITIAL_PROGRESSwhich has an emptymessage. A brief user-facing message like "Connection lost" would improve UX by informing users why the sync status disappeared.♻️ Suggested fix
if (es.readyState !== EventSource.OPEN) { - setProgress({ ...INITIAL_PROGRESS, active: false }); + setProgress({ + ...INITIAL_PROGRESS, + active: false, + message: 'Connection lost. Please try again.', + }); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/hooks/api/use-sync-events.ts` around lines 175 - 186, The onerror handler resets progress to INITIAL_PROGRESS which clears the user message; update es.onerror so when es.readyState !== EventSource.OPEN you call setProgress with INITIAL_PROGRESS merged but include a user-facing message (e.g., message: "Connection lost") and active: false instead of leaving message empty; reference es.onerror, setProgress, INITIAL_PROGRESS and eventSourceRef to locate the handler and ensure you still null out eventSourceRef.current when es.readyState === EventSource.CLOSED.backend/app/integrations/celery/tasks/sync_vendor_data_task.py (1)
296-304: Consider sanitizing error details insync:completedevent.Line 302 sends
result.errorswhich contains raw exception strings (populated at line 293). While individual error events use sanitized messages, this aggregated errors dict still exposes internal details to the client.Consider either:
- Omitting the detailed errors from the SSE payload (they're already logged server-side)
- Or mapping them to sanitized provider-level error messages
♻️ Proposed fix to sanitize errors
publish_sync_event( user_id, "sync:completed", task_id=task_id, data={ "providers_synced": list(result.providers_synced.keys()), - "errors": result.errors if result.errors else None, + "errors": list(result.errors.keys()) if result.errors else None, }, )This sends only the provider names that failed, without the raw exception details.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/integrations/celery/tasks/sync_vendor_data_task.py` around lines 296 - 304, The sync:completed SSE currently includes raw exception strings from result.errors; update the publish_sync_event call (in the sync_vendor_data_task where publish_sync_event is invoked for "sync:completed") to avoid exposing internal error details by either removing the result.errors field entirely or replacing it with a sanitized mapping (e.g., map result.errors keys to simple provider-level statuses like {"provider_name": "failed"} or {"provider_name": "error_occurred"}) while keeping providers_synced as-is; ensure you reference result.errors and providers_synced in your change and keep detailed exceptions only in server logs (where they are already recorded).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@backend/app/integrations/celery/tasks/sync_vendor_data_task.py`:
- Around line 315-320: The sync:error SSE currently exposes raw exception text
by passing str(e) into publish_sync_event; change this to send a sanitized,
non-sensitive message (e.g., "An error occurred while syncing vendor data") and
a minimal error code/flag if needed, while ensuring the full exception is
recorded internally via the existing logger.exception or error reporting (so you
still capture details). Update the publish_sync_event call in
sync_vendor_data_task.py to replace str(e) with the sanitized message/metadata
and add a logger.exception(e) or send to the centralized error reporter near the
exception handler so diagnostics are retained but not leaked to clients.
---
Nitpick comments:
In `@backend/app/integrations/celery/tasks/sync_vendor_data_task.py`:
- Around line 296-304: The sync:completed SSE currently includes raw exception
strings from result.errors; update the publish_sync_event call (in the
sync_vendor_data_task where publish_sync_event is invoked for "sync:completed")
to avoid exposing internal error details by either removing the result.errors
field entirely or replacing it with a sanitized mapping (e.g., map result.errors
keys to simple provider-level statuses like {"provider_name": "failed"} or
{"provider_name": "error_occurred"}) while keeping providers_synced as-is;
ensure you reference result.errors and providers_synced in your change and keep
detailed exceptions only in server logs (where they are already recorded).
In `@frontend/src/hooks/api/use-sync-events.ts`:
- Around line 128-144: In the sync:completed handler, the
queryClient.invalidateQueries calls are returning promises that are ignored;
update the block handling event.type === 'sync:completed' so each invalidate
call is explicitly fire-and-forget by prefixing them with void (e.g. void
queryClient.invalidateQueries({ queryKey: queryKeys.connections.all(userId) })
and similarly for queryKeys.health.workouts(userId),
queryKeys.health.activitySummaries(userId),
queryKeys.health.sleepSessions(userId), and
queryKeys.health.bodySummary(userId)), ensuring linters won’t flag floating
promises while preserving current behavior.
- Around line 151-153: The empty catch in use-sync-events.ts is swallowing JSON
parse errors; update the catch in the SSE message handler (the try/catch around
parsing inside the useSyncEvents or onmessage callback) to log the malformed
message and the error when in development only (e.g., check process.env.NODE_ENV
!== 'production' or a isDev flag) while preserving the current behavior in
production; include the raw message payload and the error stack in the log to
aid debugging.
- Around line 47-52: startListening currently exits silently when getToken() is
null; change it to provide feedback by returning a status and emitting a
warning: have startListening return a boolean (or Promise<boolean> if async) and
when !token call stopListening(), log a dev-only warning (e.g., if
process.env.NODE_ENV !== 'production' console.warn(...)) or invoke an optional
onAuthFailure/onError callback, then return false; on success continue opening
the connection and return true. Update callers to handle the boolean result.
Reference: startListening, getToken, stopListening (and any consumer hooks that
call startListening).
- Around line 175-186: The onerror handler resets progress to INITIAL_PROGRESS
which clears the user message; update es.onerror so when es.readyState !==
EventSource.OPEN you call setProgress with INITIAL_PROGRESS merged but include a
user-facing message (e.g., message: "Connection lost") and active: false instead
of leaving message empty; reference es.onerror, setProgress, INITIAL_PROGRESS
and eventSourceRef to locate the handler and ensure you still null out
eventSourceRef.current when es.readyState === EventSource.CLOSED.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/integrations/celery/tasks/sync_vendor_data_task.pyfrontend/src/hooks/api/use-sync-events.ts
Closes #574
FastAPI now supprots SSE events streaming, so I have added this to visualise to the user sync status live.
Screen.Recording.2026-03-02.at.15.15.24.mov
Summary by CodeRabbit
New Features
Bug Fixes
Chores