From d52b1847172cf034de0dd108256f82ba9748b313 Mon Sep 17 00:00:00 2001 From: Stu Mason Date: Tue, 13 Jan 2026 10:48:00 +0000 Subject: [PATCH 1/6] feat: v1.1.0 - Automatic Background Sync with Full Audit Trail ## Added **Automatic Background Sync** - Smart sync scheduler with APScheduler for automatic background syncing - Rate-limit-aware orchestration respecting Polar API limits (15-min and 24-hour windows) - Priority queue system for efficient multi-user sync: - CRITICAL: Users who haven't synced in 48h+ or have expiring tokens - HIGH: Active users, hasn't synced in 12h+ - NORMAL: Regular users, hasn't synced in 24h+ - LOW: Dormant users, hasn't synced in 7d+ - Comprehensive `SyncLog` model for complete audit trail of every sync operation - Consistent error classification with `SyncErrorType` enum covering: - Authentication errors (TOKEN_EXPIRED, TOKEN_INVALID, TOKEN_REVOKED) - Rate limiting (RATE_LIMITED_15M, RATE_LIMITED_24H) - API errors (API_UNAVAILABLE, API_TIMEOUT, API_ERROR) - Data errors (INVALID_RESPONSE, TRANSFORM_ERROR) - Internal errors (DATABASE_ERROR, INTERNAL_ERROR) - Post-sync analytics: Automatic baseline recalculation and pattern detection **Configuration** - `SYNC_ENABLED`: Enable/disable automatic syncing (default: true) - `SYNC_INTERVAL_MINUTES`: Sync cycle interval (default: 60) - `SYNC_ON_STARTUP`: Run sync immediately on startup (default: true) - `SYNC_MAX_USERS_PER_RUN`: Maximum users per sync cycle (default: rate-limit aware) - `SYNC_STAGGER_SECONDS`: Delay between user syncs (default: 5) **Database** - New `sync_logs` table with comprehensive fields for audit and debugging - Composite indexes for efficient querying by user, status, and error type Stu Mason + AI --- CHANGELOG.md | 34 ++ .../f7g8h9i0j1k2_add_sync_logs_table.py | 198 +++++++ src/polar_flow_server/admin/routes.py | 2 +- src/polar_flow_server/app.py | 22 +- src/polar_flow_server/core/config.py | 22 +- src/polar_flow_server/models/__init__.py | 12 + 
src/polar_flow_server/models/sync_log.py | 350 +++++++++++ src/polar_flow_server/services/scheduler.py | 263 +++++++++ .../services/sync_error_handler.py | 476 +++++++++++++++ .../services/sync_orchestrator.py | 556 ++++++++++++++++++ .../templates/admin/dashboard.html | 2 +- 11 files changed, 1931 insertions(+), 6 deletions(-) create mode 100644 alembic/versions/f7g8h9i0j1k2_add_sync_logs_table.py create mode 100644 src/polar_flow_server/models/sync_log.py create mode 100644 src/polar_flow_server/services/scheduler.py create mode 100644 src/polar_flow_server/services/sync_error_handler.py create mode 100644 src/polar_flow_server/services/sync_orchestrator.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4398f53..cd67fa7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,40 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [Unreleased] + +### Added + +**Automatic Background Sync** +- Smart sync scheduler with APScheduler for automatic background syncing +- Rate-limit-aware orchestration respecting Polar API limits (15-min and 24-hour windows) +- Priority queue system for efficient multi-user sync: + - CRITICAL: Users who haven't synced in 48h+ or have expiring tokens + - HIGH: Active users, hasn't synced in 12h+ + - NORMAL: Regular users, hasn't synced in 24h+ + - LOW: Dormant users, hasn't synced in 7d+ +- Comprehensive `SyncLog` model for complete audit trail of every sync operation +- Consistent error classification with `SyncErrorType` enum covering: + - Authentication errors (TOKEN_EXPIRED, TOKEN_INVALID, TOKEN_REVOKED) + - Rate limiting (RATE_LIMITED_15M, RATE_LIMITED_24H) + - API errors (API_UNAVAILABLE, API_TIMEOUT, API_ERROR) + - Data errors (INVALID_RESPONSE, TRANSFORM_ERROR) + - Internal errors (DATABASE_ERROR, INTERNAL_ERROR) +- Post-sync analytics: Automatic baseline recalculation and pattern detection + +**Configuration** +- `SYNC_ENABLED`: Enable/disable automatic syncing (default: true) +- `SYNC_INTERVAL_MINUTES`: Sync cycle interval (default: 60) +- `SYNC_ON_STARTUP`: Run sync immediately on startup (default: true) +- `SYNC_MAX_USERS_PER_RUN`: Maximum users per sync cycle (default: rate-limit aware) +- `SYNC_STAGGER_SECONDS`: Delay between user syncs (default: 5) + +**Database** +- New `sync_logs` table with comprehensive fields for audit and debugging +- Composite indexes for efficient querying by user, status, and error type + +--- + ## [1.0.0] - 2025-01-13 First stable release of polar-flow-server - a self-hosted health analytics server for Polar devices. 
diff --git a/alembic/versions/f7g8h9i0j1k2_add_sync_logs_table.py b/alembic/versions/f7g8h9i0j1k2_add_sync_logs_table.py new file mode 100644 index 0000000..c32fe28 --- /dev/null +++ b/alembic/versions/f7g8h9i0j1k2_add_sync_logs_table.py @@ -0,0 +1,198 @@ +"""Add sync_logs table for comprehensive sync audit trail. + +Revision ID: f7g8h9i0j1k2 +Revises: e6f7g8h9i0j1 +Create Date: 2026-01-13 15:00:00.000000 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSON + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "f7g8h9i0j1k2" +down_revision: str | None = "e6f7g8h9i0j1" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Create sync_logs table for complete sync audit trail.""" + op.create_table( + "sync_logs", + # Primary key + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + # User identification + sa.Column( + "user_id", + sa.String(255), + nullable=False, + index=True, + comment="User being synced (Polar user ID or Laravel UUID)", + ), + sa.Column( + "job_id", + sa.String(36), + nullable=False, + index=True, + comment="UUID for correlating logs across services", + ), + # Timing + sa.Column( + "started_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + index=True, + comment="When sync began", + ), + sa.Column( + "completed_at", + sa.DateTime(timezone=True), + nullable=True, + comment="When sync finished (null if still running)", + ), + sa.Column( + "duration_ms", + sa.Integer(), + nullable=True, + comment="Total sync duration in milliseconds", + ), + # Status + sa.Column( + "status", + sa.String(20), + nullable=False, + server_default="started", + index=True, + comment="Status: started, success, partial, failed, skipped", + ), + # Error tracking + sa.Column( + "error_type", + sa.String(50), + nullable=True, + index=True, + 
comment="Categorized error type (token_expired, rate_limited_15m, etc.)", + ), + sa.Column( + "error_message", + sa.Text(), + nullable=True, + comment="Human-readable error description", + ), + sa.Column( + "error_details", + JSON(), + nullable=True, + comment="Full error context as JSON", + ), + # Results + sa.Column( + "records_synced", + JSON(), + nullable=True, + comment="Count of records synced per data type", + ), + sa.Column( + "api_calls_made", + sa.Integer(), + nullable=False, + server_default="0", + comment="Total API calls made during sync", + ), + # Rate limit tracking + sa.Column( + "rate_limit_remaining_15m", + sa.Integer(), + nullable=True, + comment="Remaining 15-min quota after sync", + ), + sa.Column( + "rate_limit_remaining_24h", + sa.Integer(), + nullable=True, + comment="Remaining 24-hour quota after sync", + ), + sa.Column( + "rate_limit_limit_15m", + sa.Integer(), + nullable=True, + comment="Max requests in 15-min window", + ), + sa.Column( + "rate_limit_limit_24h", + sa.Integer(), + nullable=True, + comment="Max requests in 24-hour window", + ), + # Analytics follow-up + sa.Column( + "baselines_recalculated", + sa.Boolean(), + nullable=False, + server_default="false", + comment="Whether baselines were updated after sync", + ), + sa.Column( + "patterns_detected", + sa.Boolean(), + nullable=False, + server_default="false", + comment="Whether pattern detection ran after sync", + ), + sa.Column( + "insights_generated", + sa.Boolean(), + nullable=False, + server_default="false", + comment="Whether insights were regenerated after sync", + ), + # Context + sa.Column( + "trigger", + sa.String(20), + nullable=False, + server_default="manual", + comment="What triggered sync: scheduler, manual, webhook, startup", + ), + sa.Column( + "priority", + sa.String(20), + nullable=True, + comment="Sync priority: critical, high, normal, low", + ), + comment="Complete audit trail of every sync attempt", + ) + + # Composite indexes for common queries + 
op.create_index( + "ix_sync_logs_user_started", + "sync_logs", + ["user_id", "started_at"], + comment="For user-specific sync history queries", + ) + op.create_index( + "ix_sync_logs_status_started", + "sync_logs", + ["status", "started_at"], + comment="For filtering syncs by status over time", + ) + op.create_index( + "ix_sync_logs_error_type_started", + "sync_logs", + ["error_type", "started_at"], + comment="For analyzing error patterns over time", + ) + + +def downgrade() -> None: + """Drop sync_logs table and indexes.""" + op.drop_index("ix_sync_logs_error_type_started", table_name="sync_logs") + op.drop_index("ix_sync_logs_status_started", table_name="sync_logs") + op.drop_index("ix_sync_logs_user_started", table_name="sync_logs") + op.drop_table("sync_logs") diff --git a/src/polar_flow_server/admin/routes.py b/src/polar_flow_server/admin/routes.py index 460c6ff..f3352f8 100644 --- a/src/polar_flow_server/admin/routes.py +++ b/src/polar_flow_server/admin/routes.py @@ -537,7 +537,7 @@ async def admin_dashboard( "latest_cardio": latest_cardio, "latest_hr": latest_hr, "latest_alertness": latest_alertness, - "sync_interval_hours": settings.sync_interval_hours, + "sync_interval_minutes": settings.sync_interval_minutes, "recovery_status": recovery_status, "api_keys": api_keys, "csrf_token": _get_csrf_token(request), diff --git a/src/polar_flow_server/app.py b/src/polar_flow_server/app.py index 4aff355..97b3f70 100644 --- a/src/polar_flow_server/app.py +++ b/src/polar_flow_server/app.py @@ -19,9 +19,15 @@ from polar_flow_server.admin import admin_router from polar_flow_server.api import api_routers from polar_flow_server.core.config import settings -from polar_flow_server.core.database import close_database, engine, init_database +from polar_flow_server.core.database import ( + async_session_maker, + close_database, + engine, + init_database, +) from polar_flow_server.middleware import RateLimitHeadersMiddleware from polar_flow_server.routes import root_redirect 
+from polar_flow_server.services.scheduler import SyncScheduler, set_scheduler # Configure structured logging structlog.configure( @@ -48,21 +54,33 @@ async def lifespan(app: Litestar) -> AsyncIterator[None]: Handles startup and shutdown tasks: - Initialize database on startup + - Start background sync scheduler - Close database connections on shutdown + - Stop scheduler on shutdown """ logger.info( "Starting polar-flow-server", version=__version__, mode=settings.deployment_mode.value, + sync_enabled=settings.sync_enabled, + sync_interval=settings.sync_interval_minutes, ) # Initialize database tables await init_database() logger.info("Database initialized") + # Start background sync scheduler + scheduler = SyncScheduler(async_session_maker) + set_scheduler(scheduler) + await scheduler.start() + yield - # Cleanup + # Stop scheduler + await scheduler.stop() + + # Cleanup database await close_database() logger.info("Shutdown complete") diff --git a/src/polar_flow_server/core/config.py b/src/polar_flow_server/core/config.py index 577450f..dbd18c4 100644 --- a/src/polar_flow_server/core/config.py +++ b/src/polar_flow_server/core/config.py @@ -70,12 +70,30 @@ class Settings(BaseSettings): jwt_expiry_minutes: int = Field(default=15, description="JWT access token expiry") # Sync settings - sync_interval_hours: int = Field(default=1, description="How often to sync data") - sync_on_startup: bool = Field(default=True, description="Sync immediately on startup") + sync_enabled: bool = Field( + default=True, + description="Enable automatic background syncing", + ) + sync_interval_minutes: int = Field( + default=60, + description="How often to run sync cycle (minutes)", + ) + sync_on_startup: bool = Field( + default=True, + description="Sync immediately on startup", + ) sync_days_lookback: int = Field( default=30, description="How many days of data to fetch on sync", ) + sync_max_users_per_run: int | None = Field( + default=None, + description="Maximum users per sync cycle (None 
= rate limit aware auto)", + ) + sync_stagger_seconds: int = Field( + default=5, + description="Seconds to wait between user syncs (rate limit protection)", + ) # Polar OAuth app credentials (app-level, not user-level) # NOTE: For self-hosted, these will be stored in DB via setup wizard diff --git a/src/polar_flow_server/models/__init__.py b/src/polar_flow_server/models/__init__.py index 0d1205d..53379ce 100644 --- a/src/polar_flow_server/models/__init__.py +++ b/src/polar_flow_server/models/__init__.py @@ -22,6 +22,13 @@ from polar_flow_server.models.sleepwise_alertness import SleepWiseAlertness from polar_flow_server.models.sleepwise_bedtime import SleepWiseBedtime from polar_flow_server.models.spo2 import SpO2 +from polar_flow_server.models.sync_log import ( + SyncErrorType, + SyncLog, + SyncPriority, + SyncStatus, + SyncTrigger, +) from polar_flow_server.models.temp_auth_code import TempAuthCode from polar_flow_server.models.temperature import BodyTemperature, SkinTemperature from polar_flow_server.models.user import User @@ -50,6 +57,11 @@ "SleepWiseAlertness", "SleepWiseBedtime", "SpO2", + "SyncErrorType", + "SyncLog", + "SyncPriority", + "SyncStatus", + "SyncTrigger", "TempAuthCode", "User", "UserBaseline", diff --git a/src/polar_flow_server/models/sync_log.py b/src/polar_flow_server/models/sync_log.py new file mode 100644 index 0000000..343ca2c --- /dev/null +++ b/src/polar_flow_server/models/sync_log.py @@ -0,0 +1,350 @@ +"""Sync log model for comprehensive audit trail of all sync operations. + +This module provides complete visibility into every sync attempt, enabling: +- Debugging sync failures with full context +- Monitoring sync performance and success rates +- Tracking rate limit usage across the Polar API +- Auditing data freshness per user + +Every sync operation - whether triggered by scheduler, manual API call, +or webhook - gets logged here with full context. 
+""" + +from datetime import UTC, datetime +from enum import Enum + +from sqlalchemy import DateTime, Index, Integer, String, Text, func +from sqlalchemy.dialects.postgresql import JSON +from sqlalchemy.orm import Mapped, mapped_column + +from polar_flow_server.models.base import Base + + +class SyncStatus(str, Enum): + """Status of a sync operation. + + Attributes: + STARTED: Sync has begun but not completed + SUCCESS: All data types synced successfully + PARTIAL: Some data types synced, others failed + FAILED: Sync failed completely + SKIPPED: Sync was skipped (e.g., rate limited, no token) + """ + + STARTED = "started" + SUCCESS = "success" + PARTIAL = "partial" + FAILED = "failed" + SKIPPED = "skipped" + + +class SyncErrorType(str, Enum): + """Categorized error types for consistent error handling. + + These categories enable: + - Appropriate retry strategies per error type + - Clear dashboards showing error distribution + - Automated alerting on specific error patterns + + Attributes: + TOKEN_EXPIRED: OAuth token has expired, user needs to re-authenticate + TOKEN_INVALID: Token is malformed or unrecognized + TOKEN_REVOKED: User revoked access in Polar settings + RATE_LIMITED_15M: Hit 15-minute rate limit, backoff needed + RATE_LIMITED_24H: Hit 24-hour rate limit, longer backoff needed + API_UNAVAILABLE: Polar API is down or unreachable + API_TIMEOUT: Request timed out waiting for response + API_ERROR: Polar API returned an error response + INVALID_RESPONSE: Response didn't match expected schema + TRANSFORM_ERROR: Failed to transform API data to our models + DATABASE_ERROR: Failed to write to database + INTERNAL_ERROR: Unexpected internal error + """ + + # Authentication errors + TOKEN_EXPIRED = "token_expired" + TOKEN_INVALID = "token_invalid" + TOKEN_REVOKED = "token_revoked" + + # Rate limiting + RATE_LIMITED_15M = "rate_limited_15m" + RATE_LIMITED_24H = "rate_limited_24h" + + # API errors + API_UNAVAILABLE = "api_unavailable" + API_TIMEOUT = "api_timeout" + 
API_ERROR = "api_error" + + # Data errors + INVALID_RESPONSE = "invalid_response" + TRANSFORM_ERROR = "transform_error" + + # Internal errors + DATABASE_ERROR = "database_error" + INTERNAL_ERROR = "internal_error" + + +class SyncTrigger(str, Enum): + """What initiated the sync operation. + + Attributes: + SCHEDULER: Automatic sync from background scheduler + MANUAL: User/admin triggered via API endpoint + WEBHOOK: Triggered by Polar webhook (future) + STARTUP: Initial sync on application startup + """ + + SCHEDULER = "scheduler" + MANUAL = "manual" + WEBHOOK = "webhook" + STARTUP = "startup" + + +class SyncPriority(str, Enum): + """Priority level for sync queue ordering. + + Higher priority syncs are processed first when queue is backlogged. + + Attributes: + CRITICAL: Token expiring soon or hasn't synced in 48h+ + HIGH: Active user, hasn't synced in 12h+ + NORMAL: Regular user, hasn't synced in 24h+ + LOW: Dormant user, hasn't synced in 7d+ + """ + + CRITICAL = "critical" + HIGH = "high" + NORMAL = "normal" + LOW = "low" + + +class SyncLog(Base): + """Complete audit trail of every sync attempt. + + This table is the single source of truth for understanding what happened + during any sync operation. It captures timing, results, errors, rate limits, + and follow-up analytics processing. 
+ + Example queries: + # Find all failed syncs in last 24 hours + SELECT * FROM sync_logs + WHERE status = 'failed' + AND started_at > NOW() - INTERVAL '24 hours' + ORDER BY started_at DESC; + + # Get sync success rate by user + SELECT user_id, + COUNT(*) FILTER (WHERE status = 'success') as successes, + COUNT(*) as total, + ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'success') / COUNT(*), 2) as success_rate + FROM sync_logs + GROUP BY user_id; + + # Find rate limit issues + SELECT * FROM sync_logs + WHERE error_type IN ('rate_limited_15m', 'rate_limited_24h') + ORDER BY started_at DESC; + + Attributes: + id: Auto-incrementing primary key + user_id: User being synced (indexed for user-specific queries) + job_id: UUID for correlating logs across services + started_at: When sync began + completed_at: When sync finished (null if still running) + duration_ms: Total sync duration in milliseconds + status: Current status (started/success/partial/failed/skipped) + error_type: Categorized error type if failed + error_message: Human-readable error description + error_details: Full error context as JSON + records_synced: Count of records synced per data type + api_calls_made: Total API calls made during sync + rate_limit_remaining_15m: Remaining 15-min quota after sync + rate_limit_remaining_24h: Remaining 24-hour quota after sync + baselines_recalculated: Whether baselines were updated + patterns_detected: Whether pattern detection ran + trigger: What initiated this sync + priority: Queue priority level + """ + + __tablename__ = "sync_logs" + + # Primary key + id: Mapped[int] = mapped_column(primary_key=True) + + # User identification + user_id: Mapped[str] = mapped_column(String(255), index=True) + job_id: Mapped[str] = mapped_column(String(36), index=True) # UUID for correlation + + # Timing + started_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), index=True + ) + completed_at: Mapped[datetime | None] = 
mapped_column(DateTime(timezone=True), nullable=True) + duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True) + + # Status + status: Mapped[str] = mapped_column(String(20), default=SyncStatus.STARTED.value, index=True) + + # Error tracking + error_type: Mapped[str | None] = mapped_column(String(50), nullable=True, index=True) + error_message: Mapped[str | None] = mapped_column(Text, nullable=True) + error_details: Mapped[dict[str, object] | None] = mapped_column(JSON, nullable=True) + + # Results + records_synced: Mapped[dict[str, int] | None] = mapped_column(JSON, nullable=True) + api_calls_made: Mapped[int] = mapped_column(Integer, default=0) + + # Rate limit tracking (from Polar API response headers) + rate_limit_remaining_15m: Mapped[int | None] = mapped_column(Integer, nullable=True) + rate_limit_remaining_24h: Mapped[int | None] = mapped_column(Integer, nullable=True) + rate_limit_limit_15m: Mapped[int | None] = mapped_column(Integer, nullable=True) + rate_limit_limit_24h: Mapped[int | None] = mapped_column(Integer, nullable=True) + + # Analytics follow-up + baselines_recalculated: Mapped[bool] = mapped_column(default=False) + patterns_detected: Mapped[bool] = mapped_column(default=False) + insights_generated: Mapped[bool] = mapped_column(default=False) + + # Context + trigger: Mapped[str] = mapped_column(String(20), default=SyncTrigger.MANUAL.value) + priority: Mapped[str | None] = mapped_column(String(20), nullable=True) + + # Composite indexes for common queries + __table_args__ = ( + Index("ix_sync_logs_user_started", "user_id", "started_at"), + Index("ix_sync_logs_status_started", "status", "started_at"), + Index("ix_sync_logs_error_type_started", "error_type", "started_at"), + ) + + def __repr__(self) -> str: + """Return string representation.""" + return ( + f"" + ) + + @property + def is_complete(self) -> bool: + """Return True if sync has finished (success or failure).""" + return self.status in ( + SyncStatus.SUCCESS.value, + 
SyncStatus.PARTIAL.value, + SyncStatus.FAILED.value, + SyncStatus.SKIPPED.value, + ) + + @property + def is_successful(self) -> bool: + """Return True if sync completed successfully.""" + return self.status == SyncStatus.SUCCESS.value + + @property + def has_error(self) -> bool: + """Return True if sync failed with an error.""" + return self.error_type is not None + + def complete_success(self, records: dict[str, int], api_calls: int) -> None: + """Mark sync as successfully completed. + + Args: + records: Dict mapping data type to count synced + api_calls: Total API calls made + """ + now = datetime.now(UTC) + self.completed_at = now + self.duration_ms = int((now - self.started_at).total_seconds() * 1000) + self.status = SyncStatus.SUCCESS.value + self.records_synced = records + self.api_calls_made = api_calls + + def complete_partial( + self, records: dict[str, int], api_calls: int, error_type: SyncErrorType, message: str + ) -> None: + """Mark sync as partially completed (some data synced, some failed). + + Args: + records: Dict mapping data type to count synced + api_calls: Total API calls made + error_type: Type of error that caused partial failure + message: Human-readable error message + """ + now = datetime.now(UTC) + self.completed_at = now + self.duration_ms = int((now - self.started_at).total_seconds() * 1000) + self.status = SyncStatus.PARTIAL.value + self.records_synced = records + self.api_calls_made = api_calls + self.error_type = error_type.value + self.error_message = message + + def complete_failed( + self, + error_type: SyncErrorType, + message: str, + details: dict[str, object] | None = None, + api_calls: int = 0, + ) -> None: + """Mark sync as failed. 
+ + Args: + error_type: Categorized error type + message: Human-readable error message + details: Additional error context + api_calls: API calls made before failure + """ + now = datetime.now(UTC) + self.completed_at = now + self.duration_ms = int((now - self.started_at).total_seconds() * 1000) + self.status = SyncStatus.FAILED.value + self.error_type = error_type.value + self.error_message = message + self.error_details = details + self.api_calls_made = api_calls + + def complete_skipped(self, reason: str) -> None: + """Mark sync as skipped (not attempted). + + Args: + reason: Why sync was skipped + """ + now = datetime.now(UTC) + self.completed_at = now + self.duration_ms = 0 + self.status = SyncStatus.SKIPPED.value + self.error_message = reason + + def update_rate_limits( + self, + remaining_15m: int | None, + remaining_24h: int | None, + limit_15m: int | None = None, + limit_24h: int | None = None, + ) -> None: + """Update rate limit tracking from API response headers. + + Args: + remaining_15m: Remaining requests in 15-min window + remaining_24h: Remaining requests in 24-hour window + limit_15m: Max requests in 15-min window + limit_24h: Max requests in 24-hour window + """ + self.rate_limit_remaining_15m = remaining_15m + self.rate_limit_remaining_24h = remaining_24h + if limit_15m is not None: + self.rate_limit_limit_15m = limit_15m + if limit_24h is not None: + self.rate_limit_limit_24h = limit_24h + + def mark_analytics_complete( + self, baselines: bool = False, patterns: bool = False, insights: bool = False + ) -> None: + """Mark post-sync analytics as completed. 
+ + Args: + baselines: Whether baselines were recalculated + patterns: Whether patterns were detected + insights: Whether insights were regenerated + """ + self.baselines_recalculated = baselines + self.patterns_detected = patterns + self.insights_generated = insights diff --git a/src/polar_flow_server/services/scheduler.py b/src/polar_flow_server/services/scheduler.py new file mode 100644 index 0000000..c4fea9c --- /dev/null +++ b/src/polar_flow_server/services/scheduler.py @@ -0,0 +1,263 @@ +"""Background sync scheduler using APScheduler. + +This module provides automatic background syncing for all users at +configurable intervals. It integrates with the SyncOrchestrator to +respect rate limits and maintain audit trails. + +Architecture: + The scheduler runs as a background service that periodically + triggers sync operations via the SyncOrchestrator. + + ┌──────────────────────────────────────────────────────────────────┐ + │ SyncScheduler │ + │ │ + │ ┌──────────────┐ ┌──────────────────────────────────────────┐ │ + │ │ APScheduler │ -> │ sync_all_users job │ │ + │ │ (cron/ │ │ │ │ + │ │ interval) │ │ ┌────────────────────────────────────┐ │ │ + │ └──────────────┘ │ │ SyncOrchestrator │ │ │ + │ │ │ - Rate limit aware │ │ │ + │ │ │ - Priority queue │ │ │ + │ │ │ - Full audit logging │ │ │ + │ │ └────────────────────────────────────┘ │ │ + │ └──────────────────────────────────────────┘ │ + └──────────────────────────────────────────────────────────────────┘ + +Configuration: + SYNC_ENABLED: Enable/disable automatic syncing + SYNC_INTERVAL_MINUTES: How often to run sync (default: 60) + SYNC_ON_STARTUP: Whether to sync immediately on startup + SYNC_MAX_USERS_PER_RUN: Maximum users per sync cycle (rate limit aware) + +Usage: + # In app startup + scheduler = SyncScheduler(db_session_factory) + await scheduler.start() + + # In app shutdown + await scheduler.stop() +""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime +from 
typing import TYPE_CHECKING + +import structlog +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from polar_flow_server.core.config import settings +from polar_flow_server.models.sync_log import SyncTrigger +from polar_flow_server.services.sync_orchestrator import SyncOrchestrator + +if TYPE_CHECKING: + from apscheduler.job import Job + +logger = structlog.get_logger() + + +class SyncScheduler: + """Background sync scheduler for automatic user syncing. + + Manages APScheduler to periodically trigger sync operations + for all users while respecting rate limits. + + Attributes: + session_factory: Async session factory for database access + scheduler: APScheduler instance + is_running: Whether scheduler is currently running + last_run_at: Timestamp of last sync run + last_run_stats: Stats from last sync run + """ + + def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None: + """Initialize sync scheduler. + + Args: + session_factory: SQLAlchemy async session factory + """ + self.session_factory = session_factory + self.scheduler = AsyncIOScheduler() + self.is_running = False + self.last_run_at: datetime | None = None + self.last_run_stats: dict[str, object] | None = None + self._sync_job: Job | None = None + self.logger = logger.bind(component="sync_scheduler") + + async def start(self) -> None: + """Start the background scheduler. + + Configures and starts APScheduler with the sync job. + If SYNC_ON_STARTUP is enabled, triggers an immediate sync. 
+ """ + if not settings.sync_enabled: + self.logger.info("Sync scheduler disabled by configuration") + return + + if self.is_running: + self.logger.warning("Scheduler already running") + return + + self.logger.info( + "Starting sync scheduler", + interval_minutes=settings.sync_interval_minutes, + sync_on_startup=settings.sync_on_startup, + ) + + # Add the sync job + self._sync_job = self.scheduler.add_job( + self._run_sync_cycle, + trigger=IntervalTrigger(minutes=settings.sync_interval_minutes), + id="sync_all_users", + name="Sync all Polar users", + replace_existing=True, + max_instances=1, # Prevent overlapping runs + ) + + # Start the scheduler + self.scheduler.start() + self.is_running = True + + self.logger.info("Sync scheduler started") + + # Run immediately if configured + if settings.sync_on_startup: + self.logger.info("Running startup sync") + # Run in background to not block startup + asyncio.create_task(self._run_startup_sync()) + + async def stop(self) -> None: + """Stop the background scheduler gracefully.""" + if not self.is_running: + return + + self.logger.info("Stopping sync scheduler") + + self.scheduler.shutdown(wait=True) + self.is_running = False + + self.logger.info("Sync scheduler stopped") + + async def _run_startup_sync(self) -> None: + """Run sync immediately on startup.""" + try: + await self._run_sync_cycle(trigger=SyncTrigger.STARTUP) + except Exception as e: + self.logger.error("Startup sync failed", error=str(e)) + + async def _run_sync_cycle( + self, + trigger: SyncTrigger = SyncTrigger.SCHEDULER, + ) -> None: + """Execute a sync cycle for all users. + + This is the main job that APScheduler calls periodically. + It creates a new database session and uses SyncOrchestrator + to process the sync queue. 
+ + Args: + trigger: What triggered this sync cycle + """ + start_time = datetime.now(UTC) + self.logger.info("Starting sync cycle", trigger=trigger.value) + + try: + async with self.session_factory() as session: + orchestrator = SyncOrchestrator(session) + + # Process sync queue + results = await orchestrator.process_sync_queue( + max_users=settings.sync_max_users_per_run, + ) + + # Get stats + stats = await orchestrator.get_sync_stats() + + # Update instance state + end_time = datetime.now(UTC) + duration_ms = int((end_time - start_time).total_seconds() * 1000) + + self.last_run_at = end_time + self.last_run_stats = { + "trigger": trigger.value, + "users_processed": len(results), + "successful": sum(1 for r in results if r.is_successful), + "failed": sum(1 for r in results if r.has_error), + "duration_ms": duration_ms, + **stats, + } + + self.logger.info( + "Sync cycle complete", + users_processed=len(results), + successful=self.last_run_stats["successful"], + failed=self.last_run_stats["failed"], + duration_ms=duration_ms, + ) + + except Exception as e: + self.logger.exception("Sync cycle failed", error=str(e)) + self.last_run_stats = { + "trigger": trigger.value, + "error": str(e), + "timestamp": datetime.now(UTC).isoformat(), + } + + async def trigger_manual_sync(self) -> dict[str, object]: + """Manually trigger a sync cycle outside the schedule. + + Returns: + Dict with sync results + """ + self.logger.info("Manual sync triggered") + + await self._run_sync_cycle(trigger=SyncTrigger.MANUAL) + + return self.last_run_stats or {"status": "completed"} + + def get_status(self) -> dict[str, object]: + """Get scheduler status for monitoring. 
+ + Returns: + Dict with scheduler state and stats + """ + next_run = None + if self._sync_job and self.is_running: + next_run_time = self._sync_job.next_run_time + if next_run_time: + next_run = next_run_time.isoformat() + + return { + "enabled": settings.sync_enabled, + "is_running": self.is_running, + "interval_minutes": settings.sync_interval_minutes, + "next_run_at": next_run, + "last_run_at": self.last_run_at.isoformat() if self.last_run_at else None, + "last_run_stats": self.last_run_stats, + } + + +# Global scheduler instance (initialized in app startup) +_scheduler: SyncScheduler | None = None + + +def get_scheduler() -> SyncScheduler | None: + """Get the global scheduler instance. + + Returns: + Scheduler instance or None if not initialized + """ + return _scheduler + + +def set_scheduler(scheduler: SyncScheduler) -> None: + """Set the global scheduler instance. + + Args: + scheduler: Scheduler instance to set as global + """ + global _scheduler + _scheduler = scheduler diff --git a/src/polar_flow_server/services/sync_error_handler.py b/src/polar_flow_server/services/sync_error_handler.py new file mode 100644 index 0000000..f5eff6a --- /dev/null +++ b/src/polar_flow_server/services/sync_error_handler.py @@ -0,0 +1,476 @@ +"""Sync error handler for consistent error classification and retry strategies. 
+ +This module provides centralized error handling for all sync operations, ensuring: +- Consistent error classification into SyncErrorType categories +- Appropriate retry strategies based on error type +- Comprehensive error details for debugging +- Clear separation between transient and permanent failures + +Error Classification: + + TRANSIENT (can retry): + - TOKEN_EXPIRED: OAuth token needs refresh, retry with new token + - RATE_LIMITED_15M: Hit 15-min limit, backoff ~15 minutes + - RATE_LIMITED_24H: Hit 24-hour limit, backoff ~24 hours + - API_UNAVAILABLE: Polar API down, exponential backoff + - API_TIMEOUT: Request timed out, retry with longer timeout + - DATABASE_ERROR: DB connection issue, retry after reconnect + + PERMANENT (don't retry automatically): + - TOKEN_INVALID: Token is malformed, user needs to re-auth + - TOKEN_REVOKED: User revoked access, user needs to re-auth + - API_ERROR: Polar API returned error response + - INVALID_RESPONSE: Response schema mismatch + - TRANSFORM_ERROR: Data transformation failed + - INTERNAL_ERROR: Unexpected internal error +""" + +from dataclasses import dataclass +from typing import Any + +import httpx +import structlog +from polar_flow.exceptions import ( + AuthenticationError, + PolarFlowError, + RateLimitError, +) +from sqlalchemy.exc import SQLAlchemyError + +from polar_flow_server.models.sync_log import SyncErrorType + +logger = structlog.get_logger() + + +@dataclass +class SyncError: + """Structured sync error with classification and retry info. 
+ + Attributes: + error_type: Categorized error type for consistent handling + message: Human-readable error message + details: Additional error context as dict + retry_after_seconds: Seconds to wait before retry (None if no retry) + is_transient: Whether error is temporary and can be retried + original_exception: The original exception that caused this error + """ + + error_type: SyncErrorType + message: str + details: dict[str, Any] + retry_after_seconds: int | None + is_transient: bool + original_exception: Exception | None = None + + def to_log_dict(self) -> dict[str, Any]: + """Convert to dict for structured logging. + + Returns: + Dict suitable for structlog context + """ + return { + "error_type": self.error_type.value, + "message": self.message, + "is_transient": self.is_transient, + "retry_after_seconds": self.retry_after_seconds, + **self.details, + } + + +class SyncErrorHandler: + """Centralized error handler for sync operations. + + Classifies exceptions into SyncErrorType categories and provides + consistent retry strategies across all sync operations. + + Usage: + handler = SyncErrorHandler() + + try: + await sync_user(user_id, token) + except Exception as e: + sync_error = handler.classify(e, context={"user_id": user_id}) + sync_log.complete_failed( + error_type=sync_error.error_type, + message=sync_error.message, + details=sync_error.details, + ) + if sync_error.is_transient: + schedule_retry(user_id, delay=sync_error.retry_after_seconds) + """ + + def __init__(self) -> None: + """Initialize error handler.""" + self.logger = logger.bind(component="sync_error_handler") + + def classify( + self, + exception: Exception, + context: dict[str, Any] | None = None, + ) -> SyncError: + """Classify an exception into a SyncError. 
+ + Examines the exception type and attributes to determine: + - The appropriate SyncErrorType category + - Whether the error is transient (retryable) + - How long to wait before retrying + + Args: + exception: The exception to classify + context: Additional context (user_id, endpoint, etc.) + + Returns: + SyncError with classification and retry info + """ + context = context or {} + + # Polar SDK exceptions + if isinstance(exception, RateLimitError): + return self._handle_rate_limit(exception, context) + if isinstance(exception, AuthenticationError): + return self._handle_auth_error(exception, context) + if isinstance(exception, PolarFlowError): + return self._handle_polar_error(exception, context) + + # HTTP client exceptions + if isinstance(exception, httpx.TimeoutException): + return self._handle_timeout(exception, context) + if isinstance(exception, httpx.ConnectError): + return self._handle_connect_error(exception, context) + if isinstance(exception, httpx.HTTPStatusError): + return self._handle_http_status(exception, context) + + # Database exceptions + if isinstance(exception, SQLAlchemyError): + return self._handle_database_error(exception, context) + + # Transformation errors (ValueError, KeyError, TypeError in transforms) + if isinstance(exception, (ValueError, KeyError, TypeError)): + return self._handle_transform_error(exception, context) + + # Unknown exceptions + return self._handle_unknown_error(exception, context) + + def _handle_rate_limit( + self, + exception: RateLimitError, + context: dict[str, Any], + ) -> SyncError: + """Handle rate limit errors from Polar API. 
+ + Polar has two rate limits: + - 15-minute: 500 + (users × 20) requests + - 24-hour: 5000 + (users × 100) requests + + We determine which limit was hit based on retry_after: + - retry_after <= 900 (15 min): 15-minute limit + - retry_after > 900: 24-hour limit + """ + retry_after = exception.retry_after + + # Classify based on retry duration + if retry_after <= 900: # 15 minutes + error_type = SyncErrorType.RATE_LIMITED_15M + message = f"Rate limited by Polar API (15-min window). Retry after {retry_after}s." + else: + error_type = SyncErrorType.RATE_LIMITED_24H + message = f"Rate limited by Polar API (24-hour window). Retry after {retry_after}s." + + self.logger.warning( + "Rate limit hit", + error_type=error_type.value, + retry_after=retry_after, + endpoint=exception.endpoint, + **context, + ) + + return SyncError( + error_type=error_type, + message=message, + details={ + "endpoint": exception.endpoint, + "retry_after": retry_after, + **context, + }, + retry_after_seconds=retry_after, + is_transient=True, + original_exception=exception, + ) + + def _handle_auth_error( + self, + exception: AuthenticationError, + context: dict[str, Any], + ) -> SyncError: + """Handle authentication errors. + + We classify based on status code and message: + - 401 with "expired": Token expired, might be refreshable + - 401 with "invalid": Token malformed or unrecognized + - 401 with "revoked": User revoked access in Polar settings + """ + message_lower = str(exception).lower() + response_body = (exception.response_body or "").lower() + + # Try to determine specific auth failure type + if "expired" in message_lower or "expired" in response_body: + error_type = SyncErrorType.TOKEN_EXPIRED + message = "Polar access token has expired. Token refresh required." 
+ is_transient = True # Can retry with new token + retry_after = 0 # Retry immediately with refreshed token + elif "revoked" in message_lower or "revoked" in response_body: + error_type = SyncErrorType.TOKEN_REVOKED + message = "User revoked Polar access. Re-authentication required." + is_transient = False # User action required + retry_after = None + else: + error_type = SyncErrorType.TOKEN_INVALID + message = "Polar access token is invalid. Re-authentication required." + is_transient = False # User action required + retry_after = None + + self.logger.error( + "Authentication error", + error_type=error_type.value, + endpoint=exception.endpoint, + **context, + ) + + return SyncError( + error_type=error_type, + message=message, + details={ + "endpoint": exception.endpoint, + "status_code": exception.status_code, + **context, + }, + retry_after_seconds=retry_after, + is_transient=is_transient, + original_exception=exception, + ) + + def _handle_polar_error( + self, + exception: PolarFlowError, + context: dict[str, Any], + ) -> SyncError: + """Handle generic Polar API errors.""" + self.logger.error( + "Polar API error", + endpoint=exception.endpoint, + status_code=exception.status_code, + response=exception.response_body[:200] if exception.response_body else None, + **context, + ) + + return SyncError( + error_type=SyncErrorType.API_ERROR, + message=f"Polar API error: {exception}", + details={ + "endpoint": exception.endpoint, + "status_code": exception.status_code, + "response_body": exception.response_body[:500] if exception.response_body else None, + **context, + }, + retry_after_seconds=300, # Retry in 5 minutes + is_transient=True, # API errors might be temporary + original_exception=exception, + ) + + def _handle_timeout( + self, + exception: httpx.TimeoutException, + context: dict[str, Any], + ) -> SyncError: + """Handle HTTP timeout errors.""" + self.logger.warning("API request timeout", error=str(exception), **context) + + return SyncError( + 
error_type=SyncErrorType.API_TIMEOUT, + message=f"Request timed out: {exception}", + details={ + "error": str(exception), + **context, + }, + retry_after_seconds=60, # Retry in 1 minute + is_transient=True, + original_exception=exception, + ) + + def _handle_connect_error( + self, + exception: httpx.ConnectError, + context: dict[str, Any], + ) -> SyncError: + """Handle connection errors (API unreachable).""" + self.logger.error("API connection failed", error=str(exception), **context) + + return SyncError( + error_type=SyncErrorType.API_UNAVAILABLE, + message=f"Failed to connect to Polar API: {exception}", + details={ + "error": str(exception), + **context, + }, + retry_after_seconds=300, # Retry in 5 minutes + is_transient=True, + original_exception=exception, + ) + + def _handle_http_status( + self, + exception: httpx.HTTPStatusError, + context: dict[str, Any], + ) -> SyncError: + """Handle HTTP status errors not caught by SDK.""" + status_code = exception.response.status_code + + # Map status codes to error types + if status_code == 401: + error_type = SyncErrorType.TOKEN_INVALID + is_transient = False + elif status_code == 429: + error_type = SyncErrorType.RATE_LIMITED_15M + is_transient = True + elif status_code >= 500: + error_type = SyncErrorType.API_UNAVAILABLE + is_transient = True + else: + error_type = SyncErrorType.API_ERROR + is_transient = False + + self.logger.error( + "HTTP status error", + status_code=status_code, + error_type=error_type.value, + **context, + ) + + return SyncError( + error_type=error_type, + message=f"HTTP {status_code}: {exception}", + details={ + "status_code": status_code, + "url": str(exception.request.url), + **context, + }, + retry_after_seconds=300 if is_transient else None, + is_transient=is_transient, + original_exception=exception, + ) + + def _handle_database_error( + self, + exception: SQLAlchemyError, + context: dict[str, Any], + ) -> SyncError: + """Handle database errors.""" + self.logger.error("Database error", 
error=str(exception), **context) + + return SyncError( + error_type=SyncErrorType.DATABASE_ERROR, + message=f"Database error: {exception}", + details={ + "error_type": type(exception).__name__, + "error": str(exception)[:500], + **context, + }, + retry_after_seconds=60, # Retry in 1 minute + is_transient=True, # DB errors often transient + original_exception=exception, + ) + + def _handle_transform_error( + self, + exception: ValueError | KeyError | TypeError, + context: dict[str, Any], + ) -> SyncError: + """Handle data transformation errors.""" + self.logger.error( + "Transform error", + error_type=type(exception).__name__, + error=str(exception), + **context, + ) + + return SyncError( + error_type=SyncErrorType.TRANSFORM_ERROR, + message=f"Data transformation failed: {exception}", + details={ + "error_type": type(exception).__name__, + "error": str(exception), + **context, + }, + retry_after_seconds=None, # Don't auto-retry data errors + is_transient=False, # Data errors need investigation + original_exception=exception, + ) + + def _handle_unknown_error( + self, + exception: Exception, + context: dict[str, Any], + ) -> SyncError: + """Handle unknown/unexpected errors.""" + self.logger.exception( + "Unexpected sync error", + error_type=type(exception).__name__, + error=str(exception), + **context, + ) + + return SyncError( + error_type=SyncErrorType.INTERNAL_ERROR, + message=f"Unexpected error: {type(exception).__name__}: {exception}", + details={ + "error_type": type(exception).__name__, + "error": str(exception)[:500], + **context, + }, + retry_after_seconds=300, # Retry in 5 minutes cautiously + is_transient=True, # Assume transient, investigate logs + original_exception=exception, + ) + + +# Default retry delays for each error type (seconds) +RETRY_DELAYS: dict[SyncErrorType, int | None] = { + # Transient - will retry + SyncErrorType.TOKEN_EXPIRED: 0, # Retry immediately with refreshed token + SyncErrorType.RATE_LIMITED_15M: 900, # 15 minutes + 
SyncErrorType.RATE_LIMITED_24H: 86400, # 24 hours + SyncErrorType.API_UNAVAILABLE: 300, # 5 minutes + SyncErrorType.API_TIMEOUT: 60, # 1 minute + SyncErrorType.DATABASE_ERROR: 60, # 1 minute + # Permanent - won't auto-retry + SyncErrorType.TOKEN_INVALID: None, + SyncErrorType.TOKEN_REVOKED: None, + SyncErrorType.API_ERROR: None, + SyncErrorType.INVALID_RESPONSE: None, + SyncErrorType.TRANSFORM_ERROR: None, + SyncErrorType.INTERNAL_ERROR: None, +} + + +def get_retry_delay(error_type: SyncErrorType) -> int | None: + """Get the default retry delay for an error type. + + Args: + error_type: The categorized error type + + Returns: + Seconds to wait before retry, or None if no retry + """ + return RETRY_DELAYS.get(error_type) + + +def is_retryable(error_type: SyncErrorType) -> bool: + """Check if an error type is retryable. + + Args: + error_type: The categorized error type + + Returns: + True if the error can be retried automatically + """ + return RETRY_DELAYS.get(error_type) is not None diff --git a/src/polar_flow_server/services/sync_orchestrator.py b/src/polar_flow_server/services/sync_orchestrator.py new file mode 100644 index 0000000..e1b9e2d --- /dev/null +++ b/src/polar_flow_server/services/sync_orchestrator.py @@ -0,0 +1,556 @@ +"""Sync orchestrator for rate-limit-aware, priority-based user syncing. + +This module provides intelligent orchestration of sync operations across +multiple users while respecting Polar API rate limits and prioritizing +users based on data freshness and activity level. + +Architecture: + The orchestrator manages a priority queue of sync jobs and processes + them according to available rate limit budget. It tracks all operations + in SyncLog for complete audit trail. 
+ + ┌─────────────────────────────────────────────────────────────────────┐ + │ SyncOrchestrator │ + │ │ + │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │ + │ │ Priority │ -> │ Rate Limit │ -> │ SyncService │ │ + │ │ Queue │ │ Manager │ │ (actual sync) │ │ + │ └──────────────┘ └──────────────┘ └──────────────────────┘ │ + │ ↑ ↑ │ │ + │ │ │ ↓ │ + │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │ + │ │ User │ │ Polar API │ │ SyncLog │ │ + │ │ Database │ │ Headers │ │ (audit trail) │ │ + │ └──────────────┘ └──────────────┘ └──────────────────────┘ │ + └─────────────────────────────────────────────────────────────────────┘ + +Rate Limits: + Polar API has two rate limits: + - 15-minute: 500 + (users × 20) requests + - 24-hour: 5000 + (users × 100) requests + + We track remaining quota from response headers and pause syncing + when approaching limits to avoid 429 errors. + +Priority Levels: + - CRITICAL: Token expiring or hasn't synced in 48h+ + - HIGH: Active user, hasn't synced in 12h+ + - NORMAL: Regular user, hasn't synced in 24h+ + - LOW: Dormant user, hasn't synced in 7d+ +""" + +from __future__ import annotations + +import uuid +from datetime import UTC, datetime, timedelta +from typing import TYPE_CHECKING + +import structlog +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from polar_flow_server.models.sync_log import ( + SyncErrorType, + SyncLog, + SyncPriority, + SyncStatus, + SyncTrigger, +) +from polar_flow_server.models.user import User +from polar_flow_server.services.baseline import BaselineService +from polar_flow_server.services.pattern import PatternService +from polar_flow_server.services.sync import SyncService +from polar_flow_server.services.sync_error_handler import SyncErrorHandler + +if TYPE_CHECKING: + from collections.abc import Sequence + +logger = structlog.get_logger() + + +class RateLimitTracker: + """Tracks Polar API rate limit state. 
+ + Monitors remaining quota from API response headers and provides + safe sync scheduling based on current limits. + + Attributes: + remaining_15m: Remaining requests in 15-minute window + remaining_24h: Remaining requests in 24-hour window + limit_15m: Max requests in 15-minute window + limit_24h: Max requests in 24-hour window + last_updated: When limits were last updated from API + """ + + # Conservative estimate of API calls per sync + # Actual usage depends on which endpoints return data + CALLS_PER_SYNC_ESTIMATE = 15 + + # Safety buffer - don't use all available quota + SAFETY_BUFFER_PERCENT = 0.1 # Keep 10% buffer + + def __init__(self) -> None: + """Initialize rate limit tracker with defaults.""" + self.remaining_15m: int | None = None + self.remaining_24h: int | None = None + self.limit_15m: int | None = None + self.limit_24h: int | None = None + self.last_updated: datetime | None = None + self.logger = logger.bind(component="rate_limit_tracker") + + def update_from_sync_log(self, sync_log: SyncLog) -> None: + """Update limits from a completed sync log. + + Args: + sync_log: Completed sync with rate limit data + """ + if sync_log.rate_limit_remaining_15m is not None: + self.remaining_15m = sync_log.rate_limit_remaining_15m + if sync_log.rate_limit_remaining_24h is not None: + self.remaining_24h = sync_log.rate_limit_remaining_24h + if sync_log.rate_limit_limit_15m is not None: + self.limit_15m = sync_log.rate_limit_limit_15m + if sync_log.rate_limit_limit_24h is not None: + self.limit_24h = sync_log.rate_limit_limit_24h + self.last_updated = datetime.now(UTC) + + def can_sync_now(self) -> bool: + """Check if we have enough rate limit budget to sync. 
+ + Returns: + True if we can safely attempt a sync + """ + # If we don't have limit info yet, allow sync to get headers + if self.remaining_15m is None or self.remaining_24h is None: + return True + + # Calculate minimum required quota with safety buffer + min_calls = int(self.CALLS_PER_SYNC_ESTIMATE * (1 + self.SAFETY_BUFFER_PERCENT)) + + # Check both windows + return self.remaining_15m >= min_calls and self.remaining_24h >= min_calls + + def get_wait_time_seconds(self) -> int: + """Get seconds to wait before next sync attempt. + + Returns: + Seconds to wait (0 if can sync now) + """ + if self.can_sync_now(): + return 0 + + # If 15-min limit is exhausted, wait up to 15 minutes + if self.remaining_15m is not None and self.remaining_15m < self.CALLS_PER_SYNC_ESTIMATE: + return 900 # 15 minutes + + # If 24-hour limit is exhausted, wait longer + if self.remaining_24h is not None and self.remaining_24h < self.CALLS_PER_SYNC_ESTIMATE: + return 3600 # 1 hour (don't wait full 24h) + + return 0 + + def get_safe_batch_size(self) -> int: + """Get how many users we can safely sync in current batch. + + Returns: + Number of users to sync in this batch + """ + if self.remaining_15m is None: + return 10 # Conservative default + + # Calculate based on 15-min window (more restrictive) + safe_calls = int(self.remaining_15m * (1 - self.SAFETY_BUFFER_PERCENT)) + return max(1, safe_calls // self.CALLS_PER_SYNC_ESTIMATE) + + def to_dict(self) -> dict[str, int | bool | str | None]: + """Convert to dict for logging/monitoring.""" + return { + "remaining_15m": self.remaining_15m, + "remaining_24h": self.remaining_24h, + "limit_15m": self.limit_15m, + "limit_24h": self.limit_24h, + "can_sync": self.can_sync_now(), + "safe_batch_size": self.get_safe_batch_size(), + "last_updated": self.last_updated.isoformat() if self.last_updated else None, + } + + +class SyncOrchestrator: + """Orchestrates sync operations across multiple users. 
+ + Manages priority queue, rate limits, and audit logging for + all sync operations. + + Usage: + async with async_session() as session: + orchestrator = SyncOrchestrator(session) + + # Process sync queue (called by scheduler) + results = await orchestrator.process_sync_queue() + + # Manual sync for specific user + result = await orchestrator.sync_user( + user_id="123", + polar_token="...", + trigger=SyncTrigger.MANUAL, + ) + """ + + def __init__(self, session: AsyncSession) -> None: + """Initialize sync orchestrator. + + Args: + session: Database session + """ + self.session = session + self.sync_service = SyncService(session) + self.error_handler = SyncErrorHandler() + self.rate_limiter = RateLimitTracker() + self.logger = logger.bind(component="sync_orchestrator") + + async def sync_user( + self, + user_id: str, + polar_token: str, + trigger: SyncTrigger = SyncTrigger.MANUAL, + priority: SyncPriority | None = None, + recalculate_analytics: bool = True, + ) -> SyncLog: + """Sync a single user with full audit logging. + + Creates a SyncLog entry, performs the sync, handles errors, + and marks analytics completion. 
+ + Args: + user_id: User identifier + polar_token: Polar API access token + trigger: What triggered this sync + priority: Priority level (for logging) + recalculate_analytics: Whether to recalculate baselines/patterns + + Returns: + Completed SyncLog with results + """ + job_id = str(uuid.uuid4()) + log = self.logger.bind(user_id=user_id, job_id=job_id, trigger=trigger.value) + + # Create sync log entry + sync_log = SyncLog( + user_id=user_id, + job_id=job_id, + trigger=trigger.value, + priority=priority.value if priority else None, + ) + self.session.add(sync_log) + await self.session.flush() # Get ID assigned + + log.info("Starting user sync") + + try: + # Perform sync (without auto-baseline recalc - we handle it here) + results = await self.sync_service.sync_user( + user_id=user_id, + polar_token=polar_token, + recalculate_baselines=False, # We'll do it ourselves + ) + + # Mark sync successful + api_calls = sum(results.values()) + 1 # +1 for initial call + sync_log.complete_success(records=results, api_calls=api_calls) + + log.info("Sync completed successfully", records_synced=results, api_calls=api_calls) + + # Post-sync analytics + if recalculate_analytics: + await self._run_post_sync_analytics(user_id, sync_log, log) + + except Exception as e: + # Classify and log error + sync_error = self.error_handler.classify(e, context={"user_id": user_id}) + sync_log.complete_failed( + error_type=SyncErrorType(sync_error.error_type), + message=sync_error.message, + details=sync_error.details, + ) + + log.error( + "Sync failed", + error_type=sync_error.error_type.value, + error=sync_error.message, + is_transient=sync_error.is_transient, + ) + + # Update rate limits from response (if available) + # Note: This would require the sync service to capture headers + # For now, we track what we can + + # Commit the sync log + await self.session.commit() + + return sync_log + + async def _run_post_sync_analytics( + self, + user_id: str, + sync_log: SyncLog, + log: 
structlog.stdlib.BoundLogger, + ) -> None: + """Run post-sync analytics (baselines, patterns). + + Args: + user_id: User identifier + sync_log: Sync log to update with analytics status + log: Bound logger for context + """ + baselines_done = False + patterns_done = False + insights_done = False + + # Recalculate baselines + try: + log.info("Recalculating baselines") + baseline_service = BaselineService(self.session) + await baseline_service.calculate_all_baselines(user_id) + baselines_done = True + log.info("Baselines recalculated successfully") + except Exception as e: + log.error("Baseline recalculation failed", error=str(e)) + + # Run pattern detection + try: + log.info("Running pattern detection") + pattern_service = PatternService(self.session) + await pattern_service.detect_all_patterns(user_id) + patterns_done = True + log.info("Pattern detection completed") + except Exception as e: + log.error("Pattern detection failed", error=str(e)) + + # Mark analytics complete (insights are on-demand, mark true) + insights_done = True + sync_log.mark_analytics_complete( + baselines=baselines_done, + patterns=patterns_done, + insights=insights_done, + ) + + async def process_sync_queue( + self, + max_users: int | None = None, + ) -> list[SyncLog]: + """Process the sync queue for all users needing sync. + + Fetches users ordered by priority, respects rate limits, + and processes syncs with full audit logging. 
+ + Args: + max_users: Maximum users to sync (None = use rate limit safe batch) + + Returns: + List of SyncLog entries for processed syncs + """ + log = self.logger.bind(trigger="scheduler") + + # Check rate limits + if not self.rate_limiter.can_sync_now(): + wait_time = self.rate_limiter.get_wait_time_seconds() + log.warning( + "Rate limit exhausted, skipping sync cycle", + wait_time_seconds=wait_time, + rate_limits=self.rate_limiter.to_dict(), + ) + return [] + + # Determine batch size + batch_size = max_users or self.rate_limiter.get_safe_batch_size() + + # Get users needing sync, ordered by priority + users_to_sync = await self._get_users_needing_sync(limit=batch_size) + + if not users_to_sync: + log.info("No users need syncing") + return [] + + log.info( + "Processing sync queue", + users_count=len(users_to_sync), + batch_size=batch_size, + ) + + results: list[SyncLog] = [] + + for user in users_to_sync: + # Double-check rate limits before each sync + if not self.rate_limiter.can_sync_now(): + log.warning("Rate limit reached mid-batch, stopping") + break + + # Calculate priority for this user + priority = await self._calculate_user_priority(user) + + try: + # Decrypt token and sync + polar_token = await self._get_user_token(user) + + sync_log = await self.sync_user( + user_id=user.polar_user_id, + polar_token=polar_token, + trigger=SyncTrigger.SCHEDULER, + priority=priority, + ) + results.append(sync_log) + + # Update rate limiter from results + self.rate_limiter.update_from_sync_log(sync_log) + + except Exception as e: + log.error( + "Failed to sync user", + user_id=user.polar_user_id, + error=str(e), + ) + # Continue with next user + continue + + log.info( + "Sync queue processing complete", + processed=len(results), + successful=sum(1 for r in results if r.is_successful), + failed=sum(1 for r in results if r.has_error), + ) + + return results + + async def _get_users_needing_sync( + self, + limit: int = 50, + ) -> Sequence[User]: + """Get users who need 
syncing, ordered by priority. + + Priority order: + 1. Users with expiring tokens (< 24h) + 2. Users who haven't synced in 48h+ (CRITICAL) + 3. Users who haven't synced in 12h+ (HIGH) + 4. Users who haven't synced in 24h+ (NORMAL) + 5. Everyone else (LOW) + + Args: + limit: Maximum users to return + + Returns: + List of User objects needing sync + """ + # Get all active users with tokens + stmt = ( + select(User) + .where(User.access_token_encrypted.isnot(None)) + .order_by(User.last_synced_at.asc().nullsfirst()) # Oldest sync first + .limit(limit) + ) + result = await self.session.execute(stmt) + return result.scalars().all() + + async def _calculate_user_priority(self, user: User) -> SyncPriority: + """Calculate sync priority for a user. + + Args: + user: User to calculate priority for + + Returns: + Priority level based on user state + """ + now = datetime.now(UTC) + + # Check token expiry (if tracked) + # For now, base on last sync time + + if user.last_synced_at is None: + return SyncPriority.CRITICAL # Never synced + + hours_since_sync = (now - user.last_synced_at).total_seconds() / 3600 + + if hours_since_sync >= 48: + return SyncPriority.CRITICAL + elif hours_since_sync >= 12: + return SyncPriority.HIGH + elif hours_since_sync >= 6: + return SyncPriority.NORMAL + else: + return SyncPriority.LOW + + async def _get_user_token(self, user: User) -> str: + """Decrypt and return user's Polar token. + + Args: + user: User to get token for + + Returns: + Decrypted Polar access token + + Raises: + ValueError: If user has no token + """ + if not user.access_token_encrypted: + raise ValueError(f"User {user.polar_user_id} has no Polar token") + + from polar_flow_server.core.security import token_encryption + + return token_encryption.decrypt(user.access_token_encrypted) + + async def get_sync_stats(self) -> dict[str, object]: + """Get sync statistics for monitoring. 
+ + Returns: + Dict with sync statistics + """ + from sqlalchemy import func + + now = datetime.now(UTC) + day_ago = now - timedelta(days=1) + + # Get counts from last 24 hours + stats_stmt = select( + func.count().label("total"), + func.count().filter(SyncLog.status == SyncStatus.SUCCESS.value).label("successful"), + func.count().filter(SyncLog.status == SyncStatus.FAILED.value).label("failed"), + func.count().filter(SyncLog.status == SyncStatus.PARTIAL.value).label("partial"), + func.count().filter(SyncLog.status == SyncStatus.SKIPPED.value).label("skipped"), + ).where(SyncLog.started_at >= day_ago) + + result = await self.session.execute(stats_stmt) + row = result.one() + + return { + "last_24h": { + "total": row.total, + "successful": row.successful, + "failed": row.failed, + "partial": row.partial, + "skipped": row.skipped, + "success_rate": (row.successful / row.total * 100) if row.total > 0 else 0, + }, + "rate_limits": self.rate_limiter.to_dict(), + } + + async def get_user_sync_history( + self, + user_id: str, + limit: int = 10, + ) -> Sequence[SyncLog]: + """Get recent sync history for a user. + + Args: + user_id: User identifier + limit: Max records to return + + Returns: + List of recent SyncLog entries + """ + stmt = ( + select(SyncLog) + .where(SyncLog.user_id == user_id) + .order_by(SyncLog.started_at.desc()) + .limit(limit) + ) + result = await self.session.execute(stmt) + return result.scalars().all() diff --git a/src/polar_flow_server/templates/admin/dashboard.html b/src/polar_flow_server/templates/admin/dashboard.html index e060414..d189cc6 100644 --- a/src/polar_flow_server/templates/admin/dashboard.html +++ b/src/polar_flow_server/templates/admin/dashboard.html @@ -267,7 +267,7 @@

No API keys

Data Sync

-

Auto-sync runs every {{ sync_interval_hours }} hour(s). Syncs all 9 endpoints.

+

Auto-sync runs every {{ sync_interval_minutes }} minutes. Syncs all 9 endpoints.

@@ -137,7 +137,8 @@

Today's Readiness

Records in Database
-
+ +
{{ sleep_count }}
nights
@@ -175,6 +176,35 @@

Today's Readiness

HR days
+ +
+
+
+
{{ spo2_count }}
+
SpO2
+
+
+
{{ ecg_count }}
+
ECG
+
+
+
{{ body_temp_count }}
+
body temp
+
+
+
{{ skin_temp_count }}
+
skin temp
+
+
+
{{ baseline_count }}
+
baselines
+
+
+
{{ pattern_count }}
+
patterns
+
+
+
@@ -262,28 +292,124 @@

No API keys

{% endif %}
- +
-

Data Sync

-

Auto-sync runs every {{ sync_interval_minutes }} minutes. Syncs all 9 endpoints.

-
- +

Background Sync

+

Automatic sync every {{ sync_interval_minutes }} minutes

+
+
+ {% if scheduler_status.is_running %} + + + Running + + {% elif scheduler_status.enabled %} + + Enabled (not started) + + {% else %} + + Disabled + + {% endif %} + +
+ + +
+
+
Next Run
+
+ {% if scheduler_status.next_run_at %}{{ scheduler_status.next_run_at }}{% else %}--{% endif %} +
+
+
+
Last Run
+
+ {% if scheduler_status.last_run_at %}{{ scheduler_status.last_run_at }}{% else %}Never{% endif %} +
+
+
+
24h Success
+
                        {{ sync_stats.last_24h.successful }}/{{ sync_stats.last_24h.total }}
+
+
+
24h Failed
+
                        {{ sync_stats.last_24h.failed }}
+
+
+
+ + + {% if recent_sync_logs %} +
+

Recent Sync History

+
+ + + + + + + + + + + + + {% for log in recent_sync_logs %} + + + + + + + + + {% endfor %} + +
TimeUserStatusTriggerDurationRecords
+ {{ log.started_at.strftime('%m/%d %H:%M') }} + + {{ log.user_id[:12] }}{% if log.user_id|length > 12 %}...{% endif %} + + {% if log.status == 'success' %} + Success + {% elif log.status == 'partial' %} + Partial + {% elif log.status == 'failed' %} + Failed + {% elif log.status == 'started' %} + Running + {% else %} + {{ log.status }} + {% endif %} + {{ log.trigger }} + {% if log.duration_ms %}{{ (log.duration_ms / 1000)|round(1) }}s{% else %}--{% endif %} + + {% if log.records_synced %} + {{ log.records_synced|length }} types + {% else %}--{% endif %} +
+
+
+ {% endif %} @@ -481,7 +607,7 @@

Training Load ({{ latest_ca {% if latest_hr %} -
+

Heart Rate ({{ latest_hr.date }})

@@ -504,6 +630,52 @@

Heart Rate ({{ latest_hr.da

{% endif %} + +{% if latest_spo2 or latest_skin_temp %} +
+

+ + + + + Biosensing + +

+
+ {% if latest_spo2 %} +
+
SpO2 ({{ latest_spo2.test_time.strftime('%m/%d') }})
+
+ {{ latest_spo2.spo2_avg|round(1) if latest_spo2.spo2_avg else '--' }}% +
+ {% if latest_spo2.spo2_min %} +
Min: {{ latest_spo2.spo2_min }}%
+ {% endif %} +
+ {% endif %} + {% if latest_skin_temp %} +
+
Skin Temp ({{ latest_skin_temp.sleep_date }})
+
                {{ latest_skin_temp.temperature_celsius|round(1) if latest_skin_temp.temperature_celsius is not none else '--' }}°C
+
                {% if latest_skin_temp.deviation_from_baseline is not none %}{% if latest_skin_temp.deviation_from_baseline > 0 %}+{% endif %}{{ latest_skin_temp.deviation_from_baseline|round(2) }}° from baseline{% else %}--{% endif %}
+
+ {% endif %} +
+
SpO2 Records
+
{{ spo2_count }}
+
+
+
ECG Records
+
{{ ecg_count }}
+
+
+
+{% endif %} +