Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().d
```bash
# With per-user API key (includes rate limit headers)
curl -H "X-API-Key: pfk_your_api_key_here" \
http://localhost:8000/users/{user_id}/sleep?days=7
http://localhost:8000/api/v1/users/{user_id}/sleep?days=7

# Response headers include:
# X-RateLimit-Limit: 1000
Expand Down Expand Up @@ -211,7 +211,7 @@ Store `api_key` and `polar_user_id` for this user. Use the API key for all data

```bash
curl -H "X-API-Key: pfk_abc123..." \
"https://your-polar-server.com/users/12345678/sleep?days=7"
"https://your-polar-server.com/api/v1/users/12345678/sleep?days=7"
```

### Polar Admin Setup
Expand All @@ -226,43 +226,43 @@ https://your-polar-server.com/oauth/callback

```bash
# Get key info
GET /users/{user_id}/api-key/info
GET /api/v1/users/{user_id}/api-key/info
X-API-Key: pfk_...

# Regenerate key (invalidates old key)
POST /users/{user_id}/api-key/regenerate
POST /api/v1/users/{user_id}/api-key/regenerate
X-API-Key: pfk_...

# Revoke key
POST /users/{user_id}/api-key/revoke
POST /api/v1/users/{user_id}/api-key/revoke
X-API-Key: pfk_...
```

## API Endpoints

```bash
# Health check
# Health check (no auth required)
curl http://localhost:8000/health

# Get sleep data (last 7 days)
curl -H "X-API-Key: pfk_..." \
"http://localhost:8000/users/{user_id}/sleep?days=7"
"http://localhost:8000/api/v1/users/{user_id}/sleep?days=7"

# Get activity data
curl -H "X-API-Key: pfk_..." \
"http://localhost:8000/users/{user_id}/activity?days=7"
"http://localhost:8000/api/v1/users/{user_id}/activity?days=7"

# Get nightly recharge (HRV)
curl -H "X-API-Key: pfk_..." \
"http://localhost:8000/users/{user_id}/recharge?days=7"
"http://localhost:8000/api/v1/users/{user_id}/recharge?days=7"

# Get exercises
curl -H "X-API-Key: pfk_..." \
"http://localhost:8000/users/{user_id}/exercises?days=30"
"http://localhost:8000/api/v1/users/{user_id}/exercises?days=30"

