Skip to content

Commit bba70a4

Browse files
authored
feat: standalone KV indexer runtime integration (#7295)
1 parent 3718da8 commit bba70a4

File tree

24 files changed

+1139
-79
lines changed

24 files changed

+1139
-79
lines changed

components/src/dynamo/common/configuration/groups/kv_router_args.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
"router_event_threads",
3636
"router_enable_cache_control",
3737
"router_queue_policy",
38+
"remote_indexer_component",
3839
)
3940

4041

@@ -58,6 +59,7 @@ class KvRouterConfigBase(ConfigBase):
5859
router_event_threads: int
5960
router_enable_cache_control: bool
6061
router_queue_policy: str
62+
remote_indexer_component: Optional[str]
6163

6264
def kv_router_kwargs(self) -> dict:
6365
"""Return a dict suitable for ``KvRouterConfig(**kwargs)``."""
@@ -269,3 +271,15 @@ def add_arguments(self, parser) -> None:
269271
arg_type=str,
270272
choices=["fcfs", "wspt"],
271273
)
274+
add_argument(
275+
g,
276+
flag_name="--remote-indexer-component",
277+
env_var="DYN_REMOTE_INDEXER_COMPONENT",
278+
default=None,
279+
help=(
280+
"[EXPERIMENTAL] KV Router: Component name of a standalone KV indexer to use for overlap scoring. "
281+
"When set, the router queries the standalone indexer via the request plane instead "
282+
"of maintaining a local radix tree (e.g. 'kv-indexer')."
283+
),
284+
arg_type=str,
285+
)

components/src/dynamo/mocker/args.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,13 @@ def parse_args() -> argparse.Namespace:
471471
default=os.environ.get("DYN_REQUEST_PLANE", "tcp"),
472472
help="Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
473473
)
474+
parser.add_argument(
475+
"--event-plane",
476+
type=str,
477+
choices=["nats", "zmq"],
478+
default=os.environ.get("DYN_EVENT_PLANE", "nats"),
479+
help="Determines how events are published [nats|zmq]",
480+
)
474481

475482
args = parser.parse_args()
476483
validate_worker_type_args(args)

components/src/dynamo/mocker/main.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
os.environ.setdefault("DYN_COMPUTE_THREADS", "0")
2020

21+
from dynamo.common.utils.runtime import create_runtime
2122
from dynamo.llm import (
2223
EngineType,
2324
EntrypointArgs,
@@ -26,7 +27,6 @@
2627
make_engine,
2728
run_input,
2829
)
29-
from dynamo.runtime import DistributedRuntime
3030
from dynamo.runtime.logging import configure_dynamo_logging
3131

