diff --git a/CLAUDE.md b/CLAUDE.md index cafce32..3bf4d15 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -122,7 +122,7 @@ For HTTP transport, the wire protocol maps to separate endpoints: `POST /vgi/{me **Error propagation**: Server exceptions become zero-row batches with error metadata; clients receive `RpcError` with `error_type`, `error_message`, and `remote_traceback`. The transport stays clean for subsequent requests. -**Sticky sessions (HTTP, opt-in)**: With `make_wsgi_app(enable_sticky=True)`, a method body may call `ctx.open_session(state)` to register a Python object (DB cursor, model handle, file handle) in a per-worker registry; subsequent calls from the same client (inside `conn.with_session_token():`) carry a `VGI-Session` header that resolves back to the object as `ctx.session`. Eviction is TTL-driven (default 300s, override per-call via `ttl=`) or explicit (`ctx.close_session()`); `state.close()` is invoked on eviction if defined. The framework serializes concurrent calls on the same session via a per-session `RLock`; different sessions run in parallel. Misroute / expiry / cross-principal token surfaces as `SessionLostError` (typed, `error_kind="session_lost"`); drain-time opens surface as `ServerDrainingError`. Client uses `conn.with_session_token() as sess:` (auto-sends `VGI-Session-Accept: true` for the server's leak-prevention guard) and `sess.detach()` to stash a token for later resumption without firing the exit-time best-effort `DELETE /vgi/__session__`. Sticky machinery is not installed on pipe / subprocess / unix transports; `ctx.open_session` raises `RuntimeError` there. Full spec: `docs/sticky-sessions-spec.md`. Cross-language conformance group: `TestSticky` in `vgi_rpc/conformance/_pytest_suite.py`, capability-gated on `VGI-Sticky-Enabled`. +**Sticky sessions (HTTP, opt-in)**: With `make_wsgi_app(enable_sticky=True)`, a method body may call `ctx.open_session(state)` to register a Python object (DB cursor, model handle, file handle) in a per-worker registry; subsequent calls from the same client (inside `conn.with_session_token():`) carry a `VGI-Session` header that resolves back to the object as `ctx.session`. Eviction is TTL-driven (default 300s, override per-call via `ttl=`) or explicit (`ctx.close_session()`); `state.close()` is invoked on eviction if defined. The framework serializes concurrent calls on the same session via a per-session `RLock`; different sessions run in parallel. Misroute / expiry / cross-principal token surfaces as `SessionLostError` (typed, `error_kind="session_lost"`); drain-time opens surface as `ServerDrainingError`. Client uses `conn.with_session_token() as sess:` (auto-sends `VGI-Session-Accept: true` for the server's leak-prevention guard) and `sess.detach()` to stash a token for later resumption without firing the exit-time best-effort `DELETE /vgi/__session__`. Sticky machinery is not installed on pipe / subprocess / unix transports; `ctx.open_session` raises `RuntimeError` there. **Echo headers** (`sticky_echo_headers=` kwarg on `make_wsgi_app`): tell the client to replay arbitrary headers on every subsequent request in the session — emitted as `VGI-Echo-: ` on the session-opening response, captured + replayed by the client view, exposed via `sess.current_echo_headers()`. Used for client-driven routing on platforms like Fly.io; `vgi_rpc.http.fly.fly_sticky_echo_headers()` + `vgi_rpc.http.fly.auto_server_id()` are the ~25-line Fly quickstart helpers (return `None` off Fly so the same code works everywhere). Full spec: `docs/sticky-sessions-spec.md`. Cross-language conformance group: `TestSticky` in `vgi_rpc/conformance/_pytest_suite.py`, capability-gated on `VGI-Sticky-Enabled`; `TestSticky::test_echo_header_round_trip` further capability-gated on `VGI-Sticky-Echo-Headers`. ## Code Style diff --git a/docs/api/http.md b/docs/api/http.md index 6885a68..bb2dfd6 100644 --- a/docs/api/http.md +++ b/docs/api/http.md @@ -123,6 +123,32 @@ The block's exit fires a best-effort `DELETE /vgi/__session__` so handle-bearing **HTTP-only.** Sticky machinery is not installed on pipe/subprocess/unix transports — those run as single processes where "sticky" is meaningless. `ctx.open_session` raises `RuntimeError("sticky sessions not available on this transport")` if called over a non-HTTP transport, so apps can detect-and-fall-back. +#### Client-driven routing via echo headers + +Sticky LBs are not the only way to get a session-token-carrying request back to the worker that owns the session. With **echo headers**, the server tells the client (at session-open time) to attach an arbitrary set of headers on every subsequent request in the session, and the platform's edge proxy routes on those headers. Two helpers ship for [Fly.io](https://fly.io), where `fly-force-instance-id` is the proactive routing header `fly-proxy` honours: + +```python +from vgi_rpc import RpcServer +from vgi_rpc.http import make_wsgi_app +from vgi_rpc.http.fly import auto_server_id, fly_sticky_echo_headers + +server = RpcServer( + MyService, MyServiceImpl(), + server_id=auto_server_id(), # ⇒ FLY_MACHINE_ID on Fly, random elsewhere +) +app = make_wsgi_app( + server, + enable_sticky=True, + sticky_echo_headers=fly_sticky_echo_headers(), # ⇒ {"fly-force-instance-id": } on Fly, None elsewhere +) +``` + +On Fly the server emits `VGI-Echo-fly-force-instance-id: ` on session-opening responses; the client captures it and replays `fly-force-instance-id: ` on every subsequent request in the session; fly-proxy routes directly to the owning Machine. No LB configuration required. + +Off Fly the helpers return `None` so the same code is a no-op — operators don't need conditional branches. + +Generic API (for non-Fly platforms): pass any `dict[str, str]` as `sticky_echo_headers` and the server will emit them as `VGI-Echo-` on the session-opening response. The client's `with_session_token()` view captures + replays automatically; `sess.current_echo_headers()` exposes the captured map for inspection or stashing. + ## API Reference ### Server diff --git a/docs/porting-guide.md b/docs/porting-guide.md index 8687a38..58b3b42 100644 --- a/docs/porting-guide.md +++ b/docs/porting-guide.md @@ -195,9 +195,11 @@ ignore the budget. HTTP sticky sessions are an **opt-in additive feature** layered on top of the stateless HTTP transport. The full spec lives at [`docs/sticky-sessions-spec.md`](sticky-sessions-spec.md). A port may choose to: - **Skip sticky entirely.** The canonical `TestSticky` conformance group is capability-gated on the `VGI-Sticky-Enabled` header; ports that don't advertise it skip every test in the group cleanly. The non-sticky wire path is byte-identical for both implementations, so the rest of the conformance suite still passes. -- **Implement the client side only.** A client running against a Python sticky server needs to (1) recognize `error_kind="session_lost"` / `error_kind="server_draining"` on EXCEPTION-level batches and surface them as typed exceptions, (2) optionally implement a `with_session_token()`-equivalent that sends `VGI-Session-Accept: true` + `VGI-Session: ` on every request inside a scope and captures `VGI-Session` / `VGI-Session-Close: true` from responses. The cookie-jar avoidance is intentional — header-only multiplexes concurrent sessions cleanly. -- **Implement the full server side.** Port `_StickyMiddleware` (per-worker registry + reaper thread + token sealing), the `DELETE /vgi/__session__` resource (idempotent, principal-bound), and the `ctx.open_session` / `ctx.close_session` runtime API. The session token format from the spec is language-neutral: `created_at:u64 | server_id_len:u8 | server_id | session_id:bytes(12) | expires_at:u64`, AEAD-sealed with the same AAD shape used by stream tokens. +- **Implement the client side only.** A client running against a Python sticky server needs to (1) recognize `error_kind="session_lost"` / `error_kind="server_draining"` on EXCEPTION-level batches and surface them as typed exceptions, (2) optionally implement a `with_session_token()`-equivalent that sends `VGI-Session-Accept: true` + `VGI-Session: ` on every request inside a scope, captures `VGI-Session` / `VGI-Session-Close: true` from responses, and captures + replays any `VGI-Echo-` response headers (case-insensitive, prefix-stripped) on subsequent requests in the same session. The cookie-jar avoidance is intentional — header-only multiplexes concurrent sessions cleanly. +- **Implement the full server side.** Port `_StickyMiddleware` (per-worker registry + reaper thread + token sealing + optional echo-header emission), the `DELETE /vgi/__session__` resource (idempotent, principal-bound), and the `ctx.open_session` / `ctx.close_session` runtime API. The session token format from the spec is language-neutral: `created_at:u64 | server_id_len:u8 | server_id | session_id:bytes(12) | expires_at:u64`, AEAD-sealed with the same AAD shape used by stream tokens. If a port claims sticky support it MUST also implement the three sticky conformance methods (`open_counter`, `increment_counter`, `close_counter`) on its `ConformanceService` implementation, so the canonical `TestSticky` group has something to exercise. Servers that advertise `VGI-Sticky-Enabled: true` but fail `TestSticky` are non-conformant. +**Echo headers** (`VGI-Echo-` response headers / `VGI-Sticky-Echo-Headers` capability advert) are a sub-feature; ports that don't implement them stay conformant on `TestSticky` core but skip `TestSticky::test_echo_header_round_trip` cleanly (the test is capability-gated on `VGI-Sticky-Echo-Headers`). Implementing them unlocks zero-LB-config deployments on Fly.io (`fly-force-instance-id`) and any other platform with header-based proactive routing. See [`vgi_rpc/http/fly.py`](https://github.com/Query-farm/vgi-rpc-python/blob/main/vgi_rpc/http/fly.py) for the Python Fly quickstart helpers — a ~25-line module that other ports can mirror directly. + Recognising the two new error kinds is the **minimum** any port should do: even ports that have no sticky implementation may end up talking to a Python sticky server in the wild, and a typed exception is much friendlier than a flat `RpcError` whose meaning the caller has to grep out of the message text. diff --git a/docs/sticky-sessions-spec.md b/docs/sticky-sessions-spec.md index ee622d9..cf0295b 100644 --- a/docs/sticky-sessions-spec.md +++ b/docs/sticky-sessions-spec.md @@ -36,8 +36,23 @@ When `enable_sticky=True`, the server MUST advertise these on every response (ch |---|---|---| | `VGI-Sticky-Enabled` | `"true"` | Discovery flag; absent or `"false"` on non-sticky servers. | | `VGI-Sticky-Default-TTL` | integer seconds | The TTL applied by `ctx.open_session` when its `ttl` argument is `None`. Operator-tunable via `sticky_default_ttl`. | +| `VGI-Sticky-Echo-Headers` | comma-separated header names | Headers the client must replay on every subsequent request in the session — see §2.5. Absent when `sticky_echo_headers` is unset. | -### 2.4 Framework-managed endpoints +### 2.4 Echo headers (`VGI-Echo-*`) + +When the server is configured with `sticky_echo_headers={name: value, ...}`, every session-opening response (the response carrying the `VGI-Session` token) also carries `VGI-Echo-: ` for each configured pair. The client MUST: + +1. Capture each `VGI-Echo-` header on the response (case-insensitive lookup). +2. Strip the `VGI-Echo-` prefix. +3. Send the inner header (`: `) on every subsequent request inside the same session view, until the server emits `VGI-Session-Close: true` (which clears the captured echo headers alongside the token). + +Echo headers are emitted **once-only**, on the session-opening response. Subsequent responses MUST NOT re-emit them. Clients hold the captured map for the lifetime of the session view. + +The primary use case is **client-driven routing**: on Fly.io the server emits `VGI-Echo-fly-force-instance-id: `, the client sends `fly-force-instance-id: ` on every subsequent request, and fly-proxy routes directly to the owning Machine without any LB configuration. Other platforms with similar header-based routing (Railway, custom Envoy filters) work identically — only the header name and value change. + +Echo headers carry no security guarantees beyond what the underlying transport provides; in particular they are NOT bound to the session token via AAD. A misbehaving client could echo a different header value than the server told it to. The contract assumes cooperative clients — the feature exists to make sticky routing *work*, not to enforce it. + +### 2.5 Framework-managed endpoints `DELETE {prefix}/__session__` — idempotent best-effort session teardown. @@ -115,8 +130,7 @@ vgi-rpc-test --url http:// --filter "Sticky::*" The group is capability-gated: servers without `VGI-Sticky-Enabled: true` skip every test in the group cleanly. The Python implementation passes all tests; cross-language ports that wire up sticky support must pass them too. See [`docs/porting-guide.md`](porting-guide.md) for the full porting checklist. -## 10. Out of scope for v1 +## 10. Out of scope -- **Cookie emission.** AWS ALB application-based stickiness and CloudFront sticky sessions both require a cookie set by the application. Operators on those platforms can front with Envoy / NGINX (header-hash policies on `VGI-Session`) or switch to NLB (flow-hash). Cookie emission can be added as an additive operator flag in a follow-up without changing the v1 wire surface. -- **Client-driven routing (echo headers).** A second PR will add server-side `sticky_echo_headers` config that tells the client to echo arbitrary routing headers on subsequent requests in the same session — enabling Fly.io's `fly-force-instance-id` and similar mechanisms. The current PR is the foundation; the echo-header layer composes on top without changes to existing wire contracts. +- **Cookie emission.** AWS ALB application-based stickiness and CloudFront sticky sessions both require a cookie set by the application. Operators on those platforms can front with Envoy / NGINX (header-hash policies on `VGI-Session`) or switch to NLB (flow-hash). Cookie emission can be added as an additive operator flag in a follow-up without changing the wire surface. - **Pluggable session store.** Sessions hold live Python objects in-process. Redis-style external stores are explicitly excluded — they don't work for the cursor/handle pattern the feature is designed for, and the additional persistence story would compete with the well-defined "TTL eviction + crash = state lost" contract. diff --git a/tests/serve_conformance_http.py b/tests/serve_conformance_http.py index 24f0612..50a88fc 100644 --- a/tests/serve_conformance_http.py +++ b/tests/serve_conformance_http.py @@ -65,9 +65,27 @@ def main() -> None: default=False, help="Disable sticky sessions. Default: enabled, so TestSticky conformance group runs.", ) + parser.add_argument( + "--no-sticky-echo", + action="store_true", + default=False, + help=( + "Disable the canonical sticky-echo-header advertisement. " + "Default: enabled with a fixed marker header so TestSticky::" + "test_echo_header_round_trip exercises the contract." + ), + ) args = parser.parse_args() enable_sticky = not args.no_sticky + # Fixed marker the canonical TestSticky::test_echo_header_round_trip + # captures + replays. Operators wiring up real deployments use + # vgi_rpc.http.fly.fly_sticky_echo_headers() or their own mapping — + # this constant exists only to give the conformance group a stable + # contract to exercise. + sticky_echo_headers: dict[str, str] | None = ( + None if args.no_sticky_echo or not enable_sticky else {"x-vgi-conformance-echo": "conformance-fixed-marker"} + ) if not args.fake_storage: # Plain HTTP server, no external storage. @@ -90,7 +108,11 @@ def main() -> None: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((args.host, 0)) port = int(s.getsockname()[1]) - app = make_wsgi_app(server, enable_sticky=True) + app = make_wsgi_app( + server, + enable_sticky=True, + sticky_echo_headers=sticky_echo_headers, + ) try: import waitress except ImportError: @@ -133,6 +155,7 @@ def main() -> None: max_request_bytes=max_request_bytes, max_upload_bytes=64 * 1024 * 1024, enable_sticky=enable_sticky, + sticky_echo_headers=sticky_echo_headers, ) try: diff --git a/tests/test_conformance_http_sticky.py b/tests/test_conformance_http_sticky.py index ac06aa4..4dbe2fd 100644 --- a/tests/test_conformance_http_sticky.py +++ b/tests/test_conformance_http_sticky.py @@ -16,7 +16,7 @@ import time import warnings from concurrent.futures import ThreadPoolExecutor -from typing import TYPE_CHECKING, Any, Protocol +from typing import TYPE_CHECKING, Any, Protocol, cast import pytest @@ -363,3 +363,180 @@ def test_session_header_constants_exist() -> None: """Sanity: the session-header constants exported from ``vgi_rpc.http._common`` are stable strings.""" assert SESSION_HEADER == "VGI-Session" assert SESSION_ACCEPT_HEADER == "VGI-Session-Accept" + + +# --------------------------------------------------------------------------- +# Echo headers (PR2) +# --------------------------------------------------------------------------- + + +@pytest.fixture +def echo_sticky_client() -> Iterator[_SyncTestClient]: + """Sticky-enabled client configured with a marker echo header. + + Used for tests that exercise the server emission + client capture/ + replay round-trip end-to-end. The marker name is intentionally + multi-token so case-insensitivity bugs surface. + """ + server = RpcServer(ConformanceService, ConformanceServiceImpl()) + client = make_sync_client( + server, + enable_sticky=True, + sticky_default_ttl=10.0, + sticky_echo_headers={"x-test-echo-marker": "captured-by-client"}, + ) + try: + yield client + finally: + client.close() + + +class TestEchoHeadersServer: + """Server-side echo-header emission contract.""" + + def test_emitted_on_session_open(self, echo_sticky_client: _SyncTestClient) -> None: + """Server emits ``VGI-Echo-: `` on the session-opening response.""" + from vgi_rpc.http._common import ECHO_HEADER_PREFIX + + seen_echo_headers: list[dict[str, str]] = [] + # Patch the capture callback so we can inspect what the inner test + # client returned BEFORE the view consumes the response. + from vgi_rpc.http import _client as _hc + + orig_capture = _hc._SessionTrackingClient._capture + + def patched(self: _hc._SessionTrackingClient, resp: Any) -> None: + seen_echo_headers.append( + { + k[len(ECHO_HEADER_PREFIX) :]: v + for k, v in resp.headers.items() + if k.lower().startswith(ECHO_HEADER_PREFIX.lower()) + } + ) + orig_capture(self, resp) + + _hc._SessionTrackingClient._capture = patched # type: ignore[method-assign] + try: + with _connect(echo_sticky_client) as proxy, proxy.with_session_token() as sess: + sess.open_counter(initial=1) + sess.increment_counter(by=1) # follow-up response should NOT carry echo + finally: + _hc._SessionTrackingClient._capture = orig_capture # type: ignore[method-assign] + # First captured response is the open: must have the echo header. + assert seen_echo_headers[0] == {"x-test-echo-marker": "captured-by-client"}, ( + f"open response must carry VGI-Echo-x-test-echo-marker; saw {seen_echo_headers[0]!r}" + ) + # Subsequent responses do NOT carry the echo header (once-only emission). + assert seen_echo_headers[1] == {}, ( + f"subsequent responses must NOT carry VGI-Echo-* (echo is once-only); saw {seen_echo_headers[1]!r}" + ) + + def test_absent_when_unconfigured(self, sticky_client: _SyncTestClient) -> None: + """Server with no ``sticky_echo_headers`` emits no ``VGI-Echo-*`` headers and no capability advert.""" + caps = http_capabilities(client=sticky_client) + got = caps.sticky_echo_headers + assert got == (), f"sticky-enabled-but-no-echo server must advertise empty echo-headers tuple; got {got!r}" + + def test_capability_lists_configured_names(self, echo_sticky_client: _SyncTestClient) -> None: + """``VGI-Sticky-Echo-Headers`` lists the configured header names; surfaces in capabilities.""" + caps = http_capabilities(client=echo_sticky_client) + assert caps.sticky_echo_headers == ("x-test-echo-marker",) + + +class TestEchoHeadersClient: + """Client-side capture + replay contract.""" + + def test_current_echo_headers_populated_after_open(self, echo_sticky_client: _SyncTestClient) -> None: + """After ``open_counter``, ``view.current_echo_headers()`` returns the marker dict.""" + with _connect(echo_sticky_client) as proxy, proxy.with_session_token() as sess: + assert dict(sess.current_echo_headers()) == {}, ( + "no echo headers should be captured before the first session-opening call" + ) + sess.open_counter(initial=1) + assert dict(sess.current_echo_headers()) == {"x-test-echo-marker": "captured-by-client"} + + def test_replay_on_subsequent_requests(self, echo_sticky_client: _SyncTestClient) -> None: + """Captured echo headers ride on every subsequent request inside the same block.""" + captured_request_headers: list[dict[str, str]] = [] + + from vgi_rpc.http import _client as _hc + + orig_merge = _hc._SessionTrackingClient._merge_headers + + def patched(self: _hc._SessionTrackingClient, headers: dict[str, str] | None) -> dict[str, str]: + merged = orig_merge(self, headers) + captured_request_headers.append(dict(merged)) + return merged + + _hc._SessionTrackingClient._merge_headers = patched # type: ignore[method-assign] + try: + with _connect(echo_sticky_client) as proxy, proxy.with_session_token() as sess: + sess.open_counter(initial=1) + sess.increment_counter(by=1) + sess.increment_counter(by=1) + finally: + _hc._SessionTrackingClient._merge_headers = orig_merge # type: ignore[method-assign] + # Open call has no echo header on the way out (it's the FIRST call; + # the server's echo header lands on the *response*). + assert "x-test-echo-marker" not in captured_request_headers[0] + # Every subsequent request carries the captured echo header. + for i, hdrs in enumerate(captured_request_headers[1:], start=1): + assert hdrs.get("x-test-echo-marker") == "captured-by-client", ( + f"request #{i} must carry the captured echo header; got {hdrs!r}" + ) + + def test_close_clears_echo_headers(self, echo_sticky_client: _SyncTestClient) -> None: + """``VGI-Session-Close: true`` from the server clears the captured echo headers too.""" + with _connect(echo_sticky_client) as proxy, proxy.with_session_token() as sess: + sess.open_counter(initial=1) + assert dict(sess.current_echo_headers()) == {"x-test-echo-marker": "captured-by-client"} + sess.close_counter() + assert dict(sess.current_echo_headers()) == {}, ( + "close_session must clear captured echo headers alongside the token" + ) + + def test_current_echo_headers_returns_readonly_snapshot(self, echo_sticky_client: _SyncTestClient) -> None: + """``current_echo_headers()`` returns a read-only mapping; caller can't mutate view state.""" + with _connect(echo_sticky_client) as proxy, proxy.with_session_token() as sess: + sess.open_counter(initial=1) + snapshot = sess.current_echo_headers() + # MappingProxyType is read-only — writing raises TypeError. + with pytest.raises(TypeError): + # Cast to ignore the type-system disagreement; the runtime + # behaviour is what's being verified. + cast("dict[str, str]", snapshot)["x-test-echo-marker"] = "modified" + # Inner state unaffected. + assert dict(sess.current_echo_headers()) == {"x-test-echo-marker": "captured-by-client"} + + +class TestFlyHelper: + """``vgi_rpc.http.fly`` quickstart helpers.""" + + def test_auto_server_id_off_fly(self, monkeypatch: pytest.MonkeyPatch) -> None: + """``auto_server_id()`` returns ``None`` when ``FLY_MACHINE_ID`` is unset.""" + monkeypatch.delenv("FLY_MACHINE_ID", raising=False) + # Force reimport so module-level FLY_MACHINE_ID is recomputed. + import importlib + + from vgi_rpc.http import fly + + importlib.reload(fly) + assert fly.auto_server_id() is None + assert fly.fly_sticky_echo_headers() is None + + def test_auto_server_id_on_fly(self, monkeypatch: pytest.MonkeyPatch) -> None: + """``auto_server_id()`` returns the machine ID when ``FLY_MACHINE_ID`` is set.""" + monkeypatch.setenv("FLY_MACHINE_ID", "machine-test-abc123") + import importlib + + from vgi_rpc.http import fly + + importlib.reload(fly) + assert fly.auto_server_id() == "machine-test-abc123" + assert fly.fly_sticky_echo_headers() == {"fly-force-instance-id": "machine-test-abc123"} + + def test_module_exports(self) -> None: + """``vgi_rpc.http.fly.__all__`` lists the documented surface.""" + from vgi_rpc.http import fly + + assert set(fly.__all__) == {"FLY_MACHINE_ID", "auto_server_id", "fly_sticky_echo_headers"} diff --git a/vgi_rpc/conformance/_pytest_suite.py b/vgi_rpc/conformance/_pytest_suite.py index fa892c1..5e91e34 100644 --- a/vgi_rpc/conformance/_pytest_suite.py +++ b/vgi_rpc/conformance/_pytest_suite.py @@ -2053,3 +2053,46 @@ def test_capabilities_advertised(self, conformance_http_port: int) -> None: assert caps.sticky_enabled is True # Default TTL is operator-tunable. Conformance just requires it be advertised as a positive int. assert caps.sticky_default_ttl is not None and caps.sticky_default_ttl > 0 + + def test_echo_header_round_trip(self, conformance_http_port: int) -> None: + """Echo headers advertised by the server are captured + replayed by a conformant client. + + Capability-gated on ``VGI-Sticky-Echo-Headers`` so deployments + without echo-header support skip cleanly. The conformance server + is configured with a fixed marker echo header + (``x-vgi-conformance-echo: conformance-fixed-marker``) so this + test has a stable contract to exercise; real deployments + substitute their own (e.g. ``fly-force-instance-id`` on Fly). + + Cross-language ports that implement sticky must implement the + same capture/replay contract to pass this test — see + ``docs/sticky-sessions-spec.md`` for the ``VGI-Echo-`` + wire shape. + """ + self._skip_unless_sticky(conformance_http_port) + from vgi_rpc.http import http_capabilities + + caps = http_capabilities(base_url=f"http://127.0.0.1:{conformance_http_port}") + if not caps.sticky_echo_headers: + pytest.skip( + "server doesn't advertise sticky echo headers — echo conformance N/A", + ) + expected_name = "x-vgi-conformance-echo" + assert expected_name in caps.sticky_echo_headers, ( + f"conformance server must advertise the {expected_name!r} echo header; got {caps.sticky_echo_headers!r}" + ) + with self._connect(conformance_http_port) as proxy, proxy.with_session_token() as sess: + # Opening a session must populate the captured echo headers + # via the VGI-Echo-* response header. We check the captured + # map directly (the contract surface) — the *next* call would + # carry the header on the wire, but verifying that requires + # server-side echo-back, which is out of scope for the + # conformance service. The capture proves the contract. + sess.open_counter(initial=1) + captured = dict(sess.current_echo_headers()) + assert expected_name in captured, ( + f"client must capture VGI-Echo-{expected_name} into current_echo_headers(); got {captured!r}" + ) + assert captured[expected_name] == "conformance-fixed-marker", ( + f"captured value must round-trip the server-configured marker; got {captured[expected_name]!r}" + ) diff --git a/vgi_rpc/http/_client.py b/vgi_rpc/http/_client.py index 21ca7c8..196fce2 100644 --- a/vgi_rpc/http/_client.py +++ b/vgi_rpc/http/_client.py @@ -13,11 +13,11 @@ import json import logging import re -from collections.abc import Callable, Iterator +from collections.abc import Callable, Iterator, Mapping from dataclasses import dataclass from http import HTTPStatus from io import BytesIO -from types import TracebackType +from types import MappingProxyType, TracebackType from typing import TYPE_CHECKING, Any, cast import httpx @@ -60,6 +60,7 @@ _SESSION_ENDPOINT, _UPLOAD_URL_METHOD, _UPLOAD_URL_PARAMS_SCHEMA, + ECHO_HEADER_PREFIX, EXTERNALIZATION_ENABLED_HEADER, MAX_EXTERNALIZED_RESPONSE_BYTES_HEADER, MAX_REQUEST_BYTES_HEADER, @@ -69,6 +70,7 @@ SESSION_CLOSE_HEADER, SESSION_HEADER, STICKY_DEFAULT_TTL_HEADER, + STICKY_ECHO_HEADERS_HEADER, STICKY_ENABLED_HEADER, SUPPORTED_ENCODINGS_HEADER, UPLOAD_URL_HEADER, @@ -1230,15 +1232,23 @@ def __getattr__(self, name: str) -> Any: return getattr(self._inner, name) def _merge_headers(self, headers: dict[str, str] | None) -> dict[str, str]: - """Add session opt-in + token headers to a per-request header dict.""" + """Add session opt-in + token + replayed echo headers to a per-request header dict.""" merged: dict[str, str] = dict(headers or {}) merged[SESSION_ACCEPT_HEADER] = "true" if self._view._token is not None: merged[SESSION_HEADER] = self._view._token + # Replay every echo header the server told us to carry through. + # The dict is populated lazily on the session-opening response; + # before that it's empty and the loop is a no-op. + for name, value in self._view._echo_headers.items(): + # Caller-supplied headers take precedence — operators MAY override + # an echo header per-call via the explicit headers kwarg if they + # have a reason to. The normal path leaves them untouched. + merged.setdefault(name, value) return merged def _capture(self, resp: Any) -> None: - """Update the view's session state from response headers.""" + """Update the view's session state and captured echo headers from response headers.""" try: hdrs = resp.headers except AttributeError: @@ -1246,9 +1256,19 @@ def _capture(self, resp: Any) -> None: token = hdrs.get(SESSION_HEADER) or hdrs.get(SESSION_HEADER.lower()) if token: self._view._token = token + # Capture VGI-Echo-* on every response (cheap; only emitted on session + # open, so subsequent responses are no-ops). httpx headers are + # case-insensitive but _SyncTestResponse stores lowercase — iterate + # over items() so we hit both shapes uniformly. + prefix_lower = ECHO_HEADER_PREFIX.lower() + for hname, hvalue in hdrs.items(): + name_lower = hname.lower() + if name_lower.startswith(prefix_lower): + self._view._echo_headers[hname[len(ECHO_HEADER_PREFIX) :]] = hvalue close_flag = hdrs.get(SESSION_CLOSE_HEADER) or hdrs.get(SESSION_CLOSE_HEADER.lower()) if (close_flag or "").strip().lower() == "true": self._view._token = None + self._view._echo_headers.clear() self._view._closed = True def post(self, url: str, **kwargs: Any) -> Any: @@ -1303,12 +1323,17 @@ class _SessionView: :meth:`current_session_token` for stashing across processes. """ - __slots__ = ("_closed", "_outer", "_proxy", "_token", "_tracking_client") + __slots__ = ("_closed", "_echo_headers", "_outer", "_proxy", "_token", "_tracking_client") def __init__(self, outer: _HttpProxy, initial_token: str | None) -> None: self._outer = outer self._token = initial_token self._closed = False + # Headers the server told us to echo on subsequent requests in this + # session. Populated by ``_SessionTrackingClient._capture`` from + # ``VGI-Echo-*`` response headers on the session-opening response. + # Caller-readable via :meth:`current_echo_headers`. + self._echo_headers: dict[str, str] = {} self._tracking_client = _SessionTrackingClient(outer._client, self) # The tracking client implements the same duck-typed surface that # _HttpProxy uses (post/get/options/delete/put + close + base_url @@ -1336,6 +1361,23 @@ def current_session_token(self) -> str | None: """ return self._token + def current_echo_headers(self) -> Mapping[str, str]: + """Return a read-only snapshot of headers the server told the client to echo. + + Populated by the server's ``VGI-Echo-: `` response + headers on the session-opening response — used for client-driven + routing (e.g. ``fly-force-instance-id`` on Fly.io). Empty before + the first session-opening call returns, and empty again after the + server emits ``VGI-Session-Close: true``. + + Callers stashing a token for later resumption SHOULD also stash + this mapping; without the echo headers the resumed session may + misroute on platforms that depend on client-supplied routing + (no `Set-Cookie` analogue exists for headers). + """ + # Return a copy so callers can't mutate our internal state. + return MappingProxyType(dict(self._echo_headers)) + def detach(self) -> str | None: """Hand the session token off to the caller; suppress the exit-time DELETE. @@ -1431,6 +1473,17 @@ class HttpServerCapabilities: """Whether the server has ``enable_sticky=True`` and supports ``VGI-Session``.""" sticky_default_ttl: int | None = None """Default session TTL in seconds when ``open_session`` is called without an explicit TTL.""" + sticky_echo_headers: tuple[str, ...] = () + """Header names the server tells the client to echo on every subsequent session request. + + Parsed from the comma-separated ``VGI-Sticky-Echo-Headers`` capability + header. Empty tuple when the server is sticky-enabled but has no + echo-header config (the default), or when the server is non-sticky. + Concrete values land on the ``_SessionView`` via captured + ``VGI-Echo-`` response headers on the session-opening response; + this field exposes the *names* for introspection (LB configuration, + cross-language client implementations). + """ def http_capabilities( @@ -1543,6 +1596,13 @@ def http_capabilities( with contextlib.suppress(ValueError): sticky_ttl = int(sticky_ttl_raw) + sticky_echo_raw = headers.get(STICKY_ECHO_HEADERS_HEADER) or headers.get(STICKY_ECHO_HEADERS_HEADER.lower()) + sticky_echo: tuple[str, ...] + if sticky_echo_raw: + sticky_echo = tuple(name.strip() for name in sticky_echo_raw.split(",") if name.strip()) + else: + sticky_echo = () + return HttpServerCapabilities( max_request_bytes=max_req, max_response_bytes=max_resp, @@ -1554,6 +1614,7 @@ def http_capabilities( cache_expires_at=cache_expires_at, sticky_enabled=sticky_enabled, sticky_default_ttl=sticky_ttl, + sticky_echo_headers=sticky_echo, ) finally: if own_client: diff --git a/vgi_rpc/http/_common.py b/vgi_rpc/http/_common.py index 3be559d..4cafd78 100644 --- a/vgi_rpc/http/_common.py +++ b/vgi_rpc/http/_common.py @@ -47,6 +47,14 @@ SESSION_CLOSE_HEADER = "VGI-Session-Close" STICKY_ENABLED_HEADER = "VGI-Sticky-Enabled" STICKY_DEFAULT_TTL_HEADER = "VGI-Sticky-Default-TTL" +STICKY_ECHO_HEADERS_HEADER = "VGI-Sticky-Echo-Headers" +"""Capability header listing the comma-separated names of headers a client must echo back.""" + +# Prefix the server uses to tell the client "echo this header on subsequent +# requests in this session". The client strips the prefix; e.g. on Fly the +# server emits ``VGI-Echo-fly-force-instance-id: `` and the +# client sends back ``fly-force-instance-id: ``. +ECHO_HEADER_PREFIX = "VGI-Echo-" # Framework-managed sticky session teardown endpoint. Parallel to the # synthetic __describe__ method but served by a dedicated Falcon resource; diff --git a/vgi_rpc/http/_testing.py b/vgi_rpc/http/_testing.py index 5a2e6d1..5b714ce 100644 --- a/vgi_rpc/http/_testing.py +++ b/vgi_rpc/http/_testing.py @@ -9,7 +9,7 @@ from __future__ import annotations -from collections.abc import Callable +from collections.abc import Callable, Mapping from typing import TYPE_CHECKING from urllib.parse import urlparse @@ -145,6 +145,7 @@ def make_sync_client( oauth_resource_metadata: OAuthResourceMetadata | None = None, enable_sticky: bool = False, sticky_default_ttl: float = 300.0, + sticky_echo_headers: Mapping[str, str] | None = None, ) -> _SyncTestClient: """Create a synchronous test client for an RpcServer. @@ -176,6 +177,7 @@ def make_sync_client( oauth_resource_metadata: See ``make_wsgi_app``. enable_sticky: See ``make_wsgi_app``. sticky_default_ttl: See ``make_wsgi_app``. + sticky_echo_headers: See ``make_wsgi_app``. Returns: A sync client that can be passed to ``http_connect(client=...)``. @@ -204,5 +206,6 @@ def make_sync_client( oauth_resource_metadata=oauth_resource_metadata, enable_sticky=enable_sticky, sticky_default_ttl=sticky_default_ttl, + sticky_echo_headers=sticky_echo_headers, ) return _SyncTestClient(app, default_headers=default_headers, prefix=prefix) diff --git a/vgi_rpc/http/fly.py b/vgi_rpc/http/fly.py new file mode 100644 index 0000000..e285f68 --- /dev/null +++ b/vgi_rpc/http/fly.py @@ -0,0 +1,97 @@ +# © Copyright 2025-2026, Query.Farm LLC - https://query.farm +# SPDX-License-Identifier: Apache-2.0 + +"""Fly.io quickstart helpers for HTTP sticky sessions. + +Fly.io routes traffic via Anycast to whatever Machine the edge proxy +(fly-proxy) picks; a request opening a sticky session may land on a +different Machine on the next call. ``fly-force-instance-id`` is the +proactive routing header fly-proxy honours: the client supplies the +target Machine ID, fly-proxy routes directly there. + +vgi-rpc's :ref:`sticky session echo-header mechanism ` +fits this exactly: at session-open time the server tells the client to +echo a given header on subsequent calls. On Fly, that header is +``fly-force-instance-id`` and the value is ``$FLY_MACHINE_ID``. + +Two helpers are provided: + +* :func:`auto_server_id` — read ``$FLY_MACHINE_ID`` and use it as the + RpcServer's ``server_id``. Aligns the session-token's stamped server + identity with the Fly Machine identity so operators reading logs see + the same string in both places. +* :func:`fly_sticky_echo_headers` — return the + ``{"fly-force-instance-id": $FLY_MACHINE_ID}`` mapping ready to pass + as ``sticky_echo_headers=`` to :func:`vgi_rpc.http.make_wsgi_app`. + +Both return ``None`` in non-Fly environments (when ``FLY_MACHINE_ID`` is +unset) so a single codebase deployed to Fly and elsewhere reads the same +configuration without conditional branches: + +.. code-block:: python + + from vgi_rpc import RpcServer + from vgi_rpc.http import make_wsgi_app + from vgi_rpc.http.fly import auto_server_id, fly_sticky_echo_headers + + server = RpcServer( + MyService, MyServiceImpl(), + server_id=auto_server_id(), # ⇒ FLY_MACHINE_ID on Fly, random elsewhere + ) + app = make_wsgi_app( + server, + enable_sticky=True, + sticky_echo_headers=fly_sticky_echo_headers(), # ⇒ Fly hint, or None + ) + +Off Fly, ``sticky_echo_headers=None`` is a no-op (no headers emitted); +the deployment falls back to whatever sticky-routing mechanism the LB +provides (or no sticky routing at all if there's no LB). +""" + +from __future__ import annotations + +import os + +__all__ = ["FLY_MACHINE_ID", "auto_server_id", "fly_sticky_echo_headers"] + + +FLY_MACHINE_ID: str | None = os.environ.get("FLY_MACHINE_ID") +"""The current Fly Machine ID, or ``None`` outside Fly. + +Read once at module import. Fly Machines have stable IDs that persist +across restarts of the same Machine, so caching at import time is safe. +""" + + +def auto_server_id() -> str | None: + """Return ``FLY_MACHINE_ID`` if running on Fly, else ``None``. + + Use as ``RpcServer(server_id=auto_server_id())`` to make the session + token's stamped server identity match the Fly Machine ID. The + framework's session-token format embeds ``server_id`` length-prefixed, + so this works for any length of identifier — Fly Machine IDs are + 14 hex characters today but the contract doesn't depend on that. + + Returns ``None`` outside Fly so RpcServer falls back to its + default random 12-char hex ``server_id``. + """ + return FLY_MACHINE_ID + + +def fly_sticky_echo_headers() -> dict[str, str] | None: + """Return ``{"fly-force-instance-id": FLY_MACHINE_ID}`` on Fly, else ``None``. + + Use as ``make_wsgi_app(..., sticky_echo_headers=fly_sticky_echo_headers())``. + When a method opens a session via ``ctx.open_session(...)`` on Fly, the + server emits ``VGI-Echo-fly-force-instance-id: `` on the + response; the client captures and replays it as ``fly-force-instance-id`` + on every subsequent request in the same session, and fly-proxy routes + directly to the owning Machine. + + Returns ``None`` outside Fly so passing this through unchanged is a + no-op in non-Fly environments — operators don't need a conditional. + """ + if FLY_MACHINE_ID is None: + return None + return {"fly-force-instance-id": FLY_MACHINE_ID} diff --git a/vgi_rpc/http/server/_factory.py b/vgi_rpc/http/server/_factory.py index 2806b5b..d7cef25 100644 --- a/vgi_rpc/http/server/_factory.py +++ b/vgi_rpc/http/server/_factory.py @@ -8,7 +8,7 @@ import logging import os import warnings -from collections.abc import Callable, Iterable +from collections.abc import Callable, Iterable, Mapping from typing import TYPE_CHECKING, Any if TYPE_CHECKING: @@ -21,6 +21,7 @@ from .._common import ( _SESSION_ENDPOINT, + ECHO_HEADER_PREFIX, EXTERNALIZATION_ENABLED_HEADER, MAX_EXTERNALIZED_RESPONSE_BYTES_HEADER, MAX_REQUEST_BYTES_HEADER, @@ -30,6 +31,7 @@ SESSION_CLOSE_HEADER, SESSION_HEADER, STICKY_DEFAULT_TTL_HEADER, + STICKY_ECHO_HEADERS_HEADER, STICKY_ENABLED_HEADER, SUPPORTED_ENCODINGS_HEADER, UPLOAD_URL_HEADER, @@ -99,6 +101,7 @@ def make_wsgi_app( max_stream_response_bytes: int | None = None, enable_sticky: bool = False, sticky_default_ttl: float = 300.0, + sticky_echo_headers: Mapping[str, str] | None = None, ) -> falcon.App[falcon.Request, falcon.Response]: """Create a Falcon WSGI app that serves RPC requests over HTTP. @@ -227,6 +230,19 @@ def make_wsgi_app( ``300.0`` (5 minutes) by default. Methods can override per-session via ``ctx.open_session(state, ttl=60)``. Only meaningful when ``enable_sticky=True``. + sticky_echo_headers: Optional mapping of headers the server tells + the client to echo on every subsequent request inside a + ``with_session_token()`` block. Emitted as ``VGI-Echo-: + `` on session-opening responses; the client strips the + prefix and replays the inner header on later requests. Used + for client-driven routing — e.g. on Fly.io, pass + ``{"fly-force-instance-id": FLY_MACHINE_ID}`` so subsequent + requests inside the session carry ``fly-force-instance-id`` + and fly-proxy routes directly to the owning Machine. Names + are also advertised in the ``VGI-Sticky-Echo-Headers`` capability + header so clients/LBs can introspect the contract via + ``OPTIONS /health``. See ``vgi_rpc/http/fly.py`` for a Fly- + specific helper. Only meaningful when ``enable_sticky=True``. Returns: A Falcon application with routes for unary and stream RPC calls. @@ -394,6 +410,14 @@ def make_wsgi_app( cors_expose.append(STICKY_DEFAULT_TTL_HEADER) cors_expose.append(SESSION_HEADER) cors_expose.append(SESSION_CLOSE_HEADER) + # Echo headers (PR2): advertise the names a client must replay on + # subsequent session requests so clients/LBs can discover the + # contract via OPTIONS /health. Each VGI-Echo- response + # header is also CORS-exposed so browser clients can read it. + if sticky_echo_headers: + capability_headers[STICKY_ECHO_HEADERS_HEADER] = ", ".join(sticky_echo_headers.keys()) + cors_expose.append(STICKY_ECHO_HEADERS_HEADER) + cors_expose.extend(f"{ECHO_HEADER_PREFIX}{name}" for name in sticky_echo_headers) # OAuth resource metadata (RFC 9728) from vgi_rpc.http._oauth import OAuthResourceMetadata as _OAuthMeta @@ -508,6 +532,7 @@ def make_wsgi_app( f"{prefix}/health", f"{prefix}/{_SESSION_ENDPOINT}", ), + echo_headers=sticky_echo_headers, ) ) if authenticate is not None and _pkce_active: diff --git a/vgi_rpc/http/server/_sticky.py b/vgi_rpc/http/server/_sticky.py index d8e383e..427fbbc 100644 --- a/vgi_rpc/http/server/_sticky.py +++ b/vgi_rpc/http/server/_sticky.py @@ -63,6 +63,7 @@ from vgi_rpc.rpc._common import ServerDrainingError from .._common import ( + ECHO_HEADER_PREFIX, SESSION_ACCEPT_HEADER, SESSION_CLOSE_HEADER, SESSION_HEADER, @@ -71,7 +72,7 @@ from ._state_token import _compute_aad if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Callable, Mapping _logger = logging.getLogger("vgi_rpc.sticky") @@ -398,6 +399,7 @@ class is purely the request/response shim. """ __slots__ = ( + "_echo_headers", "_exempt_prefixes", "_reaper", "_reaper_lock", @@ -411,10 +413,14 @@ def __init__( token_key: bytes, *, exempt_prefixes: tuple[str, ...] = (), + echo_headers: Mapping[str, str] | None = None, ) -> None: self._registry = registry self._token_key = token_key self._exempt_prefixes = exempt_prefixes + # Frozen snapshot so per-response emission doesn't re-read a mutable + # operator dict mid-request. Empty dict ⇒ no echo headers emitted. + self._echo_headers: tuple[tuple[str, str], ...] = tuple(echo_headers.items()) if echo_headers else () self._reaper: _ReaperThread | None = None self._reaper_lock = threading.Lock() @@ -576,6 +582,14 @@ def process_response( if sink is not None: if sink.mint_token is not None: resp.set_header(SESSION_HEADER, sink.mint_token) + # Tell the client to echo these headers on every subsequent + # request in this session. Used for client-driven routing + # (fly-force-instance-id on Fly.io, similar mechanisms + # elsewhere). Emitted only on session-opening responses; + # the client captures and replays them for the lifetime of + # the session view, no need to repeat. + for name, value in self._echo_headers: + resp.set_header(f"{ECHO_HEADER_PREFIX}{name}", value) if sink.closed: resp.set_header(SESSION_CLOSE_HEADER, "true") # Release the per-session lock if dispatch held it and close_session