Skip to content

Commit 0a73cb7

Browse files
Tracing: extract context from incoming request (#77)
#### Motivation

When receiving a request from an upstream component, extract the context to read the trace parent info and set it on the current span.

#### Modifications

1. Extract the context from the incoming request
2. Set the tracing service name through an argument or an environment variable

Signed-off-by: Kavya Govindarajan <[email protected]>
Co-authored-by: Kavya Govindarajan <[email protected]>
1 parent cfb62ef commit 0a73cb7

File tree

5 files changed

+64
-3
lines changed

5 files changed

+64
-3
lines changed

launcher/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ struct Args {
9191
default_include_stop_seqs: bool,
9292
#[clap(long, env)]
9393
otlp_endpoint: Option<String>,
94+
#[clap(long, env)]
95+
otlp_service_name: Option<String>,
9496
}
9597

9698
fn main() -> ExitCode {
@@ -331,6 +333,11 @@ fn main() -> ExitCode {
331333
if let Some(otlp_endpoint) = args.otlp_endpoint {
332334
argv.push("--otlp-endpoint".to_string());
333335
argv.push(otlp_endpoint);
336+
337+
if let Some(otlp_service_name) = args.otlp_service_name {
338+
argv.push("--otlp-service-name".to_string());
339+
argv.push(otlp_service_name);
340+
}
334341
}
335342

336343
if args.output_special_tokens {

router/src/grpc_server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::{
2828
},
2929
server::ServerState,
3030
tokenizer::AsyncTokenizer,
31+
tracing::ExtractTelemetryContext,
3132
validation::{RequestSize, ValidationError},
3233
GenerateParameters, GenerateRequest,
3334
};
@@ -119,6 +120,7 @@ impl GenerationService for GenerationServicer {
119120
request: Request<BatchedGenerationRequest>,
120121
) -> Result<Response<BatchedGenerationResponse>, Status> {
121122
let start_time = Instant::now();
123+
let request = request.extract_context();
122124
let br = request.into_inner();
123125
let batch_size = br.requests.len();
124126
let kind = if batch_size == 1 { "single" } else { "batch" };
@@ -251,6 +253,7 @@ impl GenerationService for GenerationServicer {
251253
request: Request<SingleGenerationRequest>,
252254
) -> Result<Response<Self::GenerateStreamStream>, Status> {
253255
let start_time = Instant::now();
256+
let request = request.extract_context();
254257
metrics::increment_counter!("tgi_request_count", "kind" => "stream");
255258
self.input_counter.increment(1);
256259
let permit = self

router/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod queue;
1010
pub mod server;
1111
mod tokenizer;
1212
mod validation;
13+
mod tracing;
1314

1415
use batcher::Batcher;
1516
use serde::{Deserialize, Serialize};

router/src/main.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ struct Args {
6161
default_include_stop_seqs: bool,
6262
#[clap(long, env)]
6363
otlp_endpoint: Option<String>,
64+
#[clap(
65+
long,
66+
env = "OTEL_SERVICE_NAME",
67+
default_value = "text-generation-inference.router"
68+
)]
69+
otlp_service_name: String,
6470
}
6571

6672
fn main() -> Result<(), std::io::Error> {
@@ -106,7 +112,7 @@ fn main() -> Result<(), std::io::Error> {
106112
.build()
107113
.unwrap()
108114
.block_on(async {
109-
init_logging(args.otlp_endpoint, args.json_output);
115+
init_logging(args.otlp_endpoint, args.json_output, args.otlp_service_name);
110116
// Instantiate sharded client from the master unix socket
111117
let mut sharded_client = ShardedClient::connect_uds(args.master_shard_uds_path)
112118
.await
@@ -206,7 +212,7 @@ fn write_termination_log(msg: &str) -> Result<(), io::Error> {
206212
Ok(())
207213
}
208214

209-
fn init_logging(otlp_endpoint: Option<String>, json_output: bool) {
215+
fn init_logging(otlp_endpoint: Option<String>, json_output: bool, otlp_service_name: String) {
210216
let mut layers = Vec::new();
211217

212218
// STDOUT/STDERR layer
@@ -235,7 +241,7 @@ fn init_logging(otlp_endpoint: Option<String>, json_output: bool) {
235241
trace::config()
236242
.with_resource(Resource::new(vec![KeyValue::new(
237243
"service.name",
238-
"text-generation-inference.router",
244+
otlp_service_name,
239245
)]))
240246
.with_sampler(Sampler::AlwaysOn),
241247
)

router/src/tracing.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//! Inspired by: https://github.com/open-telemetry/opentelemetry-rust gRPC examples
2+
3+
use opentelemetry::{global, propagation::Extractor};
4+
use tonic::Request;
5+
use tracing::Span;
6+
use tracing_opentelemetry::OpenTelemetrySpanExt;
7+
8+
/// Newtype holding a shared reference to a tonic `MetadataMap`, so that the
/// `Extractor` trait can be implemented for it in this module.
struct MetadataExtractor<'a>(&'a tonic::metadata::MetadataMap);
9+
10+
// Exposes the wrapped metadata through the OpenTelemetry `Extractor` interface
// (key lookup plus key enumeration).
impl<'a> Extractor for MetadataExtractor<'a> {
11+
/// Get a value for a key from the MetadataMap. If the key is absent or the value can't be
/// converted to &str, returns None
12+
fn get(&self, key: &str) -> Option<&str> {
13+
self.0.get(key).and_then(|metadata| metadata.to_str().ok())
14+
}
15+
16+
/// Collect all the keys from the MetadataMap, returning each key's name as a `&str`.
17+
fn keys(&self) -> Vec<&str> {
18+
self.0
19+
.keys()
20+
// Both key variants are handled the same way: take the key's string form.
.map(|key| match key {
21+
tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
22+
tonic::metadata::KeyRef::Binary(v) => v.as_str(),
23+
})
24+
.collect::<Vec<_>>()
25+
}
26+
}
27+
28+
/// Extract context from metadata and set as current span's context
///
/// The context is obtained by handing a `MetadataExtractor` over `metadata` to the
/// globally registered text-map propagator; the result is then passed to
/// `set_parent` on `Span::current()`.
29+
fn extract(metadata: &tonic::metadata::MetadataMap) {
30+
let parent_cx =
31+
global::get_text_map_propagator(|prop| prop.extract(&MetadataExtractor(metadata)));
32+
Span::current().set_parent(parent_cx);
33+
}
34+
35+
/// Extension trait adding an `extract_context` step to a value.
///
/// The only implementation in this file is for `tonic::Request<T>`.
pub trait ExtractTelemetryContext {
36+
/// Takes `self` by value and returns a value of the same type.
fn extract_context(self) -> Self;
37+
}
38+
39+
// Implemented for every `Request<T>`, independent of the message type `T`.
impl<T> ExtractTelemetryContext for Request<T> {
40+
/// Runs `extract` on this request's metadata, then returns the request itself
/// so the caller can keep using it.
fn extract_context(self) -> Self {
41+
extract(self.metadata());
42+
self
43+
}
44+
}

0 commit comments

Comments
 (0)