Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/tracing-http-propagator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,8 @@ tokio = { workspace = true, features = ["full"] }
opentelemetry = { path = "../../opentelemetry" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk" }
opentelemetry-http = { path = "../../opentelemetry-http" }
opentelemetry-stdout = { workspace = true, features = ["trace"] }
opentelemetry-stdout = { workspace = true, features = ["trace", "logs"] }
opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" }
opentelemetry-appender-tracing = { workspace = true }
tracing = { workspace = true, features = ["std"]}
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }
48 changes: 38 additions & 10 deletions examples/tracing-http-propagator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,42 @@ use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use opentelemetry::{
global,
trace::{SpanKind, TraceContextExt, Tracer},
Context, KeyValue,
Context,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_http::{Bytes, HeaderInjector};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider};
use opentelemetry_stdout::SpanExporter;
use opentelemetry_sdk::{
logs::SdkLoggerProvider, propagation::TraceContextPropagator, trace::SdkTracerProvider,
};
use opentelemetry_stdout::{LogExporter, SpanExporter};
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

fn init_tracer() {
fn init_tracer() -> SdkTracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
// Install stdout exporter pipeline to be able to retrieve the collected spans.
// For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces.
let provider = SdkTracerProvider::builder()
.with_simple_exporter(SpanExporter::default())
.build();

global::set_tracer_provider(provider);
global::set_tracer_provider(provider.clone());
provider
}

fn init_logs() -> SdkLoggerProvider {
// Setup tracerprovider with stdout exporter
// that prints the spans to stdout.
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(LogExporter::default())
.build();
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider);
tracing_subscriber::registry()
.with(otel_layer)
.with(tracing_subscriber::filter::LevelFilter::INFO)
.init();

logger_provider
}

async fn send_request(
Expand All @@ -37,21 +58,22 @@ async fn send_request(
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap()))
});
req.headers_mut()
.unwrap()
.insert("baggage", "is_synthetic=true".parse().unwrap());
let res = client
.request(req.body(Full::new(Bytes::from(body_content.to_string())))?)
.await?;

cx.span().add_event(
"Got response!",
vec![KeyValue::new("status", res.status().to_string())],
);
info!(name: "ResponseReceived", status = res.status().to_string(), message = "Response received");

Ok(())
}

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
init_tracer();
let tracer_provider = init_tracer();
let logger_provider = init_logs();

send_request(
"http://127.0.0.1:3000/health",
Expand All @@ -66,5 +88,11 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sy
)
.await?;

tracer_provider
.shutdown()
.expect("Shutdown provider failed");
logger_provider
.shutdown()
.expect("Shutdown provider failed");
Ok(())
}
102 changes: 91 additions & 11 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,27 @@ use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{body::Incoming, service::service_fn, Request, Response, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo};
use opentelemetry::{
baggage::BaggageExt,
global::{self, BoxedTracer},
logs::LogRecord,
propagation::TextMapCompositePropagator,
trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
Context, KeyValue,
Context, InstrumentationScope, KeyValue,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_http::{Bytes, HeaderExtractor};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider};
use opentelemetry_sdk::{
error::OTelSdkResult,
logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider},
propagation::{BaggagePropagator, TraceContextPropagator},
trace::{SdkTracerProvider, SpanProcessor},
};
use opentelemetry_semantic_conventions::trace;
use opentelemetry_stdout::SpanExporter;
use opentelemetry_stdout::{LogExporter, SpanExporter};
use std::{convert::Infallible, net::SocketAddr, sync::OnceLock};
use tokio::net::TcpListener;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