3232
from .args import create_temp_engine_args_file, parse_args, resolve_planner_profile_data
@@ -193,7 +193,6 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path)
193193
- Independent service registration and stats scraping
194194
- But still sharing the same tokio runtime (efficient)
195195
"""
196-
loop = asyncio.get_running_loop()
197196
futures = []
198197
runtimes = []
199198
per_worker_temp_files: list[Path] = []
@@ -227,10 +226,12 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path)
227226
logger.info(f"Creating mocker worker {worker_id + 1}/{args.num_workers}")
228227

229228
# Create a separate DistributedRuntime for this worker (on same event loop)
230-
runtime = DistributedRuntime(
231-
loop,
229+
230+
runtime, loop = create_runtime(
232231
args.discovery_backend,
233232
args.request_plane,
233+
args.event_plane,
234+
True, # always True; this flag only controls whether NATS is enabled, which applies only when event_plane is "nats"
234235
)
235236
runtimes.append(runtime)
236237

docs/components/router/standalone-indexer.md

Lines changed: 140 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ subtitle: Run the KV cache indexer as an independent HTTP service for querying b
77

88
## Overview
99

10-
The standalone KV indexer (`dynamo-kv-indexer`) is a lightweight HTTP binary that subscribes to ZMQ KV event streams from workers, maintains a radix tree of cached blocks, and exposes HTTP endpoints for querying and managing workers.
10+
The standalone KV indexer (`dynamo-kv-indexer`) is a lightweight binary that maintains a radix tree of cached blocks and exposes HTTP endpoints for querying and managing workers. It supports two operational modes:
11+
12+
- **Standalone mode** (default): Subscribes to ZMQ KV event streams directly from workers. No Dynamo runtime dependencies required.
13+
- **Dynamo runtime mode** (`--dynamo-runtime`): Integrates with the Dynamo runtime for automatic worker discovery via MDC, KV event ingestion via the event plane (NATS or ZMQ), and serves indexer queries over the request plane for remote frontends.
1114

1215
This is distinct from the [Standalone Router](../../../components/src/dynamo/router/README.md), which is a full routing service. The standalone indexer provides only the indexing and query layer without routing logic.
1316

@@ -23,14 +26,17 @@ The indexer maintains one radix tree per `(model_name, tenant_id)` pair. Workers
2326

2427
## Compatibility
2528

26-
The standalone indexer works with any engine that publishes KV cache events over ZMQ in the expected msgpack format. This includes bare vLLM and SGLang engines, which emit ZMQ KV events natively — no Dynamo-specific wrapper is required.
29+
In standalone mode, the indexer works with any engine that publishes KV cache events over ZMQ in the expected msgpack format. This includes bare vLLM and SGLang engines, which emit ZMQ KV events natively — no Dynamo-specific wrapper is required.
30+
31+
In Dynamo runtime mode, the indexer discovers workers automatically via MDC and receives KV events through the event plane. It also registers a query endpoint on the request plane, allowing frontends to query overlap scores remotely without needing direct HTTP access.
2732

2833
## Use Cases
2934

3035
- **Debugging**: Inspect the radix tree state to verify which blocks are cached on which workers.
3136
- **State verification**: Confirm that the indexer's view of KV cache state matches the router's internal state (used in integration tests).
3237
- **Custom routing**: Build external routing logic that queries the indexer for overlap scores and makes its own worker selection decisions.
3338
- **Monitoring**: Observe KV cache distribution across workers without running a full router.
39+
- **Remote indexing**: In Dynamo runtime mode, frontends can offload KV cache indexing to a dedicated service and query it over the request plane.
3440

3541
## P2P Recovery
3642

@@ -75,18 +81,56 @@ Peers can be registered at startup via `--peers` or dynamically via the HTTP API
7581

7682
## Building
7783

78-
The binary is a feature-gated target in the `dynamo-kv-router` crate:
84+
The binary is a feature-gated target in the `dynamo-kv-router` crate. The available cargo features control which capabilities are compiled in:
85+
86+
| Feature | Description |
87+
|---------|-------------|
88+
| `standalone-indexer` | Core standalone indexer library (HTTP server, ZMQ listeners, P2P recovery) |
89+
| `metrics` | Prometheus metrics (`/metrics` endpoint, request/worker gauges) |
90+
| `indexer-bin` | CLI binary target |
91+
| `indexer-runtime` | Dynamo runtime integration (discovery, event plane, request plane) |
92+
| `test-endpoints` | Test-only endpoints (`/test/pause_listener`, `/test/resume_listener`) |
93+
94+
### Standalone build (no runtime dependency)
7995

8096
```bash
8197
cargo build -p dynamo-kv-router --features indexer-bin --bin dynamo-kv-indexer
8298
```
8399

100+
This produces a binary with no `dynamo-runtime` dependency. It supports ZMQ event listeners, HTTP API, and P2P recovery.
101+
102+
### Standalone build with metrics
103+
104+
```bash
105+
cargo build -p dynamo-kv-router --features indexer-bin,metrics --bin dynamo-kv-indexer
106+
```
107+
108+
Adds Prometheus metrics support (`/metrics` endpoint). Pulls in `dynamo-runtime` for the metrics implementation.
109+
110+
### Runtime-enabled build
111+
112+
```bash
113+
cargo build -p dynamo-kv-router --features indexer-bin,indexer-runtime --bin dynamo-kv-indexer
114+
```
115+
116+
Enables the `--dynamo-runtime` CLI flag for MDC discovery, event plane subscription, and request plane query endpoint. Includes metrics.
117+
84118
## CLI
85119

120+
### Standalone mode (default)
121+
86122
```bash
87123
dynamo-kv-indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-model --tenant-id default --workers "1=tcp://host:5557,2:1=tcp://host:5558"] [--peers "http://peer1:8090,http://peer2:8091"]
88124
```
89125

126+
### Dynamo runtime mode (requires `indexer-runtime` feature)
127+
128+
```bash
129+
dynamo-kv-indexer --dynamo-runtime --namespace default --component-name kv-indexer --worker-component backend --port 8090 [--threads 4]
130+
```
131+
132+
In runtime mode, workers are discovered automatically via MDC. The `--workers` flag can still be used to register additional static workers alongside discovered ones.
133+
90134
| Flag | Default | Description |
91135
|------|---------|-------------|
92136
| `--block-size` | (none) | KV cache block size for initial `--workers` (required when `--workers` is set) |
@@ -96,6 +140,10 @@ dynamo-kv-indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-mod
96140
| `--model-name` | `default` | Model name for initial `--workers` |
97141
| `--tenant-id` | `default` | Tenant ID for initial `--workers` |
98142
| `--peers` | (none) | Comma-separated peer indexer URLs for P2P recovery on startup |
143+
| `--dynamo-runtime` | `false` | Enable Dynamo runtime integration (requires `indexer-runtime` feature) |
144+
| `--namespace` | `default` | Dynamo namespace to register the indexer component under (runtime mode) |
145+
| `--component-name` | `kv-indexer` | Component name for this indexer in the Dynamo runtime (runtime mode) |
146+
| `--worker-component` | `backend` | Component name that workers register under, for event plane subscription (runtime mode) |
99147

100148
## HTTP API
101149

@@ -109,7 +157,7 @@ curl http://localhost:8090/health
109157

110158
### `GET /metrics` — Prometheus metrics
111159

112-
Returns metrics in Prometheus text exposition format. Available when the binary is built with the `metrics` feature (enabled by default via `standalone-indexer`).
160+
Returns metrics in Prometheus text exposition format. Available when the binary is built with the `metrics` or `indexer-runtime` feature.
113161

114162
```bash
115163
curl http://localhost:8090/metrics
@@ -313,13 +361,44 @@ If no `replay_endpoint` is configured, gaps are logged as warnings but not recov
313361

314362
The sequence counter (`last_seq`) persists across unregister/register cycles, so re-registering a worker after a gap will trigger replay on the first batch received by the new listener.
315363

364+
## Dynamo Runtime Mode
365+
366+
When started with `--dynamo-runtime`, the indexer integrates with the Dynamo distributed runtime:
367+
368+
### Worker Discovery
369+
370+
The indexer watches MDC (Model Deployment Card) registrations for worker additions and removals. When a worker registers via MDC, the indexer automatically creates an indexer instance for that worker's model and block size. Workers discovered via MDC are tracked separately from those registered via `--workers` or the `/register` HTTP API — a worker cannot be registered through both paths simultaneously.
371+
372+
### Event Plane Subscription
373+
374+
Instead of connecting directly to ZMQ PUB sockets on each worker, the indexer subscribes to KV events through the Dynamo event plane. The transport (NATS or ZMQ) is determined by the `DYN_EVENT_PLANE` environment variable. Each event is routed to the appropriate indexer based on the ID of the worker that emitted it.
375+
376+
### Request Plane Query Endpoint
377+
378+
The indexer registers a query endpoint on the Dynamo request plane, allowing frontends to send `IndexerQueryRequest` messages containing a model name, namespace, and block hashes. The indexer looks up the appropriate radix tree and returns overlap scores. This enables frontends to use a remote indexer for KV-aware routing without direct HTTP access.
379+
380+
### Example
381+
382+
```bash
383+
# Start the indexer with runtime integration
384+
dynamo-kv-indexer --dynamo-runtime \
385+
--namespace my-namespace \
386+
--component-name kv-indexer \
387+
--worker-component backend \
388+
--port 8090 --threads 4
389+
```
390+
391+
The HTTP API remains fully available in runtime mode. Static workers can be added via `--workers` alongside discovered workers.
392+
316393
## Limitations
317394

318-
- **ZMQ only**: Workers must publish KV events via ZMQ PUB sockets. The standalone indexer does not subscribe to NATS event streams.
395+
- **Standalone mode is ZMQ only**: In standalone mode, workers must publish KV events via ZMQ PUB sockets. Build with `indexer-runtime` and use `--dynamo-runtime` to receive events via the event plane (NATS or ZMQ).
319396
- **No routing logic**: The indexer only maintains the radix tree and answers queries. It does not track active blocks, manage request lifecycle, or perform worker selection.
320397

321398
## Architecture
322399

400+
### Standalone Mode
401+
323402
```mermaid
324403
graph TD
325404
subgraph Workers
@@ -353,6 +432,62 @@ graph TD
353432
style CLIENT fill:#fff3e0,stroke:#333,color:#333
354433
```
355434

435+
### Dynamo Runtime Mode
436+
437+
```mermaid
438+
graph TD
439+
subgraph Workers
440+
W1[Worker 1]
441+
W2[Worker 2]
442+
end
443+
444+
subgraph "Dynamo Runtime"
445+
MDC[MDC Discovery]
446+
EP[Event Plane<br/>NATS / ZMQ]
447+
RP[Request Plane]
448+
end
449+
450+
subgraph "Standalone Indexer"
451+
DISC[Discovery Watcher]
452+
SUB[Event Subscriber]
453+
REG[Worker Registry]
454+
IDX["Indexer Map<br/>(model, tenant) → Radix Tree"]
455+
QE[Query Endpoint]
456+
HTTP[HTTP API<br/>/query /dump /register]
457+
end
458+
459+
FRONTEND[Frontend / Router]
460+
CLIENT[External Client]
461+
462+
W1 -->|register| MDC
463+
W2 -->|register| MDC
464+
MDC -->|added/removed| DISC
465+
DISC -->|add/remove workers| REG
466+
W1 -->|KV events| EP
467+
W2 -->|KV events| EP
468+
EP -->|RouterEvent| SUB
469+
SUB -->|apply events| IDX
470+
FRONTEND -->|IndexerQueryRequest| RP
471+
RP --> QE
472+
QE -->|query| IDX
473+
CLIENT -->|POST /query, GET /dump| HTTP
474+
HTTP -->|query| IDX
475+
476+
style W1 fill:#f3e5f5,stroke:#333,color:#333
477+
style W2 fill:#f3e5f5,stroke:#333,color:#333
478+
style MDC fill:#e3f2fd,stroke:#333,color:#333
479+
style EP fill:#e3f2fd,stroke:#333,color:#333
480+
style RP fill:#e3f2fd,stroke:#333,color:#333
481+
style IDX fill:#2e8b57,stroke:#333,color:#fff
482+
style SUB fill:#2e8b57,stroke:#333,color:#fff
483+
style DISC fill:#2e8b57,stroke:#333,color:#fff
484+
style REG fill:#2e8b57,stroke:#333,color:#fff
485+
style QE fill:#2e8b57,stroke:#333,color:#fff
486+
style HTTP fill:#2e8b57,stroke:#333,color:#fff
487+
style FRONTEND fill:#fff3e0,stroke:#333,color:#333
488+
style CLIENT fill:#fff3e0,stroke:#333,color:#333
489+
```
490+
356491
### P2P Recovery Flow
357492

358493
```mermaid

lib/bindings/c/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -691,8 +691,9 @@ pub unsafe extern "C" fn create_routers(
691691
.kv_chooser_for(
692692
&endpoint,
693693
block_size,
694-
Some(kv_router_config),
694+
Some(kv_router_config.clone()),
695695
WORKER_TYPE_DECODE,
696+
Some(model_name.clone()),
696697
)
697698
.await
698699
{

lib/bindings/python/rust/llm/entrypoint.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,21 @@ pub enum EngineType {
4040
}
4141

4242
#[pyclass]
43-
#[derive(Default, Clone, Debug, Copy)]
43+
#[derive(Default, Clone, Debug)]
4444
pub struct KvRouterConfig {
4545
inner: RsKvRouterConfig,
4646
}
4747

4848
impl KvRouterConfig {
4949
pub fn inner(&self) -> RsKvRouterConfig {
50-
self.inner
50+
self.inner.clone()
5151
}
5252
}
5353

5454
#[pymethods]
5555
impl KvRouterConfig {
5656
#[new]
57-
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(2.0), router_event_threads=4, router_enable_cache_control=false, router_queue_policy="fcfs"))]
57+
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(2.0), router_event_threads=4, router_enable_cache_control=false, router_queue_policy="fcfs", remote_indexer_component=None))]
5858
#[allow(clippy::too_many_arguments)]
5959
fn new(
6060
overlap_score_weight: f64,
@@ -74,6 +74,7 @@ impl KvRouterConfig {
7474
router_event_threads: u32,
7575
router_enable_cache_control: bool,
7676
router_queue_policy: &str,
77+
remote_indexer_component: Option<String>,
7778
) -> Self {
7879
KvRouterConfig {
7980
inner: RsKvRouterConfig {
@@ -96,6 +97,7 @@ impl KvRouterConfig {
9697
router_queue_policy: router_queue_policy.parse().unwrap_or_else(|_| {
9798
panic!("invalid router_queue_policy: {router_queue_policy:?}")
9899
}),
100+
remote_indexer_component,
99101
},
100102
}
101103
}

0 commit comments

Comments
 (0)