# Export summary
curl -H "X-API-Key: pfk_..." \
"http://localhost:8000/users/{user_id}/export/summary?days=30"
"http://localhost:8000/api/v1/users/{user_id}/export/summary?days=30"
```

## Development
Expand Down
212 changes: 196 additions & 16 deletions src/polar_flow_server/admin/routes.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""Admin panel routes."""

import asyncio
import csv
import io
import logging
import os
import re
import secrets
from collections import OrderedDict
from datetime import UTC, date, datetime, timedelta
from typing import Any
from urllib.parse import urlencode
Expand Down Expand Up @@ -52,9 +55,150 @@
from polar_flow_server.services.scheduler import get_scheduler
from polar_flow_server.services.sync import SyncService

# In-memory OAuth state storage (for self-hosted single-instance use)
# In production SaaS, use Redis or database with TTL
_oauth_states: dict[str, datetime] = {}
logger = logging.getLogger(__name__)


# =============================================================================
# Bounded TTL Cache for OAuth States (prevents memory exhaustion)
# =============================================================================


class BoundedTTLCache:
"""Simple bounded cache with TTL for OAuth states.

Prevents memory exhaustion attacks by limiting max entries.
Automatically evicts expired entries on access.
Thread-safe via asyncio lock.
"""

def __init__(self, maxsize: int = 100, ttl_minutes: int = 10) -> None:
self._cache: OrderedDict[str, datetime] = OrderedDict()
self._maxsize = maxsize
self._ttl = timedelta(minutes=ttl_minutes)
self._lock = asyncio.Lock()

async def set(self, key: str, expires_at: datetime | None = None) -> None:
"""Add or update a key with expiry time."""
async with self._lock:
self._cleanup_expired()
# If at max, evict oldest entry and log warning
if len(self._cache) >= self._maxsize:
logger.warning(f"OAuth state cache full ({self._maxsize}), evicting oldest entries")
while len(self._cache) >= self._maxsize:
self._cache.popitem(last=False)
self._cache[key] = expires_at or (datetime.now(UTC) + self._ttl)

async def get(self, key: str) -> datetime | None:
"""Get expiry time for a key, or None if not found/expired."""
async with self._lock:
self._cleanup_expired()
return self._cache.get(key)

async def pop(self, key: str) -> datetime | None:
"""Remove and return expiry time for a key."""
async with self._lock:
return self._cache.pop(key, None)

async def contains(self, key: str) -> bool:
"""Check if key exists (async version of __contains__)."""
async with self._lock:
self._cleanup_expired()
return key in self._cache

def _cleanup_expired(self) -> None:
"""Remove expired entries. Must be called with lock held."""
now = datetime.now(UTC)
# Use dict comprehension for atomic update
self._cache = OrderedDict((k, exp) for k, exp in self._cache.items() if exp >= now)


# OAuth state storage with bounded size (prevents memory exhaustion)
_oauth_states = BoundedTTLCache(maxsize=100, ttl_minutes=10)


# =============================================================================
# Login Rate Limiting (prevents brute force attacks)
# =============================================================================


class LoginRateLimiter:
"""Simple in-memory rate limiter for login attempts.

Tracks failed attempts by IP address and locks out after threshold.
Thread-safe via asyncio lock.
"""

def __init__(
self, max_attempts: int = 5, lockout_minutes: int = 15, cleanup_interval: int = 100
) -> None:
self._attempts: dict[str, list[datetime]] = {}
self._lockouts: dict[str, datetime] = {}
self._max_attempts = max_attempts
self._lockout_duration = timedelta(minutes=lockout_minutes)
self._attempt_window = timedelta(minutes=15)
self._cleanup_counter = 0
self._cleanup_interval = cleanup_interval
self._lock = asyncio.Lock()

async def is_locked_out(self, ip: str) -> bool:
"""Check if IP is currently locked out."""
async with self._lock:
self._maybe_cleanup()
lockout_until = self._lockouts.get(ip)
if lockout_until and lockout_until > datetime.now(UTC):
return True
# Clear expired lockout
if lockout_until:
del self._lockouts[ip]
return False

async def record_failure(self, ip: str) -> bool:
"""Record a failed login attempt. Returns True if now locked out."""
async with self._lock:
now = datetime.now(UTC)
self._maybe_cleanup()

# Get recent attempts within window
attempts = self._attempts.get(ip, [])
cutoff = now - self._attempt_window
attempts = [t for t in attempts if t > cutoff]
attempts.append(now)
self._attempts[ip] = attempts

# Check if should lock out
if len(attempts) >= self._max_attempts:
self._lockouts[ip] = now + self._lockout_duration
logger.warning(f"Login rate limit exceeded for IP {ip}, locked out")
return True
return False

async def record_success(self, ip: str) -> None:
"""Clear attempts on successful login."""
async with self._lock:
self._attempts.pop(ip, None)
self._lockouts.pop(ip, None)

def _maybe_cleanup(self) -> None:
"""Periodically clean up old entries. Must be called with lock held."""
self._cleanup_counter += 1
if self._cleanup_counter < self._cleanup_interval:
return
self._cleanup_counter = 0

now = datetime.now(UTC)
cutoff = now - self._attempt_window

# Atomic cleanup using dict comprehension
self._attempts = {
ip: [t for t in attempts if t > cutoff]
for ip, attempts in self._attempts.items()
if any(t > cutoff for t in attempts)
}
self._lockouts = {ip: exp for ip, exp in self._lockouts.items() if exp >= now}


# Global rate limiter instance
_login_rate_limiter = LoginRateLimiter(max_attempts=5, lockout_minutes=15)

# Simple email validation pattern
_EMAIL_PATTERN = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
Expand Down Expand Up @@ -347,11 +491,50 @@ async def login_form(request: Request[Any, Any, Any], session: AsyncSession) ->
)


def _get_client_ip(request: Request[Any, Any, Any]) -> str:
"""Get client IP from request, checking proxy headers only from trusted sources.

Only trusts X-Forwarded-For/X-Real-IP when request comes from localhost
(i.e., from a reverse proxy like nginx/Coolify running on the same host).
This prevents IP spoofing attacks where attackers set fake headers.
"""
client = request.client
direct_ip = client.host if client else "unknown"

# Only trust proxy headers if request comes from localhost (reverse proxy)
trusted_proxies = {"127.0.0.1", "::1", "localhost"}
if direct_ip in trusted_proxies:
# Check X-Forwarded-For header (set by proxies like nginx, Coolify)
forwarded_for = request.headers.get("x-forwarded-for")
if forwarded_for:
# Take the first IP (original client)
return forwarded_for.split(",")[0].strip()
# Check X-Real-IP header
real_ip = request.headers.get("x-real-ip")
if real_ip:
return real_ip

return direct_ip


@post("/login", sync_to_thread=False)
async def login_submit(
request: Request[Any, Any, Any], session: AsyncSession
) -> Template | Redirect:
"""Process login form submission."""
client_ip = _get_client_ip(request)

# Check if IP is locked out due to too many failed attempts
if await _login_rate_limiter.is_locked_out(client_ip):
return Template(
template_name="admin/login.html",
context={
"error": "Too many failed attempts. Please try again later.",
"email": "",
"csrf_token": _get_csrf_token(request),
},
)

form_data = await request.form()
email = form_data.get("email", "").strip()
password = form_data.get("password", "")
Expand All @@ -368,6 +551,8 @@ async def login_submit(

admin = await authenticate_admin(str(email), str(password), session)
if not admin:
# Record failed attempt
await _login_rate_limiter.record_failure(client_ip)
return Template(
template_name="admin/login.html",
context={
Expand All @@ -377,6 +562,8 @@ async def login_submit(
},
)

# Successful login - clear any failed attempts
await _login_rate_limiter.record_success(client_ip)
login_admin(request, admin)
return Redirect(path="/admin", status_code=HTTP_303_SEE_OTHER)

Expand Down Expand Up @@ -777,15 +964,9 @@ async def oauth_authorize(request: Request[Any, Any, Any], session: AsyncSession
# No OAuth credentials configured, redirect to setup
return Redirect(path="/admin", status_code=HTTP_303_SEE_OTHER)

# Generate CSRF state token
# Generate CSRF state token (BoundedTTLCache handles cleanup and size limits)
state = secrets.token_urlsafe(32)
_oauth_states[state] = datetime.now(UTC) + timedelta(minutes=10)

# Clean up expired states
now = datetime.now(UTC)
expired = [s for s, exp in _oauth_states.items() if exp < now]
for s in expired:
del _oauth_states[s]
await _oauth_states.set(state)

# Build authorization URL with state for CSRF protection
base_url = _get_base_url(request)
Expand Down Expand Up @@ -819,20 +1000,19 @@ async def oauth_callback(
)

# Validate CSRF state token
if not state or state not in _oauth_states:
if not state or not await _oauth_states.contains(state):
return Template(
template_name="admin/partials/sync_error.html",
context={"error": "Invalid OAuth state - possible CSRF attack. Please try again."},
)

# Check state hasn't expired and remove it (one-time use)
if _oauth_states[state] < datetime.now(UTC):
del _oauth_states[state]
# Get and remove state (one-time use) - also checks expiry
state_expires = await _oauth_states.pop(state)
if state_expires and state_expires < datetime.now(UTC):
return Template(
template_name="admin/partials/sync_error.html",
context={"error": "OAuth state expired. Please try again."},
)
del _oauth_states[state]

# Get OAuth credentials from database
stmt = select(AppSettings).where(AppSettings.id == 1)
Expand Down
17 changes: 13 additions & 4 deletions src/polar_flow_server/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""API routes."""

from litestar import Router

from polar_flow_server.api.baselines import baselines_router
from polar_flow_server.api.data import data_router
from polar_flow_server.api.health import health_router
Expand All @@ -9,17 +11,24 @@
from polar_flow_server.api.sleep import sleep_router
from polar_flow_server.api.sync import sync_router

# All API routers
api_routers = [
health_router,
# Versioned API routers (user data endpoints)
# These get the /api/v1 prefix
_v1_routers = [
sleep_router,
sync_router,
data_router,
baselines_router, # Analytics baselines
patterns_router, # Pattern detection and anomalies
insights_router, # Unified insights API
oauth_router, # OAuth flow and code exchange
keys_router, # Key management (regenerate, revoke, status)
]

api_v1_router = Router(path="/api/v1", route_handlers=_v1_routers)

# Export: health (root), oauth (root), v1 (prefixed)
# - health_router: /health - no auth needed, no version prefix
# - oauth_router: /oauth/* - external OAuth flow, no version prefix
# - api_v1_router: /api/v1/* - all user data endpoints
api_routers = [health_router, oauth_router, api_v1_router]

__all__ = ["api_routers"]
Loading