Merged
2 changes: 1 addition & 1 deletion CLAUDE.md
@@ -122,7 +122,7 @@ For HTTP transport, the wire protocol maps to separate endpoints: `POST /vgi/{me

**Error propagation**: Server exceptions become zero-row batches with error metadata; clients receive `RpcError` with `error_type`, `error_message`, and `remote_traceback`. The transport stays clean for subsequent requests.

**Sticky sessions (HTTP, opt-in)**: With `make_wsgi_app(enable_sticky=True)`, a method body may call `ctx.open_session(state)` to register a Python object (DB cursor, model handle, file handle) in a per-worker registry; subsequent calls from the same client (inside `conn.with_session_token():`) carry a `VGI-Session` header that resolves back to the object as `ctx.session`. Eviction is TTL-driven (default 300s, override per-call via `ttl=`) or explicit (`ctx.close_session()`); `state.close()` is invoked on eviction if defined. The framework serializes concurrent calls on the same session via a per-session `RLock`; different sessions run in parallel. Misroute / expiry / cross-principal token surfaces as `SessionLostError` (typed, `error_kind="session_lost"`); drain-time opens surface as `ServerDrainingError`. Client uses `conn.with_session_token() as sess:` (auto-sends `VGI-Session-Accept: true` for the server's leak-prevention guard) and `sess.detach()` to stash a token for later resumption without firing the exit-time best-effort `DELETE /vgi/__session__`. Sticky machinery is not installed on pipe / subprocess / unix transports; `ctx.open_session` raises `RuntimeError` there. **Echo headers** (`sticky_echo_headers=` kwarg on `make_wsgi_app`): tell the client to replay arbitrary headers on every subsequent request in the session — emitted as `VGI-Echo-<name>: <value>` on the session-opening response, captured + replayed by the client view, exposed via `sess.current_echo_headers()`. Used for client-driven routing on platforms like Fly.io; `vgi_rpc.http.fly.fly_sticky_echo_headers()` + `vgi_rpc.http.fly.auto_server_id()` are the ~25-line Fly quickstart helpers (return `None` off Fly so the same code works everywhere). Full spec: `docs/sticky-sessions-spec.md`. 
Cross-language conformance group: `TestSticky` in `vgi_rpc/conformance/_pytest_suite.py`, capability-gated on `VGI-Sticky-Enabled`; `TestSticky::test_echo_header_round_trip` further capability-gated on `VGI-Sticky-Echo-Headers`.
**Sticky sessions (HTTP, opt-in)**: With `make_wsgi_app(enable_sticky=True)`, a method body may call `ctx.open_session(state)` to register a Python object (DB cursor, model handle, file handle) in a per-worker registry; subsequent calls from the same client (inside `conn.with_session_token():`) carry a `VGI-Session` header that resolves back to the object as `ctx.session`. Eviction is TTL-driven (default 300s, override per-call via `ttl=`) or explicit (`ctx.close_session()`); `state.close()` is invoked on eviction if defined. The framework serializes concurrent calls on the same session via a per-session `RLock`; different sessions run in parallel. Misroute / expiry / cross-principal token surfaces as `SessionLostError` (typed, `error_kind="session_lost"`); drain-time opens surface as `ServerDrainingError`. Client uses `conn.with_session_token() as sess:` (auto-sends `VGI-Session-Accept: true` for the server's leak-prevention guard) and `sess.detach()` to stash a token for later resumption without firing the exit-time best-effort `DELETE /vgi/__session__`. Sticky machinery is not installed on pipe / subprocess / unix transports; `ctx.open_session` raises `RuntimeError` there. **Echo headers** (`sticky_echo_headers=` kwarg on `make_wsgi_app`): tell the client to replay arbitrary headers on every subsequent request in the session — emitted as `VGI-Echo-<name>: <value>` on the session-opening response, captured + replayed by the client view, exposed via `sess.current_echo_headers()`. Used for client-driven routing on platforms like Fly.io; `vgi_rpc.http.fly.fly_sticky_echo_headers()` + `vgi_rpc.http.fly.auto_server_id()` are the ~25-line Fly quickstart helpers (return `None` off Fly so the same code works everywhere). 
**Graceful drain**: `drain_handle(app)` returns `DrainHandle(drain, shutdown, is_draining)` for operator-facing shutdown wiring; `serve_http(enable_sticky=True)` auto-installs SIGTERM/SIGINT handlers that drain → wait `drain_grace_seconds` → invoke `state.close()` on every live session → exit. Pre-fork servers (gunicorn) use `drain_handle(app)` inside `worker_exit` hooks. **Access log** carries `session_id` (24-char hex) + `session_action` (`none`/`open`/`resume`/`close`) on sticky-touching records; both absent on non-sticky servers. Full spec: `docs/sticky-sessions-spec.md`. Cross-language conformance group: `TestSticky` in `vgi_rpc/conformance/_pytest_suite.py`, capability-gated on `VGI-Sticky-Enabled`; `TestSticky::test_echo_header_round_trip` further capability-gated on `VGI-Sticky-Echo-Headers`; `TestSticky::test_drain_rejects_new_opens` further capability-gated on the conformance server exposing `POST /__test_drain__` admin endpoint.

## Code Style

11 changes: 11 additions & 0 deletions docs/access-log-spec.md
@@ -99,6 +99,17 @@ These six fields appear together. Implementations MAY omit the entire group, but
| `input_bytes` | integer | Sum of `RecordBatch.nbytes` across input batches (uncompressed in-memory size). |
| `output_bytes` | integer | Sum of `RecordBatch.nbytes` across output batches. |

### 4.7 Sticky session lifecycle

When the request flows through a sticky-enabled HTTP transport (see [`sticky-sessions-spec.md`](sticky-sessions-spec.md)), the access record carries two additional fields describing the session lifecycle.

| Field | Type | Condition |
|---|---|---|
| `session_action` | enum | One of `"none"` / `"open"` / `"resume"` / `"close"`. `"none"` = the request flowed through sticky middleware but neither carried a session token nor opened one (e.g. a unary call from a non-`with_session_token()` caller). `"open"` = the method called `ctx.open_session(...)`. `"resume"` = a valid `VGI-Session` token resolved to a live registry entry. `"close"` = the method called `ctx.close_session()`. Absent for non-sticky servers. |
| `session_id` | string | Present when the request touched a session — i.e. when `session_action` is `"open"` / `"resume"` / `"close"`. Format: 12-byte hex, exactly 24 characters. Absent on `"none"` and on non-sticky servers. The id is stable across the open / resume / close lifecycle records for a given session. |

**Gaps**: middleware-short-circuit cases (token validation failed; `server_id` mismatch; registry miss for an apparently-valid token) currently do NOT produce access-log records. The middleware emits a typed `SessionLostError` response without invoking dispatch, and the access-log emitter lives in the dispatch path. Operators monitoring for misroutes should rely on the typed error surface on the wire instead. Adding short-circuit access-log records is a documented follow-up.
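
The presence rules in the table above can be checked mechanically. The following is a minimal sketch, assuming an access record has already been parsed into a plain `dict`; `check_sticky_fields` is a hypothetical helper, not part of `vgi_rpc`, and accepting upper-case hex digits is an assumption since the spec only says "12-byte hex, exactly 24 characters".

```python
from __future__ import annotations

import re

# Hypothetical helper for illustration only; not part of vgi_rpc.
# Assumption: either hex case is acceptable for session_id.
_SESSION_ID_RE = re.compile(r"[0-9a-fA-F]{24}")
_ACTIONS = ("none", "open", "resume", "close")
_TOUCHING = ("open", "resume", "close")


def check_sticky_fields(record: dict) -> list[str]:
    """Return rule violations for session_action / session_id (empty if OK)."""
    problems: list[str] = []

    if "session_action" not in record:
        # Non-sticky server: both fields are absent (not null).
        if "session_id" in record:
            problems.append("session_id present without session_action")
        return problems

    action = record["session_action"]
    if action not in _ACTIONS:
        problems.append(f"unknown session_action: {action!r}")
        return problems

    if action in _TOUCHING:
        # open / resume / close must carry a 24-character hex id.
        session_id = record.get("session_id")
        if not isinstance(session_id, str) or not _SESSION_ID_RE.fullmatch(session_id):
            problems.append("session_id must be 24 hex characters")
    elif "session_id" in record:
        problems.append("session_id must be absent when session_action is 'none'")

    return problems
```

A log-pipeline test could run this over every record emitted by a sticky-enabled server and fail on any non-empty result.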

## 5. Method-type rules

All conditional behavior is keyed off `method_type` (and, for streams, whether the record is an init or continuation — distinguishable by the presence of `request_data`). **Rules MUST NOT be keyed off method names.** Method names are application-specific; framework conformance applies uniformly.
4 changes: 4 additions & 0 deletions docs/porting-guide.md
@@ -202,4 +202,8 @@ If a port claims sticky support it MUST also implement the three sticky conforma

**Echo headers** (`VGI-Echo-<name>` response headers / `VGI-Sticky-Echo-Headers` capability advert) are a sub-feature; ports that don't implement them stay conformant on `TestSticky` core but skip `TestSticky::test_echo_header_round_trip` cleanly (the test is capability-gated on `VGI-Sticky-Echo-Headers`). Implementing them unlocks zero-LB-config deployments on Fly.io (`fly-force-instance-id`) and any other platform with header-based proactive routing. See [`vgi_rpc/http/fly.py`](https://github.com/Query-farm/vgi-rpc-python/blob/main/vgi_rpc/http/fly.py) for the Python Fly quickstart helpers — a ~25-line module that other ports can mirror directly.

**Graceful drain** (`drain_handle(app)` operator API / `POST /__test_drain__` conformance admin endpoint) is similarly a sub-feature. The canonical `TestSticky::test_drain_rejects_new_opens` is capability-gated on the presence of the admin endpoint — ports that don't expose it skip cleanly. Implementing drain on the server side means: a per-worker drain flag observable by the sticky middleware so `ctx.open_session` raises `error_kind="server_draining"` while the flag is set; an operator-facing equivalent of `drain_handle(app)` so SIGTERM handlers / pre-fork worker-exit hooks can wire shutdown; and a `POST /__test_drain__` admin endpoint on the conformance server (and `DELETE` to clear, so the same fixture can run multiple conformance passes). See [`tests/serve_conformance_http.py`](https://github.com/Query-farm/vgi-rpc-python/blob/main/tests/serve_conformance_http.py)'s `_TestDrainResource` for the ~10-line Python reference.

**Access-log fields** `session_id` and `session_action` (see [`docs/access-log-spec.md`](access-log-spec.md) §4.7) are required for any port that emits the `vgi_rpc.access` log AND advertises `VGI-Sticky-Enabled: true`. Ports without sticky support omit both fields (they're absent, not null).

Recognising the two new error kinds is the **minimum** any port should do: even ports that have no sticky implementation may end up talking to a Python sticky server in the wild, and a typed exception is much friendlier than a flat `RpcError` whose meaning the caller has to grep out of the message text.
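
As an illustration of that minimum, a client can map the wire-level `error_kind` to a typed exception and fall back to the generic error for anything it does not recognise. This is a sketch only: the class shapes and the `to_exception` helper are hypothetical stand-ins, and the only values taken from the spec are the two `error_kind` strings.

```python
from __future__ import annotations


class RpcError(Exception):
    """Stand-in for a port's generic RPC error (hypothetical shape)."""

    def __init__(self, error_kind: str | None, message: str) -> None:
        super().__init__(message)
        self.error_kind = error_kind


class SessionLostError(RpcError):
    """Raised for error_kind="session_lost"."""


class ServerDrainingError(RpcError):
    """Raised for error_kind="server_draining"."""


# Known kinds map to typed exceptions; everything else stays a plain RpcError.
_TYPED_ERRORS: dict[str, type[RpcError]] = {
    "session_lost": SessionLostError,
    "server_draining": ServerDrainingError,
}


def to_exception(error_kind: str | None, message: str) -> RpcError:
    """Build the most specific exception available for an error_kind."""
    cls = _TYPED_ERRORS.get(error_kind or "", RpcError)
    return cls(error_kind, message)
```

Callers can then write `except SessionLostError:` to reopen a session instead of matching on message text.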
38 changes: 35 additions & 3 deletions docs/sticky-sessions-spec.md
@@ -110,11 +110,43 @@ Cross-language clients MUST recognize `error_kind="session_lost"` and `error_kin

## 7. Graceful drain

`RpcServer.drain()` sets a process-local flag. While drained:
The framework exposes a per-worker drain flag via the operator-facing `vgi_rpc.http.drain_handle` helper:

```python
from vgi_rpc.http import drain_handle, make_wsgi_app

app = make_wsgi_app(server, enable_sticky=True)
handle = drain_handle(app)  # returns None when sticky is disabled
if handle is not None:
    handle.drain()  # flip the drain flag
    # ... wait for in-flight sessions to complete ...
    handle.shutdown()  # invoke state.close() on every live session
```

While the flag is set:

- `ctx.open_session` raises `ServerDrainingError`. Existing-session calls continue to serve until TTL or explicit close.
- A graceful shutdown SHOULD call `drain()` first, wait for in-flight requests to complete (operator-controlled timeout), then tear the WSGI app down.
- WSGI app teardown invokes `state.close()` on every live registry entry.
- `handle.shutdown()` invokes `state.close()` on every live registry entry — used by operators when their grace period elapses.

`serve_http` ships with a built-in graceful-shutdown handler that wires SIGTERM / SIGINT to this flow automatically. Pass `drain_grace_seconds=30.0` (default) to control how long the framework waits between flipping the flag and forcibly exiting. A second signal during grace skips the wait and exits immediately.
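
The two-signal behaviour described above can be sketched with a `threading.Event`. This is not the `serve_http` implementation; `GracefulShutdown` and its method names are hypothetical, and the `drain` / `shutdown` callables are assumed to be the ones exposed by a `DrainHandle`.

```python
import threading


class GracefulShutdown:
    """First signal drains; a second signal during grace skips the wait.

    Hypothetical sketch, not the vgi_rpc implementation.
    """

    def __init__(self, drain, shutdown, grace_seconds=30.0):
        self._drain = drain
        self._shutdown = shutdown
        self._grace_seconds = grace_seconds
        self._skip_wait = threading.Event()
        self._signalled = False

    def on_signal(self, signum=None, frame=None):
        """Signal-handler-shaped callback (signum/frame are unused here)."""
        if self._signalled:
            # Second signal: stop waiting for the grace period.
            self._skip_wait.set()
            return
        self._signalled = True
        self._drain()

    def wait_and_shutdown(self):
        """Wait out the grace period, then close sessions.

        Returns True if the wait was cut short by a second signal.
        """
        skipped = self._skip_wait.wait(timeout=self._grace_seconds)
        self._shutdown()
        return skipped
```

In a real process `on_signal` would be registered with `signal.signal(signal.SIGTERM, ...)` and `signal.signal(signal.SIGINT, ...)`, and the code that owns process exit would call `wait_and_shutdown()` after the first signal arrives.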

For pre-fork servers (gunicorn, uwsgi) operators wire their own hook against `drain_handle(app)`:

```python
# gunicorn config (gunicorn.conf.py)
import time
from vgi_rpc.http import drain_handle

def worker_exit(server, worker):
    """gunicorn calls this when a worker is being retired."""
    handle = drain_handle(worker.app.callable)  # the WSGI app
    if handle is not None:
        handle.drain()
        time.sleep(30)  # grace period; tune for your workload
        handle.shutdown()
```

The drain flag is per-worker process (it lives in the per-worker `_SessionRegistry`); pre-fork deployments effectively get one drain cycle per worker.
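
For ports implementing this section, the semantics can be modelled with a small in-process registry. The sketch below is a toy, not the actual `_SessionRegistry`: the class name `ToyRegistry` and its method signatures are assumptions made for illustration. It shows the three behaviours listed above: opens are rejected while draining, existing sessions keep resolving, and shutdown calls `close()` on every live state object that defines it.

```python
import threading


class ServerDrainingError(RuntimeError):
    """Raised when a session open is attempted while the drain flag is set."""


class ToyRegistry:
    """Toy per-worker session registry (hypothetical; illustration only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._sessions = {}
        self._draining = False

    def set_draining(self, value):
        with self._lock:
            self._draining = bool(value)

    def is_draining(self):
        with self._lock:
            return self._draining

    def open(self, session_id, state):
        """Register a new session; rejected while draining."""
        with self._lock:
            if self._draining:
                raise ServerDrainingError("server is draining; no new sessions")
            self._sessions[session_id] = state

    def get(self, session_id):
        """Existing sessions keep resolving while the flag is set."""
        with self._lock:
            return self._sessions.get(session_id)

    def shutdown(self):
        """Close every live session; state objects without close() are skipped."""
        with self._lock:
            states = list(self._sessions.values())
            self._sessions.clear()
        for state in states:
            close = getattr(state, "close", None)
            if callable(close):
                close()
```

Because each worker process would own its own instance, draining one worker leaves the others unaffected, which matches the per-worker note above.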

## 8. Crash semantics

51 changes: 50 additions & 1 deletion tests/serve_conformance_http.py
@@ -20,12 +20,56 @@
import socket
import sys

import falcon

from vgi_rpc.conformance import ConformanceService, ConformanceServiceImpl
from vgi_rpc.external import Compression, ExternalLocationConfig
from vgi_rpc.http import make_wsgi_app, serve_http
from vgi_rpc.http import DrainHandle, drain_handle, make_wsgi_app, serve_http
from vgi_rpc.rpc import RpcServer


class _TestDrainResource:
    """Test-only admin endpoint at ``/__test_drain__``.

    Lets the canonical ``TestSticky::test_drain_rejects_new_opens`` test
    flip the sticky registry's drain flag over the wire (without sending
    SIGTERM, which would kill the subprocess fixture). NOT installed in
    production :func:`vgi_rpc.http.make_wsgi_app`; this is purely a
    conformance-fixture concern.

    Supports:
    * ``POST``: set the drain flag (subsequent ``open_session`` calls raise ``ServerDrainingError``).
    * ``DELETE``: clear the drain flag (so subsequent tests in the same fixture session aren't poisoned).

    Both are idempotent and return 204.
    No-op if the server isn't sticky-enabled.
    """

    __slots__ = ("_handle",)

    def __init__(self, handle: DrainHandle | None) -> None:
        self._handle = handle

    def on_post(self, req: falcon.Request, resp: falcon.Response) -> None:
        """Set the drain flag; idempotent."""
        if self._handle is not None:
            self._handle.drain()
        resp.status = falcon.HTTP_204

    def on_delete(self, req: falcon.Request, resp: falcon.Response) -> None:
        """Clear the drain flag; idempotent."""
        # Reach through the handle to find the registry.
        # The DrainHandle dataclass exposes is_draining + drain + shutdown
        # but not set_draining(False). That's a deliberate operator-API
        # choice (production deployments only ever drain, never undrain).
        # Tests need the reverse so they don't poison the fixture.
        if self._handle is not None:
            # Walk to the underlying _SessionRegistry via the bound shutdown method.
            registry = self._handle.shutdown.__self__  # type: ignore[attr-defined]
            registry.set_draining(False)
        resp.status = falcon.HTTP_204


def main() -> None:
"""Start an HTTP server for the conformance service."""
parser = argparse.ArgumentParser()
@@ -113,6 +157,9 @@ def main() -> None:
enable_sticky=True,
sticky_echo_headers=sticky_echo_headers,
)
# Test-only admin endpoint so canonical conformance tests can
# trigger drain over the wire without sending SIGTERM.
app.add_route("/__test_drain__", _TestDrainResource(drain_handle(app)))
try:
import waitress
except ImportError:
@@ -157,6 +204,8 @@ def main() -> None:
enable_sticky=enable_sticky,
sticky_echo_headers=sticky_echo_headers,
)
# Test-only admin endpoint (see _TestDrainResource for rationale).
app.add_route("/__test_drain__", _TestDrainResource(drain_handle(app)))

try:
import waitress