
Commit 030f778

Merge pull request IBM#82 from opendatahub-io/main
sync release with main
2 parents 1b3dc71 + 9b721d6 commit 030f778

File tree: 12 files changed, +575 -392 lines


Cargo.lock

Lines changed: 448 additions & 300 deletions
Some generated files are not rendered by default.

Dockerfile

Lines changed: 3 additions & 3 deletions
@@ -1,5 +1,5 @@
 ## Global Args #################################################################
-ARG BASE_UBI_IMAGE_TAG=9.3-1610
+ARG BASE_UBI_IMAGE_TAG=latest
 ARG PROTOC_VERSION=25.3
 ARG PYTORCH_INDEX="https://download.pytorch.org/whl"
 # ARG PYTORCH_INDEX="https://download.pytorch.org/whl/nightly"
@@ -85,8 +85,8 @@ ENV LIBRARY_PATH="$CUDA_HOME/lib64/stubs"
 
 
 ## Rust builder ################################################################
-# Specific debian version so that compatible glibc version is used
-FROM rust:1.77.2-bullseye as rust-builder
+# Using bookworm for compilation so the rust binaries get linked against libssl.so.3
+FROM rust:1.78-bookworm as rust-builder
 ARG PROTOC_VERSION
 
 ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse

integration_tests/poetry.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default.

integration_tests/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ protobuf = "^4.25.3"
 grpcio-tools = "^1.62.2"
 pytest = "^8.1.1"
 pytest-asyncio = "^0.23.6"
-requests = "^2.31.0"
+requests = "^2.32.0"
 pyyaml = "^6.0.1"
 
 [tool.pytest.ini_options]

router/Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -20,8 +20,8 @@ text-generation-client = { path = "client" }
 clap = { version = "^4.5.4", features = ["derive", "env"] }
 futures = "^0.3.30"
 flume = "^0.11.0"
-metrics = "0.21.1"
-metrics-exporter-prometheus = { version = "0.12.2", features = [] }
+metrics = "0.22.3"
+metrics-exporter-prometheus = { version = "0.14.0", features = ["http-listener"] }
 moka = { version = "0.12.6", features = ["future"] }
 nohash-hasher = "^0.2.0"
 num = "^0.4.2"

router/src/batcher.rs

Lines changed: 28 additions & 27 deletions
@@ -52,6 +52,7 @@ use crate::{
 validation::RequestSize,
 ErrorResponse, GenerateRequest,
 };
+use crate::metrics::{increment_counter, increment_labeled_counter, observe_histogram, observe_labeled_histogram, set_gauge};
 
 /// Batcher
 #[derive(Clone)]
@@ -447,9 +448,9 @@ async fn batching_task<B: BatchType>(
 batch_size,
 );
 
-metrics::gauge!("tgi_batch_current_size", batch_size as f64);
-metrics::gauge!("tgi_batch_input_tokens", batch_tokens as f64);
-metrics::gauge!(
+set_gauge("tgi_batch_current_size", batch_size as f64);
+set_gauge("tgi_batch_input_tokens", batch_tokens as f64);
+set_gauge(
 "tgi_batch_max_remaining_tokens",
 batch_max_remaining_tokens.unwrap() as f64
 );
@@ -529,7 +530,7 @@ async fn batching_task<B: BatchType>(
 "Extending batch #{} of {} with additional batch #{} of {}",
 batch_id, batch_size, new_batch_id, added_batch_size
 );
-metrics::increment_counter!("tgi_batch_concatenation_count");
+increment_counter("tgi_batch_concatenation_count", 1);
 }
 } else {
 combined_batch_id = new_batch_id;
@@ -560,9 +561,9 @@ async fn batching_task<B: BatchType>(
 }
 }
 
-metrics::gauge!("tgi_batch_current_size", 0.0);
-metrics::gauge!("tgi_batch_input_tokens", 0.0);
-metrics::gauge!("tgi_batch_max_remaining_tokens", 0.0);
+set_gauge("tgi_batch_current_size", 0.0);
+set_gauge("tgi_batch_input_tokens", 0.0);
+set_gauge("tgi_batch_max_remaining_tokens", 0.0);
 }
 
 info!("Batching loop exiting");
@@ -625,9 +626,9 @@ impl<'a> TokenProcessor<'a> {
 let batch_size = batch.requests.len();
 let batch_tokens = batch.total_tokens;
 let start_time = Instant::now();
-metrics::histogram!("tgi_batch_next_tokens", batch_tokens as f64);
-metrics::histogram!(
-"tgi_batch_inference_batch_size", batch_size as f64, "method" => "prefill"
+observe_histogram("tgi_batch_next_tokens", batch_tokens as f64);
+observe_labeled_histogram(
+"tgi_batch_inference_batch_size", &[("method", "prefill")], batch_size as f64
 );
 let (result, prefill_time) = self
 ._wrap_future(
@@ -648,8 +649,8 @@ impl<'a> TokenProcessor<'a> {
 batches: Vec<CachedBatch>,
 queue: &mut Queue<B>,
 ) -> (Option<CachedBatch>, Duration) {
-metrics::histogram!(
-"tgi_batch_inference_batch_size", self.entries.len() as f64, "method" => "next_token"
+observe_labeled_histogram(
+"tgi_batch_inference_batch_size", &[("method", "next_token")], self.entries.len() as f64
 );
 let start_time = Instant::now();
 self._wrap_future(
@@ -672,7 +673,7 @@ impl<'a> TokenProcessor<'a> {
 start_id: Option<u64>,
 queue: &mut Queue<B>,
 ) -> (Option<CachedBatch>, Duration) {
-metrics::increment_counter!("tgi_batch_inference_count", "method" => method);
+increment_labeled_counter("tgi_batch_inference_count", &[("method", method)], 1);
 
 // We process the shared queue while waiting for the response from the python shard(s)
 let queue_servicer = queue.service_queue().fuse();
@@ -692,27 +693,27 @@ impl<'a> TokenProcessor<'a> {
 let completed_request_ids = self.process_next_tokens(generated_tokens, errors);
 // Update health
 self.generation_health.store(true, Ordering::SeqCst);
-metrics::histogram!(
+observe_labeled_histogram(
 "tgi_batch_inference_duration",
+&[("method", method),
+("makeup", "single_only")],
 elapsed.as_secs_f64(),
-"method" => method,
-"makeup" => "single_only", // later will possibly be beam_only or mixed
 );
-metrics::histogram!(
+observe_labeled_histogram(
 "tgi_batch_inference_forward_duration",
-forward_duration,
-"method" => method,
-"makeup" => "single_only", // later will possibly be beam_only or mixed
+&[("method", method),
+("makeup", "single_only")],
+forward_duration.as_secs_f64(),
 );
-metrics::histogram!(
+observe_labeled_histogram(
 "tgi_batch_inference_tokproc_duration",
+&[("method", method),
+("makeup", "single_only")],
 pre_token_process_time.elapsed().as_secs_f64(),
-"method" => method,
-"makeup" => "single_only", // later will possibly be beam_only or mixed
 );
 // Probably don't need this additional counter because the duration histogram
 // records a total count
-metrics::increment_counter!("tgi_batch_inference_success", "method" => method);
+increment_labeled_counter("tgi_batch_inference_success", &[("method", method)], 1);
 Some(CachedBatch {
 batch_id: next_batch_id,
 status: completed_request_ids.map(|c| RequestsStatus { completed_ids: c }),
@@ -729,7 +730,7 @@ impl<'a> TokenProcessor<'a> {
 ClientError::Connection(_) => "connection",
 _ => "error",
 };
-metrics::increment_counter!("tgi_batch_inference_failure", "method" => method, "reason" => reason);
+increment_labeled_counter("tgi_batch_inference_failure", &[("method", method), ("reason", reason)], 1);
 self.send_errors(err, start_id);
 None
 }
@@ -980,7 +981,7 @@ impl<'a> TokenProcessor<'a> {
 // If receiver closed (request cancelled), cancel this entry
 let e = self.entries.remove(&request_id).unwrap();
 stop_reason = Cancelled;
-metrics::increment_counter!("tgi_request_failure", "err" => "cancelled");
+increment_labeled_counter("tgi_request_failure", &[("err", "cancelled")], 1);
 //TODO include request context in log message
 warn!(
 "Aborted streaming request {request_id} cancelled by client \
@@ -994,7 +995,7 @@ impl<'a> TokenProcessor<'a> {
 // If receiver closed (request cancelled), cancel this entry
 let e = self.entries.remove(&request_id).unwrap();
 stop_reason = Cancelled;
-metrics::increment_counter!("tgi_request_failure", "err" => "cancelled");
+increment_labeled_counter("tgi_request_failure", &[("err", "cancelled")], 1);
 //TODO include request context in log message
 warn!(
 "Aborted request {request_id} cancelled by client \

router/src/grpc_server.rs

Lines changed: 26 additions & 29 deletions
@@ -32,6 +32,7 @@ use crate::{
 validation::{RequestSize, ValidationError},
 GenerateParameters, GenerateRequest,
 };
+use crate::metrics::{increment_counter, increment_labeled_counter, observe_histogram};
 use crate::pb::fmaas::tokenize_response::Offset;
 
 /// Whether to fail if sampling parameters are provided in greedy-mode requests
@@ -67,8 +68,6 @@ pub(crate) async fn start_grpc_server<F: Future<Output = ()> + Send + 'static>(
 let grpc_service = GenerationServicer {
 state: shared_state,
 tokenizer,
-input_counter: metrics::register_counter!("tgi_request_input_count"),
-tokenize_input_counter: metrics::register_counter!("tgi_tokenize_request_input_count"),
 };
 let grpc_server = builder
 .add_service(GenerationServiceServer::new(grpc_service))
@@ -92,8 +91,6 @@ async fn load_pem(path: String, name: &str) -> Vec<u8> {
 pub struct GenerationServicer {
 state: ServerState,
 tokenizer: AsyncTokenizer,
-input_counter: metrics::Counter,
-tokenize_input_counter: metrics::Counter,
 }
 
 #[tonic::async_trait]
@@ -124,20 +121,20 @@ impl GenerationService for GenerationServicer {
 let br = request.into_inner();
 let batch_size = br.requests.len();
 let kind = if batch_size == 1 { "single" } else { "batch" };
-metrics::increment_counter!("tgi_request_count", "kind" => kind);
+increment_labeled_counter("tgi_request_count", &[("kind", kind)], 1);
 if batch_size == 0 {
 return Ok(Response::new(BatchedGenerationResponse {
 responses: vec![],
 }));
 }
-self.input_counter.increment(batch_size as u64);
+increment_counter("tgi_request_input_count", batch_size as u64);
 // Limit concurrent requests by acquiring a permit from the semaphore
 let _permit = self
 .state
 .limit_concurrent_requests
 .try_acquire_many(batch_size as u32)
 .map_err(|_| {
-metrics::increment_counter!("tgi_request_failure", "err" => "conc_limit");
+increment_labeled_counter("tgi_request_failure", &[("err", "conc_limit")], 1);
 tracing::error!("Model is overloaded");
 Status::resource_exhausted("Model is overloaded")
 })?;
@@ -217,11 +214,11 @@
 }
 .map_err(|err| match err {
 InferError::RequestQueueFull() => {
-metrics::increment_counter!("tgi_request_failure", "err" => "queue_full");
+increment_labeled_counter("tgi_request_failure", &[("err", "queue_full")], 1);
 Status::resource_exhausted(err.to_string())
 }
 _ => {
-metrics::increment_counter!("tgi_request_failure", "err" => "generate");
+increment_labeled_counter("tgi_request_failure", &[("err", "generate")], 1);
 tracing::error!("{err}");
 Status::from_error(Box::new(err))
 }
@@ -254,15 +251,15 @@
 ) -> Result<Response<Self::GenerateStreamStream>, Status> {
 let start_time = Instant::now();
 let request = request.extract_context();
-metrics::increment_counter!("tgi_request_count", "kind" => "stream");
-self.input_counter.increment(1);
+increment_labeled_counter("tgi_request_count", &[("kind", "stream")], 1);
+increment_counter("tgi_request_input_count", 1);
 let permit = self
 .state
 .limit_concurrent_requests
 .clone()
 .try_acquire_owned()
 .map_err(|_| {
-metrics::increment_counter!("tgi_request_failure", "err" => "conc_limit");
+increment_labeled_counter("tgi_request_failure", &[("err", "conc_limit")], 1);
 tracing::error!("Model is overloaded");
 Status::resource_exhausted("Model is overloaded")
 })?;
@@ -292,7 +289,7 @@
 |ctx, count, reason, request_id, times, out, err| {
 let _enter = ctx.span.enter();
 if let Some(e) = err {
-metrics::increment_counter!("tgi_request_failure", "err" => "generate");
+increment_labeled_counter("tgi_request_failure", &[("err", "generate")], 1);
 tracing::error!(
 "Streaming response failed after {count} tokens, \
 output so far: '{:?}': {e}",
@@ -322,11 +319,11 @@
 .await
 .map_err(|err| match err {
 InferError::RequestQueueFull() => {
-metrics::increment_counter!("tgi_request_failure", "err" => "queue_full");
+increment_labeled_counter("tgi_request_failure", &[("err", "queue_full")], 1);
 Status::resource_exhausted(err.to_string())
 }
 _ => {
-metrics::increment_counter!("tgi_request_failure", "err" => "unknown");
+increment_labeled_counter("tgi_request_failure", &[("err", "unknown")], 1);
 tracing::error!("{err}");
 Status::from_error(Box::new(err))
 }
@@ -341,9 +338,9 @@
 request: Request<BatchedTokenizeRequest>,
 ) -> Result<Response<BatchedTokenizeResponse>, Status> {
 let br = request.into_inner();
-metrics::increment_counter!("tgi_tokenize_request_count");
+increment_counter("tgi_tokenize_request_count", 1);
 let start_time = Instant::now();
-self.tokenize_input_counter.increment(br.requests.len() as u64);
+increment_counter("tgi_tokenize_request_input_count", br.requests.len() as u64);
 
 let truncate_to = match br.truncate_input_tokens {
 0 => u32::MAX,
@@ -378,8 +375,8 @@
 .await?;
 
 let token_total: u32 = responses.iter().map(|tr| tr.token_count).sum();
-metrics::histogram!("tgi_tokenize_request_tokens", token_total as f64);
-metrics::histogram!(
+observe_histogram("tgi_tokenize_request_tokens", token_total as f64);
+observe_histogram(
 "tgi_tokenize_request_duration",
 start_time.elapsed().as_secs_f64()
 );
@@ -428,12 +425,12 @@
 Err(err) => Err(err),
 }
 .map_err(|err| {
-metrics::increment_counter!("tgi_request_failure", "err" => "validation");
+increment_labeled_counter("tgi_request_failure", &[("err", "validation")], 1);
 tracing::error!("{err}");
 Status::invalid_argument(err.to_string())
 })
 .map(|requests| {
-metrics::histogram!(
+observe_histogram(
 "tgi_request_validation_duration",
 start_time.elapsed().as_secs_f64()
 );
@@ -474,27 +471,27 @@ fn log_response(
 span.record("total_time", format!("{total_time:?}"));
 span.record("input_toks", input_tokens);
 
-metrics::histogram!(
+observe_histogram(
 "tgi_request_inference_duration",
 inference_time.as_secs_f64()
 );
-metrics::histogram!(
+observe_histogram(
 "tgi_request_mean_time_per_token_duration",
 time_per_token.as_secs_f64()
 );
 }
 
 // Metrics
 match reason {
-Error => metrics::increment_counter!("tgi_request_failure", "err" => "generate"),
+Error => increment_labeled_counter("tgi_request_failure", &[("err", "generate")], 1),
 Cancelled => (), // recorded where cancellation is detected
 _ => {
-metrics::increment_counter!(
-"tgi_request_success", "stop_reason" => reason.as_str_name(), "kind" => kind
+increment_labeled_counter(
+"tgi_request_success", &[("stop_reason", reason.as_str_name()), ("kind", kind)], 1
 );
-metrics::histogram!("tgi_request_duration", total_time.as_secs_f64());
-metrics::histogram!("tgi_request_generated_tokens", generated_tokens as f64);
-metrics::histogram!(
+observe_histogram("tgi_request_duration", total_time.as_secs_f64());
+observe_histogram("tgi_request_generated_tokens", generated_tokens as f64);
+observe_histogram(
 "tgi_request_total_tokens",
 (generated_tokens as usize + input_tokens) as f64
 );
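
One design note visible in this file: the pre-registered input_counter and tokenize_input_counter handles are dropped from GenerationServicer, and counts are now recorded by metric name on each call through the wrapper functions. Caching a handle remains possible under metrics 0.22, since the macros return reusable Counter/Gauge/Histogram handles; a hypothetical alternative (not what this commit does), reusing a metric name from the diff:

use metrics::Counter;

// Hypothetical alternative design, not part of this commit: resolve the
// counter handle once and reuse it on the hot path.
struct RequestCounters {
    input: Counter,
}

impl RequestCounters {
    fn new() -> Self {
        Self {
            // counter!() returns a reusable Counter handle in metrics 0.22.
            input: metrics::counter!("tgi_request_input_count"),
        }
    }

    fn record_batch(&self, batch_size: usize) {
        self.input.increment(batch_size as u64);
    }
}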

router/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ pub mod server;
 mod tokenizer;
 mod validation;
 mod tracing;
+mod metrics;
 
 use batcher::Batcher;
 use serde::{Deserialize, Serialize};
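
The new router/src/metrics.rs that these imports refer to is among the 12 changed files but is not rendered on this page. Purely as a sketch of what such wrappers could look like if they simply forward to the metrics 0.22 handle API (the actual module in the commit may differ), with function names and signatures inferred from the call sites above:

use metrics::Label;

// Hypothetical wrapper module; the real metrics.rs added by this commit is
// not shown in the diff above and may be implemented differently.
pub fn increment_counter(name: &'static str, value: u64) {
    metrics::counter!(name).increment(value);
}

pub fn increment_labeled_counter(name: &'static str, labels: &[(&'static str, &'static str)], value: u64) {
    // Convert (key, value) pairs into owned Labels accepted by the macro.
    let labels: Vec<Label> = labels.iter().map(|(k, v)| Label::new(*k, *v)).collect();
    metrics::counter!(name, labels).increment(value);
}

pub fn set_gauge(name: &'static str, value: f64) {
    metrics::gauge!(name).set(value);
}

pub fn observe_histogram(name: &'static str, value: f64) {
    metrics::histogram!(name).record(value);
}

pub fn observe_labeled_histogram(name: &'static str, labels: &[(&'static str, &'static str)], value: f64) {
    let labels: Vec<Label> = labels.iter().map(|(k, v)| Label::new(*k, *v)).collect();
    metrics::histogram!(name, labels).record(value);
}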