fn get_tracer() -> &'static BoxedTracer {
static TRACER: OnceLock<BoxedTracer> = OnceLock::new();
Expand All @@ -30,11 +41,11 @@ async fn handle_health_check(
_req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
let tracer = get_tracer();
let mut span = tracer
let _ = tracer
.span_builder("health_check")
.with_kind(SpanKind::Internal)
.start(tracer);
span.add_event("Health check accessed", vec![]);
info!(name: "health_check", message = "Health check endpoint hit");

let res = Response::new(
Full::new(Bytes::from_static(b"Server is up and running!"))
Expand All @@ -50,11 +61,11 @@ async fn handle_echo(
req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
let tracer = get_tracer();
let mut span = tracer
let _ = tracer
.span_builder("echo")
.with_kind(SpanKind::Internal)
.start(tracer);
span.add_event("Echoing back the request", vec![]);
info!(name = "echo", message = "Echo endpoint hit");

let res = Response::new(req.into_body().boxed());

Expand All @@ -69,14 +80,14 @@ async fn router(
let response = {
// Create a span parenting the remote client span.
let tracer = get_tracer();
let mut span = tracer
let span = tracer
.span_builder("router")
.with_kind(SpanKind::Server)
.start_with_context(tracer, &parent_cx);

span.add_event("dispatching request", vec![]);
info!(name = "router", message = "Dispatching request");

let cx = Context::default().with_span(span);
let cx = parent_cx.with_span(span);
match (req.method(), req.uri().path()) {
(&hyper::Method::GET, "/health") => handle_health_check(req).with_context(cx).await,
(&hyper::Method::GET, "/echo") => handle_echo(req).with_context(cx).await,
Expand All @@ -93,24 +104,90 @@ async fn router(
response
}

/// A custom log processor that enriches LogRecords with baggage attributes.
/// Baggage information is not added automatically without this processor.
#[derive(Debug)]
struct EnrichWithBaggageLogProcessor;
impl LogProcessor for EnrichWithBaggageLogProcessor {
fn emit(&self, data: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {
Context::map_current(|cx| {
for (kk, vv) in cx.baggage().iter() {
data.add_attribute(kk.clone(), vv.0.clone());
}
});
}

fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
}

/// A custom span processor that enriches spans with baggage attributes. Baggage
/// information is not added automatically without this processor.
#[derive(Debug)]
struct EnrichWithBaggageSpanProcessor;
impl SpanProcessor for EnrichWithBaggageSpanProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

fn on_start(&self, span: &mut opentelemetry_sdk::trace::Span, cx: &Context) {
for (kk, vv) in cx.baggage().iter() {
span.set_attribute(KeyValue::new(kk.clone(), vv.0.clone()));
}
}

fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {}
}

fn init_tracer() -> SdkTracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
let baggage_propagator = BaggagePropagator::new();
let trace_context_propagator = TraceContextPropagator::new();
let composite_propagator = TextMapCompositePropagator::new(vec![
Box::new(baggage_propagator),
Box::new(trace_context_propagator),
]);

global::set_text_map_propagator(composite_propagator);

// Setup tracerprovider with stdout exporter
// that prints the spans to stdout.
let provider = SdkTracerProvider::builder()
.with_span_processor(EnrichWithBaggageSpanProcessor)
.with_simple_exporter(SpanExporter::default())
.build();

global::set_tracer_provider(provider.clone());
provider
}

fn init_logs() -> SdkLoggerProvider {
// Setup tracerprovider with stdout exporter
// that prints the spans to stdout.
let logger_provider = SdkLoggerProvider::builder()
.with_log_processor(EnrichWithBaggageLogProcessor)
.with_simple_exporter(LogExporter::default())
.build();
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider);
tracing_subscriber::registry().with(otel_layer).init();

logger_provider
}

#[tokio::main]
async fn main() {
use hyper_util::server::conn::auto::Builder;

let provider = init_tracer();
let logger_provider = init_logs();
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = TcpListener::bind(addr).await.unwrap();

Expand All @@ -124,4 +201,7 @@ async fn main() {
}

provider.shutdown().expect("Shutdown provider failed");
logger_provider
.shutdown()
.expect("Shutdown provider failed");
}