Skip to content

Commit 64869f3

Browse files
committed
chore(kv-router): untangle runtime from metrics
Signed-off-by: PeaBrane <yanrpei@gmail.com>
1 parent 59a9ecc commit 64869f3

File tree

7 files changed

+155
-110
lines changed

7 files changed

+155
-110
lines changed

Cargo.lock

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ dynamo-config = { path = "lib/config", version = "1.0.0" }
4343
dynamo-tokens = { path = "lib/tokens", version = "1.0.0" }
4444
dynamo-memory = { path = "lib/memory", version = "1.0.0" }
4545
dynamo-mocker = { path = "lib/mocker", version = "1.0.0" }
46-
dynamo-kv-router = { path = "lib/kv-router", version = "1.0.0", features = ["metrics"] }
46+
dynamo-kv-router = { path = "lib/kv-router", version = "1.0.0", features = ["metrics", "runtime-protocols"] }
4747
dynamo-async-openai = { path = "lib/async-openai", version = "1.0.0", features = ["byot"] }
4848
dynamo-parsers = { path = "lib/parsers", version = "1.0.0" }
4949

lib/kv-router/Cargo.toml

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ repository.workspace = true
1212

1313
[features]
1414
default = []
15-
metrics = ["dep:dynamo-runtime"]
16-
bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dep:plotters"]
15+
metrics = ["dep:prometheus"]
16+
runtime-protocols = ["dep:dynamo-runtime"]
17+
bench = []
1718
standalone-indexer = ["dep:axum", "dep:bytes", "dep:zeromq", "dep:serde_json", "dep:reqwest"]
18-
indexer-runtime = ["metrics", "standalone-indexer"]
19+
indexer-runtime = ["metrics", "runtime-protocols", "standalone-indexer"]
1920

2021
[dependencies]
2122
# repo
@@ -29,7 +30,7 @@ dashmap = { workspace = true }
2930
ordered-float = { workspace = true }
3031
derive_builder = { workspace = true }
3132
derive-getters = { workspace = true }
32-
prometheus = { workspace = true }
33+
prometheus = { workspace = true, optional = true }
3334
rand = { workspace = true }
3435
serde = { workspace = true }
3536
serde_json = { workspace = true, optional = true }
@@ -46,10 +47,6 @@ flume = "0.12.0"
4647
parking_lot = { workspace = true }
4748
rmp-serde = { workspace = true }
4849

49-
# bench (optional)
50-
clap = { version = "4.5", features = ["derive"], optional = true }
51-
indicatif = { version = "0.18.0", optional = true }
52-
plotters = { version = "0.3", optional = true, default-features = false, features = ["svg_backend", "line_series", "point_series", "full_palette"] }
5350
rustc-hash = "2.1.1"
5451

5552
# standalone-indexer (optional)
@@ -58,9 +55,6 @@ bytes = { workspace = true, optional = true }
5855
reqwest = { workspace = true, optional = true }
5956
zeromq = { version = "0.4.1", optional = true }
6057

61-
[package.metadata.cargo-machete]
62-
ignored = ["clap", "indicatif", "plotters"]
63-
6458
[dev-dependencies]
6559
rstest = "0.18.2"
6660
rstest_reuse = "0.7.0"
Lines changed: 73 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
#[cfg(feature = "metrics")]
5-
use std::sync::{Arc, OnceLock};
4+
#[cfg(feature = "runtime-protocols")]
5+
use std::sync::Arc;
6+
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
7+
use std::sync::OnceLock;
68

9+
#[cfg(feature = "runtime-protocols")]
10+
use dynamo_runtime::component::Component;
11+
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
12+
use dynamo_runtime::metrics::MetricsHierarchy;
713
#[cfg(feature = "metrics")]
8-
use dynamo_runtime::{
9-
component::Component,
10-
metrics::{MetricsHierarchy, prometheus_names::kvrouter},
11-
};
1214
use prometheus::{IntCounterVec, Opts};
1315

1416
use crate::protocols::{KvCacheEventData, KvCacheEventError};
@@ -17,9 +19,17 @@ use crate::protocols::{KvCacheEventData, KvCacheEventError};
1719
#[derive(Clone)]
1820
pub struct KvIndexerMetrics {
1921
/// Counter of events applied.
22+
#[cfg(feature = "metrics")]
2023
pub kv_cache_events_applied: IntCounterVec,
2124
}
2225

26+
#[cfg(not(feature = "metrics"))]
27+
impl Default for KvIndexerMetrics {
28+
fn default() -> Self {
29+
Self {}
30+
}
31+
}
32+
2333
/// Metric status labels.
2434
pub const METRIC_STATUS_OK: &str = "ok";
2535
pub const METRIC_STATUS_PARENT_NOT_FOUND: &str = "parent_block_not_found";
@@ -32,9 +42,10 @@ pub const METRIC_EVENT_REMOVED: &str = "removed";
3242
pub const METRIC_EVENT_CLEARED: &str = "cleared";
3343

