Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions backend/alembic/versions/cfe1d8e4e001_add_sync_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@


def upgrade() -> None:
"""
Create the `sync_events` table to record metadata about incoming synchronization events.

The table schema:
- id: integer primary key, autoincrement.
- event_id: varchar(255), nullable.
- source: varchar(100), not nullable.
- body_sha256: varchar(64), not nullable.
- received_at: timestamp with timezone, not nullable, defaults to CURRENT_TIMESTAMP.

Unique constraints:
- uq_sync_events_src_event on (source, event_id)
- uq_sync_events_src_body on (source, body_sha256)
- uq_sync_events_body on (body_sha256)
"""
op.create_table(
"sync_events",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
Expand All @@ -28,4 +43,9 @@ def upgrade() -> None:


def downgrade() -> None:
"""
Drop the "sync_events" table from the database.

This operation permanently removes the table and all its data.
"""
op.drop_table("sync_events")
16 changes: 14 additions & 2 deletions backend/api/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ def _normalize_postgres_url(url: str) -> str:


def get_engine_url() -> str:
"""Resolve the database URL from env or settings with sane defaults."""
"""
Resolve the database URL, preferring the DATABASE_URL environment variable over the configured settings.

If the URL scheme is "sqlite" it is returned unchanged; otherwise PostgreSQL URLs are normalized to use the psycopg driver.

Returns:
engine_url (str): The resolved, normalized database URL.
"""
settings = Settings()
url = os.getenv("DATABASE_URL", settings.database_url)
if url.startswith("sqlite"):
Expand All @@ -51,7 +58,12 @@ def get_engine_url() -> str:


def get_db() -> Generator:
"""FastAPI dependency to provide a session per request."""
"""
FastAPI dependency that yields a per-request SQLAlchemy session.

Yields:
db (Session): A SQLAlchemy Session instance for the request. The session is closed when the generator completes.
"""
db = SessionLocal()
try:
yield db
Expand Down
40 changes: 34 additions & 6 deletions backend/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
if _Settings().database_url.startswith("sqlite"):
@app.on_event("startup")
async def _ensure_sqlite_tables() -> None:
"""
Create all database tables declared on the ORM Base using the configured SQLite engine.

This ensures any tables defined on `Base` exist in the SQLite database by invoking the ORM metadata creation routine.
"""
Base.metadata.create_all(bind=engine)

# Add CORS middleware
Expand All @@ -63,6 +68,24 @@ async def _ensure_sqlite_tables() -> None:
@app.exception_handler(RequestValidationError)
async def handle_validation_error(request: Request, exc: RequestValidationError) -> JSONResponse:
# Unified error envelope for malformed JSON / validation errors
"""
Handle request validation errors and return a unified JSON error envelope.

Parameters:
request (Request): The incoming HTTP request that failed validation.
exc (RequestValidationError): The validation error containing detailed error items.

Returns:
JSONResponse: HTTP 422 response with body:
{
"ok": False,
"error": {
"code": "BAD_REQUEST",
"message": "invalid request",
"details": [<validation error dicts>]
}
}
"""
return JSONResponse(
status_code=422,
content={
Expand All @@ -78,12 +101,11 @@ async def handle_validation_error(request: Request, exc: RequestValidationError)

@app.get("/")
async def root() -> Dict[str, str]:
"""Root endpoint of the API.

Returns
-------
Dict[str, str]
A welcome message for the API.
"""
Provide the root endpoint response containing a welcome message.

Returns:
dict: A mapping with a single key "message" whose value is the welcome string.
"""
return {"message": "Welcome to the MVP API"}

Expand All @@ -104,4 +126,10 @@ async def healthcheck() -> Dict[str, str]:
@app.get("/metrics")
async def metrics() -> Response:
# Expose Prometheus metrics including default process/python collectors
"""
Serve Prometheus metrics including default process and Python collectors.

Returns:
Response: An HTTP response whose body is the latest Prometheus metrics (bytes) and whose Content-Type is the Prometheus exposition format.
"""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
52 changes: 42 additions & 10 deletions backend/api/routes_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@


def _error_envelope(code: str, message: str, details: Optional[dict] = None) -> Dict[str, Any]:
"""
Constructs a standardized error response payload for API responses.

Parameters:
code (str): Machine-readable error code.
message (str): Human-readable error message.
details (dict, optional): Additional contextual information; defaults to an empty dict.

Returns:
Dict[str, Any]: A dictionary with `"ok": False` and an `"error"` object containing `code`, `message`, and `details`.
"""
return {"ok": False, "error": {"code": code, "message": message, "details": details or {}}}


Expand All @@ -48,12 +59,15 @@ async def hygraph_webhook(
db: Session = Depends(get_db),
) -> Dict[str, Any]:
"""
Webhook receiver:
- HMAC validated (dependency)
- Single size guard (2MB) already enforced by dependency; body/raw set on request.state
- DB dedup via SyncEvent(event_id, body_sha256 unique)
- 202 fast-ack with background processing (pull_all)
- Structured JSON log line and Prometheus counters
Handle Hygraph webhook requests, deduplicate events, acknowledge quickly, and enqueue a background full pull.

Validates HMAC and request size via dependencies, records a SyncEvent to prevent duplicate processing, and returns a fast acknowledgement. If the incoming request body is not valid JSON, raises a 400 with a structured BAD_REQUEST envelope. On new events, schedules a background task that invokes HygraphService.pull_all and updates Prometheus metrics and logs. Duplicate events return immediately with a dedup response.

Returns:
JSONResponse: 202 response with {"ok": True, "accepted": True} when the event is accepted and background processing is scheduled, or 200 with {"ok": True, "dedup": True} when the event was already processed.

Raises:
HTTPException: 400 with a BAD_REQUEST error envelope if the request body contains invalid JSON.
"""
start = time.perf_counter()
raw = getattr(request.state, "raw_body", b"")
Expand Down Expand Up @@ -88,6 +102,15 @@ async def hygraph_webhook(
raise HTTPException(status_code=400, detail=_error_envelope("BAD_REQUEST", "Invalid JSON payload"))

async def _process(event_id_local: Optional[str], body_sha_local: str) -> None:
"""
Execute a full Hygraph pull, update Prometheus metrics for the results, and log the outcome.

Calls the service to pull all Hygraph data, increments per-type upsert counters and a success counter on success, or increments a failure counter and logs the exception on error.

Parameters:
event_id_local (Optional[str]): Optional Hygraph delivery event ID to include in log context.
body_sha_local (str): SHA-256 hex of the request body to include in log/processing context.
"""
t0 = time.perf_counter()
try:
counts = await HygraphService.pull_all(db)
Expand Down Expand Up @@ -126,10 +149,19 @@ async def hygraph_pull(
db: Session = Depends(get_db),
) -> Dict[str, Any]:
"""
Admin pull:
- Auth via Bearer token (constant-time compare)
- Accepts "type" or "sync_type" + optional "page_size"
- Validates positive page_size and caps inside service (≤200)
Trigger an administrative Hygraph sync for a specified resource type and return the resulting counts.

Parameters:
body (Dict[str, Any]): Request payload containing either "type" or "sync_type" (one of "materials", "modules", "systems", or "all")
and an optional "page_size" (positive integer) to request a specific page size.
db (Session): Database session (provided by dependency injection).

Returns:
Dict[str, Any]: A dictionary with "ok": True and "data": counts returned by the HygraphService call.

Raises:
HTTPException: 400 if "page_size" is not a positive integer or if an unsupported type is provided.
HTTPException: 500 if an internal sync error occurs (includes the attempted type in the error details).
"""
sync_type = str((body.get("type") or body.get("sync_type") or "")).lower().strip()
page_size_raw = body.get("page_size")
Expand Down
66 changes: 61 additions & 5 deletions backend/api/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,35 @@

def get_settings() -> Settings:
# Instantiate settings outside of Pydantic request validation to avoid DI issues.
"""
Create and return a new Settings instance.

Returns:
Settings: A fresh Settings object initialized from environment variables and defaults.
"""
return Settings()


async def require_write_token(
authorization: str | None = Security(api_key_header),
settings: Settings = Depends(get_settings),
) -> bool:
"""
Authenticate requests using an admin write token provided via the Bearer Authorization header.

Validates that the Authorization header contains a Bearer token and that the token matches the configured write token (Settings.api_write_token or the API_WRITE_TOKEN environment variable).

Parameters:
authorization (str | None): The raw Authorization header value (expected to start with "Bearer ").
settings (Settings): Configuration used to obtain the expected write token.

Returns:
bool: `True` if the provided Bearer token matches the configured write token.

Raises:
HTTPException: With status 401 if the Authorization header is missing or not a Bearer token.
HTTPException: With status 403 if the token does not match the configured write token.
"""
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Invalid or missing Authorization header."
Expand All @@ -41,11 +63,16 @@ async def require_write_token(


def _parse_kv_signature(header: str) -> dict[str, str]:
"""Parse a Hygraph signature header.

Supports:
- "sha256=<hex>" (Hygraph native)
- "sign=<hex>, t=<epoch_ms>" (extended with timestamp for replay window)
"""
Parse a Hygraph signature header into key/value components.

Accepts either the simple `sha256=<hex>` form or comma-separated `key=value` pairs such as `sign=<hex>, t=<epoch_ms>`. An empty or falsy `header` yields an empty dict.

Parameters:
header (str): Raw value of the Hygraph signature header.

Returns:
dict[str, str]: Mapping of parsed component names to their string values (for example `{'sha256': '...'}` or `{'sign': '...', 't': '...'}`).
"""
header = (header or "").strip()
out: dict[str, str] = {}
Expand All @@ -64,6 +91,20 @@ def _parse_kv_signature(header: str) -> dict[str, str]:
def verify_hygraph_signature(
body: bytes, signature_header: str, secret: str, max_skew_ms: int = 5 * 60 * 1000
) -> bool:
"""
Validate a Hygraph webhook payload using the provided shared secret.

Checks the signature_header for either a simple "sha256=<hex>" token or an extended "sign=<hex>,t=<ms>" pair. For the simple form, the function compares the provided hex to the HMAC-SHA256 of the raw body using the secret. For the extended form, the function enforces that the timestamp `t` is within max_skew_ms of the current time and compares the provided `sign` to the HMAC-SHA256 of the body concatenated with the timestamp. Any missing secret, malformed header, clock skew violation, mismatch, or error results in a False result.

Parameters:
body (bytes): Raw request body used to compute the HMAC.
signature_header (str): Hygraph signature header value; expected formats: "sha256=<hex>" or "sign=<hex>,t=<ms>".
secret (str): Shared secret used to compute HMAC-SHA256.
max_skew_ms (int): Maximum allowed clock skew in milliseconds for timestamped signatures (default: 5 minutes).

Returns:
bool: `true` if the signature is valid and within allowed skew, `false` otherwise.
"""
if not secret:
return False
try:
Expand Down Expand Up @@ -92,6 +133,21 @@ async def validate_hygraph_request(
request: Request,
settings: Settings = Depends(get_settings),
) -> bool:
"""
Validate a Hygraph webhook request, enforce size limits, verify its signature, and attach the raw payload and its SHA-256 to request.state.

Reads the signature from the `x-hygraph-signature` header, enforces a maximum body size (from settings.max_webhook_body_bytes or the MAX_WEBHOOK_BODY_BYTES environment variable; defaults to 2 MB), and validates the body against the configured Hygraph webhook secret. On successful validation this function sets `request.state.raw_body` to the raw bytes and `request.state.body_sha256` to the hex SHA-256 digest.

Parameters:
request (Request): Incoming FastAPI request; its body is consumed and on success `request.state.raw_body` and `request.state.body_sha256` are populated.
settings (Settings): Configuration providing `max_webhook_body_bytes` and `hygraph_webhook_secret` (if absent, corresponding environment variables are used).

Returns:
bool: `True` if the request body was accepted and the signature validated.

Raises:
HTTPException: 413 if the payload exceeds the configured limit; 401 if the signature is missing or invalid.
"""
signature = request.headers.get(HYGRAPH_SIGNATURE_HEADER)
body = await request.body()
limit = getattr(settings, "max_webhook_body_bytes", None) or int(os.getenv("MAX_WEBHOOK_BODY_BYTES", str(2 * 1024 * 1024)))
Expand Down
Loading
Loading