Commit adaf1a3

feat: Metric for detokenization latency (#6160)
Signed-off-by: Graham King <grahamk@nvidia.com>
1 parent 1488ef2 commit adaf1a3

7 files changed: +180, -46 lines changed

lib/llm/benches/tokenizer.rs (2 additions, 2 deletions)

@@ -54,7 +54,7 @@ pub fn decode(c: &mut Criterion) {
             let tokenizer: Arc<dyn Tokenizer> =
                 Arc::new(HuggingFaceTokenizer::from_file(TEST_TOKENIZER).unwrap());
             let ds = DecodeStream::new(tokenizer, &[], false);
-            Decoder::new(ds, StopConditions::default(), false)
+            Decoder::new(ds, StopConditions::default(), false, None)
         },
         |mut decoder| {
             for tok in black_box(TEST_TOKS) {
@@ -78,7 +78,7 @@ pub fn decode_big(c: &mut Criterion) {
             let tokenizer: Arc<dyn Tokenizer> =
                 Arc::new(HuggingFaceTokenizer::from_file(TEST_TOKENIZER).unwrap());
             let ds = DecodeStream::new(tokenizer, &[], false);
-            Decoder::new(ds, StopConditions::default(), false)
+            Decoder::new(ds, StopConditions::default(), false, None)
         },
         |mut decoder| {
             for tok in black_box(&BIG_TEST_TOKS) {

lib/llm/src/backend.rs (19 additions, 10 deletions)

@@ -15,11 +15,10 @@
 //! Further post-processing can happen in the response stream. One example is the jailing mechanism for partial
 //! hidden stop condition matches, which can be handled in the response stream rather than the backend.

-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, sync::Arc, time::Instant};

 use anyhow::Result;
 use futures::stream::{self, StreamExt};
-use tracing as log;

 use crate::model_card::ModelDeploymentCard;
 use dynamo_runtime::{
@@ -39,6 +38,7 @@ use crate::protocols::{
         PreprocessedRequest,
     },
     preprocessor::PreprocessedEmbeddingRequest,
+    timing::RequestTracker,
 },
 };
 use crate::tokenizers::{DecodeStream, HuggingFaceTokenizer, Tokenizer};
@@ -99,6 +99,7 @@ impl Backend {
         stop_conditions: StopConditions,
         skip_special_tokens: bool,
         include_stop_str_in_output: bool,
+        tracker: Option<Arc<RequestTracker>>,
     ) -> anyhow::Result<DecoderUnfoldState> {
         let Some(tokenizer) = self.tokenizer.as_ref() else {
             anyhow::bail!("Backend built from blank ModelDeploymentCard, no tokenizer");
@@ -107,6 +108,7 @@
             tokenizer.decode_stream(prompt_token_ids, skip_special_tokens),
             stop_conditions,
             include_stop_str_in_output,
+            tracker,
         );

         Ok(DecoderUnfoldState {
@@ -144,6 +146,7 @@ impl
             .sampling_options
             .include_stop_str_in_output
             .unwrap_or(false);
+        let tracker = request.tracker.clone();

         let next_stream = next.generate(request).await?;

@@ -154,6 +157,7 @@ impl
             stop_conditions,
             skip_special_tokens,
             include_stop_str_in_output,
+            tracker,
         )?;

         let processed_stream = stream::unfold(state, |mut state| async move {
@@ -226,15 +230,19 @@ impl

             if state.validate_engine_decode {
                 if data.finish_reason != finish_reason {
-                    log::warn!(
+                    tracing::warn!(
                         "finish reason mismatch: expected {:?}, got {:?}",
                         data.finish_reason,
                         finish_reason
                     );
                 }

                 if data.text.is_some() && data.text != text {
-                    log::warn!("text mismatch: expected {:?}, got {:?}", data.text, text);
+                    tracing::warn!(
+                        "text mismatch: expected {:?}, got {:?}",
+                        data.text,
+                        text
+                    );
                 }
             }

@@ -326,6 +334,7 @@ impl
 #[allow(dead_code)]
 pub struct Decoder {
     decode_stream: DecodeStream,
+    tracker: Option<Arc<RequestTracker>>,

     // do not trigger stop conditions until at least this many tokens have been generated
     min_tokens: u32,
@@ -398,6 +407,7 @@ impl Decoder {
         decode_stream: DecodeStream,
         stop_condition: StopConditions,
         include_stop_str_in_output: bool,
+        tracker: Option<Arc<RequestTracker>>,
     ) -> Self {
         let hidden_stop_ids: HashSet<TokenIdType> = stop_condition
             .stop_token_ids_hidden
@@ -425,6 +435,7 @@ impl Decoder {

         Self {
             decode_stream,
+            tracker,
             hidden_stop_ids,
             hidden_stop_sequences,
             visible_stop_sequences,
@@ -447,7 +458,11 @@ impl Decoder {
         self.generated_tokens += 1;

         // decode the token
+        let detokenize_start = Instant::now();
         let token = self.decode_stream.step(token_id)?;
+        if let Some(tracker) = &self.tracker {
+            tracker.record_detokenize_latency(detokenize_start.elapsed());
+        }

         // stop conditions to not apply until the minimum number of tokens have been generated
         if self.generated_tokens < self.min_tokens {
@@ -468,18 +483,12 @@
             && let Some(token) = &token
         {
             let pre_append = self.jail.len();
-            log::debug!("pre_append: {}", pre_append);
-            log::debug!("jail: {}", self.jail);
             self.jail.push_str(token);
-            log::debug!("post_append: {}", self.jail.len());
-            log::debug!("jail: {}", self.jail);

             // Check hidden stop sequences first (excluded from output)
             for seq in &self.hidden_stop_sequences {
-                log::debug!("stop seq: {}", seq);
                 if let Some(offset) = galil_seiferas::gs_find(self.jail.as_bytes(), seq.as_bytes())
                 {
-                    log::debug!("offset: {}", offset);
                     // return only new bytes after pre_append .. offset (excluding stop sequence)
                     // example: seq = "ox", token = "boxes", return "b"
                     // note: this changes when we start jailing tokens for partial matches
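
The Decoder now calls tracker.record_detokenize_latency(...) on an optional RequestTracker, but this commit view does not show that type's definition. Below is a minimal, purely illustrative sketch of how such a tracker could accumulate per-step detokenize timings; the atomic fields and accessor names are assumptions, not the actual timing::RequestTracker API. Only record_detokenize_latency(Duration) is taken from the call site above.

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

// Hypothetical sketch: accumulate total detokenize time and sample count per request.
#[derive(Default)]
pub struct RequestTracker {
    detokenize_nanos: AtomicU64,
    detokenize_samples: AtomicU64,
}

impl RequestTracker {
    pub fn record_detokenize_latency(&self, elapsed: Duration) {
        // Relaxed ordering is enough here: the totals are only read once, at request end.
        self.detokenize_nanos
            .fetch_add(elapsed.as_nanos() as u64, Ordering::Relaxed);
        self.detokenize_samples.fetch_add(1, Ordering::Relaxed);
    }

    pub fn detokenize_total_latency(&self) -> Duration {
        Duration::from_nanos(self.detokenize_nanos.load(Ordering::Relaxed))
    }

    pub fn detokenize_count(&self) -> u64 {
        self.detokenize_samples.load(Ordering::Relaxed)
    }
}

Because the tracker is passed as Option<Arc<RequestTracker>>, callers that do not collect metrics (the benchmarks above pass None) skip the recording entirely.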

lib/llm/src/http/service/metrics.rs (102 additions, 22 deletions)

@@ -328,8 +328,11 @@ pub struct ResponseMetricCollector {
     osl: usize,
     // we track if cached_tokens has been observed to ensure we only increment once per request
     cached_tokens_observed: bool,
-    // we track if tokenizer latency has been observed to ensure we only increment once per request
-    tokenizer_latency_observed: bool,
+    // we track if tokenize latency has been observed to ensure we only increment once per request
+    tokenize_latency_observed: bool,
+    // latest accumulated detokenize latency and sample count reported by tracker
+    detokenize_latency_total: Duration,
+    detokenize_count_total: u64,
     // Prefill worker info for TTFT attribution (set from LLMMetricAnnotation)
     prefill_worker_id: Option<u64>,
     prefill_dp_rank: Option<u32>,
@@ -987,7 +990,9 @@ impl ResponseMetricCollector {
             start_time: Instant::now(),
             osl: 0,
             cached_tokens_observed: false,
-            tokenizer_latency_observed: false,
+            tokenize_latency_observed: false,
+            detokenize_latency_total: Duration::ZERO,
+            detokenize_count_total: 0,
             prefill_worker_id: None,
             prefill_dp_rank: None,
             prefill_worker_type: None,
@@ -1052,17 +1057,30 @@ impl ResponseMetricCollector {
         }
     }

-    /// Observe tokenizer latency in milliseconds, once per request.
-    pub fn observe_tokenizer_latency(&mut self, tokenizer_latency: Option<Duration>) {
-        if let Some(latency) = tokenizer_latency
-            && !self.tokenizer_latency_observed
+    /// Observe tokenize/detokenize latencies in milliseconds.
+    /// Tokenize is observed once per request; detokenize is accumulated and observed at request end.
+    pub fn observe_tokenize_latencies(
+        &mut self,
+        tokenize_latency: Option<Duration>,
+        detokenize_latency: Option<Duration>,
+        detokenize_count: Option<u64>,
+    ) {
+        if let Some(latency) = tokenize_latency
+            && !self.tokenize_latency_observed
         {
-            self.tokenizer_latency_observed = true;
+            self.tokenize_latency_observed = true;
             self.metrics
                 .tokenizer_latency
                 .with_label_values(&[frontend_service::operation::TOKENIZE])
                 .observe(latency.as_secs_f64() * 1000.0);
         }
+
+        if let Some(latency) = detokenize_latency {
+            self.detokenize_latency_total = latency;
+        }
+        if let Some(count) = detokenize_count {
+            self.detokenize_count_total = count;
+        }
     }

     /// Observe a response with input sequence length and number of new tokens
@@ -1155,6 +1173,15 @@ impl ResponseMetricCollector {

 impl Drop for ResponseMetricCollector {
     fn drop(&mut self) {
+        if !self.detokenize_latency_total.is_zero() && self.detokenize_count_total > 0 {
+            let avg_detokenize_latency_ms = (self.detokenize_latency_total.as_secs_f64() * 1000.0)
+                / self.detokenize_count_total as f64;
+            self.metrics
+                .tokenizer_latency
+                .with_label_values(&[frontend_service::operation::DETOKENIZE])
+                .observe(avg_detokenize_latency_ms);
+        }
+
         // Publish final OSL when the collector is dropped
         self.metrics
             .output_sequence_length
@@ -1179,7 +1206,11 @@ pub fn process_response_and_observe_metrics<T>(
     if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
         response_collector.observe_current_osl(metrics.output_tokens);
         response_collector.observe_cached_tokens(metrics.cached_tokens);
-        response_collector.observe_tokenizer_latency(metrics.tokenizer_latency);
+        response_collector.observe_tokenize_latencies(
+            metrics.tokenize_latency,
+            metrics.detokenize_total_latency,
+            metrics.detokenize_count,
+        );
         response_collector.set_worker_info(
             metrics.prefill_worker_id,
             metrics.prefill_dp_rank,
@@ -1229,7 +1260,11 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
     if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
         response_collector.observe_current_osl(metrics.output_tokens);
         response_collector.observe_cached_tokens(metrics.cached_tokens);
-        response_collector.observe_tokenizer_latency(metrics.tokenizer_latency);
+        response_collector.observe_tokenize_latencies(
+            metrics.tokenize_latency,
+            metrics.detokenize_total_latency,
+            metrics.detokenize_count,
+        );
         response_collector.set_worker_info(
             metrics.prefill_worker_id,
             metrics.prefill_dp_rank,
@@ -1735,7 +1770,9 @@ mod tests {
             decode_worker_id: None,
             decode_dp_rank: None,
             decode_worker_type: None,
-            tokenizer_latency: Some(Duration::from_millis(8)),
+            tokenize_latency: Some(Duration::from_millis(8)),
+            detokenize_total_latency: Some(Duration::from_micros(100)),
+            detokenize_count: Some(2),
         };

         let annotation = llm_metrics.to_annotation::<()>().unwrap();
@@ -1753,6 +1790,9 @@
         // Should return Ok(None) for metrics annotation events
         assert!(matches!(result, Ok(None)));

+        // Drop collector so the detokenize observation fires in Drop
+        drop(collector);
+
         // Should have observed the cached tokens from the metrics annotation event
         let metric_families = registry.gather();
         let histogram_family = metric_families
@@ -1770,11 +1810,31 @@
             .iter()
             .find(|mf| mf.name() == expected_tokenizer_metric_name)
             .expect("histogram should be registered");
-        assert_eq!(
-            histogram_family.get_metric()[0]
-                .get_histogram()
-                .get_sample_count(),
-            1
+
+        // Find the tokenize and detokenize observations by label
+        let tokenize_metric = histogram_family
+            .get_metric()
+            .iter()
+            .find(|m| m.get_label().iter().any(|l| l.value() == "tokenize"))
+            .expect("tokenize metric should exist");
+        assert_eq!(tokenize_metric.get_histogram().get_sample_count(), 1);
+        // 8ms
+        assert!(
+            (tokenize_metric.get_histogram().get_sample_sum() - 8.0).abs() < 0.001,
+            "tokenize latency should be 8.0ms"
+        );
+
+        let detokenize_metric = histogram_family
+            .get_metric()
+            .iter()
+            .find(|m| m.get_label().iter().any(|l| l.value() == "detokenize"))
+            .expect("detokenize metric should exist");
+        assert_eq!(detokenize_metric.get_histogram().get_sample_count(), 1);
+        // Average: 100us total / 2 samples = 50us = 0.05ms
+        assert!(
+            (detokenize_metric.get_histogram().get_sample_sum() - 0.05).abs() < 0.001,
+            "detokenize average latency should be 0.05ms, got {}",
+            detokenize_metric.get_histogram().get_sample_sum()
        );
     }

@@ -1813,7 +1873,9 @@ mod tests {
             decode_worker_id: None,
             decode_dp_rank: None,
             decode_worker_type: None,
-            tokenizer_latency: Some(Duration::from_millis(8)),
+            tokenize_latency: Some(Duration::from_millis(8)),
+            detokenize_total_latency: Some(Duration::from_micros(100)),
+            detokenize_count: Some(2),
         };

         let annotation = llm_metrics.to_annotation::<()>().unwrap();
@@ -1824,6 +1886,9 @@
         let mut http_queue_guard = None;
         process_response_and_observe_metrics(&annotated, &mut collector, &mut http_queue_guard);

+        // Drop collector so the detokenize observation fires in Drop
+        drop(collector);
+
         // Should have observed the cached tokens from the metrics annotation event
         let metric_families = registry.gather();
         let histogram_family = metric_families
@@ -1841,11 +1906,26 @@
             .iter()
             .find(|mf| mf.name() == expected_tokenizer_metric_name)
             .expect("histogram should be registered");
-        assert_eq!(
-            histogram_family.get_metric()[0]
-                .get_histogram()
-                .get_sample_count(),
-            1
+
+        // Find the tokenize and detokenize observations by label
+        let tokenize_metric = histogram_family
+            .get_metric()
+            .iter()
+            .find(|m| m.get_label().iter().any(|l| l.value() == "tokenize"))
+            .expect("tokenize metric should exist");
+        assert_eq!(tokenize_metric.get_histogram().get_sample_count(), 1);
+
+        let detokenize_metric = histogram_family
+            .get_metric()
+            .iter()
+            .find(|m| m.get_label().iter().any(|l| l.value() == "detokenize"))
+            .expect("detokenize metric should exist");
+        assert_eq!(detokenize_metric.get_histogram().get_sample_count(), 1);
+        // Average: 100us total / 2 samples = 50us = 0.05ms
+        assert!(
+            (detokenize_metric.get_histogram().get_sample_sum() - 0.05).abs() < 0.001,
+            "detokenize average latency should be 0.05ms, got {}",
+            detokenize_metric.get_histogram().get_sample_sum()
         );
     }
 }
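
The detokenize value the tests assert on is a per-request average computed in Drop: the collector keeps the latest total latency and sample count from the annotations and observes total / count in milliseconds. A small standalone check of that arithmetic, using the same numbers as the tests (100 us over 2 samples); this is a worked example, not code from the commit.

use std::time::Duration;

fn main() {
    // Same inputs as the test annotation above.
    let detokenize_latency_total = Duration::from_micros(100);
    let detokenize_count_total: u64 = 2;

    // Mirrors the Drop logic: average detokenize latency per request, in milliseconds.
    let avg_detokenize_latency_ms =
        (detokenize_latency_total.as_secs_f64() * 1000.0) / detokenize_count_total as f64;

    // 100 us / 2 samples = 50 us = 0.05 ms, matching the assertions in the tests.
    assert!((avg_detokenize_latency_ms - 0.05).abs() < 0.001);
    println!("avg detokenize latency: {avg_detokenize_latency_ms:.3} ms");
}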