3444
/// Metric name for KV cache events applied counter.
45+
const KV_CACHE_EVENTS_APPLIED_SUFFIX: &str = "kv_cache_events_applied";
3546
const KV_CACHE_EVENTS_APPLIED_NAME: &str = "dynamo_kvrouter_kv_cache_events_applied";
3647

37-
#[cfg(feature = "metrics")]
48+
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
3849
static KV_INDEXER_METRICS: OnceLock<Arc<KvIndexerMetrics>> = OnceLock::new();
3950

4051
impl KvIndexerMetrics {
@@ -47,26 +58,40 @@ impl KvIndexerMetrics {
4758

4859
/// Creates a new KvIndexerMetrics from a Component, memoizing the result in
4960
/// KV_INDEXER_METRICS to avoid duplicate registration issues.
50-
#[cfg(feature = "metrics")]
61+
#[cfg(feature = "runtime-protocols")]
5162
pub fn from_component(component: &Component) -> Arc<Self> {
52-
KV_INDEXER_METRICS.get_or_init(|| {
53-
match component.metrics().create_intcountervec(
54-
kvrouter::KV_CACHE_EVENTS_APPLIED,
55-
"Total number of KV cache events applied to index",
56-
&["event_type", "status"],
57-
&[],
58-
) {
59-
Ok(kv_cache_events_applied) => Arc::new(Self::new(kv_cache_events_applied)),
60-
Err(e) => {
61-
tracing::warn!("Failed to create kv indexer metrics from component: {}. Using unregistered metrics as fallback.", e);
62-
Arc::new(Self::new_unregistered())
63-
}
64-
}
65-
}).clone()
63+
#[cfg(feature = "metrics")]
64+
{
65+
KV_INDEXER_METRICS
66+
.get_or_init(|| {
67+
match component.metrics().create_intcountervec(
68+
KV_CACHE_EVENTS_APPLIED_SUFFIX,
69+
"Total number of KV cache events applied to index",
70+
&["event_type", "status"],
71+
&[],
72+
) {
73+
Ok(kv_cache_events_applied) => {
74+
Arc::new(Self::new(kv_cache_events_applied))
75+
}
76+
Err(e) => {
77+
tracing::warn!("Failed to create kv indexer metrics from component: {}. Using unregistered metrics as fallback.", e);
78+
Arc::new(Self::new_unregistered())
79+
}
80+
}
81+
})
82+
.clone()
83+
}
84+
85+
#[cfg(not(feature = "metrics"))]
86+
{
87+
let _ = component;
88+
Arc::new(Self::new_unregistered())
89+
}
6690
}
6791

6892
/// Creates a new KvIndexerMetrics which is not registered with a MetricsRegistry.
6993
/// This may be used for tests or as a fallback for when a MetricsRegistry is not available / has errored.
94+
#[cfg(feature = "metrics")]
7095
pub fn new_unregistered() -> Self {
7196
Self {
7297
kv_cache_events_applied: IntCounterVec::new(
@@ -80,6 +105,12 @@ impl KvIndexerMetrics {
80105
}
81106
}
82107

108+
/// Creates a no-op metrics instance when Prometheus support is disabled.
109+
#[cfg(not(feature = "metrics"))]
110+
pub fn new_unregistered() -> Self {
111+
Self::default()
112+
}
113+
83114
pub fn get_event_type(event_data: &KvCacheEventData) -> &'static str {
84115
match event_data {
85116
KvCacheEventData::Stored(_) => METRIC_EVENT_STORED,
@@ -93,22 +124,27 @@ impl KvIndexerMetrics {
93124
event_type: &'static str,
94125
result: Result<(), KvCacheEventError>,
95126
) {
96-
match result {
97-
Ok(_) => {
98-
self.kv_cache_events_applied
99-
.with_label_values(&[event_type, METRIC_STATUS_OK])
100-
.inc_by(1);
101-
}
102-
Err(e) => {
103-
let error_label = match e {
104-
KvCacheEventError::ParentBlockNotFound => METRIC_STATUS_PARENT_NOT_FOUND,
105-
KvCacheEventError::BlockNotFound => METRIC_STATUS_BLOCK_NOT_FOUND,
106-
KvCacheEventError::InvalidBlockSequence => METRIC_STATUS_INVALID_BLOCK,
107-
};
108-
self.kv_cache_events_applied
109-
.with_label_values(&[event_type, error_label])
110-
.inc_by(1);
127+
#[cfg(feature = "metrics")]
128+
{
129+
match result {
130+
Ok(_) => {
131+
self.kv_cache_events_applied
132+
.with_label_values(&[event_type, METRIC_STATUS_OK])
133+
.inc_by(1);
134+
}
135+
Err(e) => {
136+
let error_label = match e {
137+
KvCacheEventError::ParentBlockNotFound => METRIC_STATUS_PARENT_NOT_FOUND,
138+
KvCacheEventError::BlockNotFound => METRIC_STATUS_BLOCK_NOT_FOUND,
139+
KvCacheEventError::InvalidBlockSequence => METRIC_STATUS_INVALID_BLOCK,
140+
};
141+
self.kv_cache_events_applied
142+
.with_label_values(&[event_type, error_label])
143+
.inc_by(1);
144+
}
111145
}
112146
}
147+
#[cfg(not(feature = "metrics"))]
148+
let _ = (self, event_type, result);
113149
}
114150
}

lib/kv-router/src/indexer/types.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@
44
#[cfg(feature = "bench")]
55
use std::time::Instant;
66

7-
#[cfg(feature = "metrics")]
8-
use dynamo_runtime::error::DynamoError;
9-
#[cfg(feature = "metrics")]
10-
pub use dynamo_runtime::protocols::maybe_error::MaybeError;
117
use serde::{Deserialize, Serialize};
128
use tokio::sync::{mpsc, oneshot};
139

@@ -16,7 +12,6 @@ use dynamo_tokens::SequenceHash;
1612

1713
/// Trait for types that may represent an error response.
1814
/// Used for RPC-style responses that can indicate success or failure.
19-
#[cfg(not(feature = "metrics"))]
2015
pub trait MaybeError {
2116
/// Construct an instance from an error.
2217
fn from_err(err: impl std::error::Error + 'static) -> Self;
@@ -75,15 +70,30 @@ pub enum WorkerKvQueryResponse {
7570
Error(String),
7671
}
7772

78-
#[cfg(feature = "metrics")]
7973
impl MaybeError for WorkerKvQueryResponse {
8074
fn from_err(err: impl std::error::Error + 'static) -> Self {
8175
WorkerKvQueryResponse::Error(err.to_string())
8276
}
8377

84-
fn err(&self) -> Option<DynamoError> {
78+
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
79+
match self {
80+
WorkerKvQueryResponse::Error(msg) => Some(Box::new(std::io::Error::other(msg.clone()))),
81+
_ => None,
82+
}
83+
}
84+
}
85+
86+
#[cfg(feature = "runtime-protocols")]
87+
impl dynamo_runtime::protocols::maybe_error::MaybeError for WorkerKvQueryResponse {
88+
fn from_err(err: impl std::error::Error + 'static) -> Self {
89+
WorkerKvQueryResponse::Error(err.to_string())
90+
}
91+
92+
fn err(&self) -> Option<dynamo_runtime::error::DynamoError> {
8593
match self {
86-
WorkerKvQueryResponse::Error(msg) => Some(DynamoError::msg(msg.clone())),
94+
WorkerKvQueryResponse::Error(msg) => {
95+
Some(dynamo_runtime::error::DynamoError::msg(msg.clone()))
96+
}
8797
_ => None,
8898
}
8999
}
@@ -147,29 +157,30 @@ pub enum IndexerQueryResponse {
147157
Error(String),
148158
}
149159

150-
#[cfg(feature = "metrics")]
151160
impl MaybeError for IndexerQueryResponse {
152161
fn from_err(err: impl std::error::Error + 'static) -> Self {
153162
IndexerQueryResponse::Error(err.to_string())
154163
}
155164

156-
fn err(&self) -> Option<DynamoError> {
165+
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
157166
match self {
158-
IndexerQueryResponse::Error(msg) => Some(DynamoError::msg(msg.clone())),
167+
IndexerQueryResponse::Error(msg) => Some(Box::new(std::io::Error::other(msg.clone()))),
159168
_ => None,
160169
}
161170
}
162171
}
163172

164-
#[cfg(not(feature = "metrics"))]
165-
impl MaybeError for IndexerQueryResponse {
173+
#[cfg(feature = "runtime-protocols")]
174+
impl dynamo_runtime::protocols::maybe_error::MaybeError for IndexerQueryResponse {
166175
fn from_err(err: impl std::error::Error + 'static) -> Self {
167176
IndexerQueryResponse::Error(err.to_string())
168177
}
169178

170-
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
179+
fn err(&self) -> Option<dynamo_runtime::error::DynamoError> {
171180
match self {
172-
IndexerQueryResponse::Error(msg) => Some(Box::new(std::io::Error::other(msg.clone()))),
181+
IndexerQueryResponse::Error(msg) => {
182+
Some(dynamo_runtime::error::DynamoError::msg(msg.clone()))
183+
}
173184
_ => None,
174185
}
175186
}

0 commit comments

Comments
 (0)