Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 28 additions & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 15 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -363,41 +363,45 @@ version = "0.3.19"

# OpenTelemetry
[workspace.dependencies.opentelemetry]
version = "0.27.1"
version = "0.28.0"
features = ["trace", "metrics"]

[workspace.dependencies.opentelemetry-http]
version = "0.27.0"
version = "0.28.0"
features = ["reqwest"]

[workspace.dependencies.opentelemetry-jaeger-propagator]
version = "0.27.0"
version = "0.28.0"

[workspace.dependencies.opentelemetry-otlp]
version = "0.27.0"
version = "0.28.0"
default-features = false
features = ["trace", "metrics", "http-proto"]

[workspace.dependencies.opentelemetry-prometheus]
version = "0.27.0"
version = "0.28.0"

[workspace.dependencies.opentelemetry-resource-detectors]
version = "0.6.0"
version = "0.7.0"

[workspace.dependencies.opentelemetry-semantic-conventions]
version = "0.27.0"
version = "0.28.0"
features = ["semconv_experimental"]

[workspace.dependencies.opentelemetry-stdout]
version = "0.27.0"
version = "0.28.0"
features = ["trace", "metrics"]

[workspace.dependencies.opentelemetry_sdk]
version = "0.27.1"
features = ["rt-tokio"]
version = "0.28.0"
features = [
"experimental_trace_batch_span_processor_with_async_runtime",
"experimental_metrics_periodicreader_with_async_runtime",
"rt-tokio",
]

[workspace.dependencies.tracing-opentelemetry]
version = "0.28.0"
version = "0.29.0"
default-features = false

[workspace.dependencies.prometheus]
Expand Down
4 changes: 3 additions & 1 deletion crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ async fn main() -> anyhow::Result<ExitCode> {
// chance to shutdown the telemetry exporters regardless of if there was an
// error or not
let res = try_main().await;
self::telemetry::shutdown();
if let Err(err) = self::telemetry::shutdown() {
eprintln!("Failed to shutdown telemetry exporters: {err}");
}
res
}

Expand Down
71 changes: 38 additions & 33 deletions crates/cli/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

use std::{
sync::{LazyLock, OnceLock},
time::Duration,
};
use std::sync::{LazyLock, OnceLock};

use anyhow::Context as _;
use bytes::Bytes;
Expand All @@ -27,9 +24,11 @@ use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
use opentelemetry_prometheus::PrometheusExporter;
use opentelemetry_sdk::{
Resource,
metrics::{ManualReader, PeriodicReader, SdkMeterProvider},
metrics::{ManualReader, SdkMeterProvider, periodic_reader_with_async_runtime::PeriodicReader},
propagation::{BaggagePropagator, TraceContextPropagator},
trace::{Sampler, Tracer, TracerProvider},
trace::{
Sampler, SdkTracerProvider, Tracer, span_processor_with_async_runtime::BatchSpanProcessor,
},
};
use opentelemetry_semantic_conventions as semcov;
use prometheus::Registry;
Expand All @@ -47,6 +46,7 @@ pub static METER: LazyLock<Meter> =

pub static TRACER: OnceLock<Tracer> = OnceLock::new();
static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
static TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
static PROMETHEUS_REGISTRY: OnceLock<Registry> = OnceLock::new();

pub fn setup(config: &TelemetryConfig) -> anyhow::Result<()> {
Expand All @@ -63,12 +63,16 @@ pub fn setup(config: &TelemetryConfig) -> anyhow::Result<()> {
Ok(())
}

pub fn shutdown() {
opentelemetry::global::shutdown_tracer_provider();
pub fn shutdown() -> opentelemetry_sdk::error::OTelSdkResult {
if let Some(tracer_provider) = TRACER_PROVIDER.get() {
tracer_provider.shutdown()?;
}

if let Some(meter_provider) = METER_PROVIDER.get() {
meter_provider.shutdown().unwrap();
meter_provider.shutdown()?;
}

Ok(())
}

fn match_propagator(propagator: Propagator) -> Box<dyn TextMapPropagator + Send + Sync> {
Expand All @@ -86,14 +90,14 @@ fn propagator(propagators: &[Propagator]) -> TextMapCompositePropagator {
TextMapCompositePropagator::new(propagators)
}

fn stdout_tracer_provider() -> TracerProvider {
fn stdout_tracer_provider() -> SdkTracerProvider {
let exporter = opentelemetry_stdout::SpanExporter::default();
TracerProvider::builder()
SdkTracerProvider::builder()
.with_simple_exporter(exporter)
.build()
}

fn otlp_tracer_provider(endpoint: Option<&Url>) -> anyhow::Result<TracerProvider> {
fn otlp_tracer_provider(endpoint: Option<&Url>) -> anyhow::Result<SdkTracerProvider> {
let mut exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_http_client(mas_http::reqwest_client());
Expand All @@ -104,8 +108,11 @@ fn otlp_tracer_provider(endpoint: Option<&Url>) -> anyhow::Result<TracerProvider
.build()
.context("Failed to configure OTLP trace exporter")?;

let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
let batch_processor =
BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();

let tracer_provider = SdkTracerProvider::builder()
.with_span_processor(batch_processor)
.with_resource(resource())
.with_sampler(Sampler::AlwaysOn)
.build();
Expand All @@ -119,6 +126,9 @@ fn init_tracer(config: &TracingConfig) -> anyhow::Result<()> {
TracingExporterKind::Stdout => stdout_tracer_provider(),
TracingExporterKind::Otlp => otlp_tracer_provider(config.endpoint.as_ref())?,
};
TRACER_PROVIDER
.set(tracer_provider.clone())
.map_err(|_| anyhow::anyhow!("TRACER_PROVIDER was set twice"))?;

let tracer = tracer_provider.tracer_with_scope(SCOPE.clone());
TRACER
Expand Down Expand Up @@ -232,25 +242,20 @@ fn init_meter(config: &MetricsConfig) -> anyhow::Result<()> {
}

fn resource() -> Resource {
let resource = Resource::new([
KeyValue::new(semcov::resource::SERVICE_NAME, env!("CARGO_PKG_NAME")),
KeyValue::new(semcov::resource::SERVICE_VERSION, crate::VERSION),
KeyValue::new(semcov::resource::PROCESS_RUNTIME_NAME, "rust"),
KeyValue::new(
semcov::resource::PROCESS_RUNTIME_VERSION,
env!("VERGEN_RUSTC_SEMVER"),
),
]);

let detected = Resource::from_detectors(
Duration::from_secs(5),
vec![
Box::new(opentelemetry_sdk::resource::EnvResourceDetector::new()),
Resource::builder()
.with_service_name(env!("CARGO_PKG_NAME"))
.with_detectors(&[
Box::new(opentelemetry_resource_detectors::HostResourceDetector::default()),
Box::new(opentelemetry_resource_detectors::OsResourceDetector),
Box::new(opentelemetry_resource_detectors::ProcessResourceDetector),
Box::new(opentelemetry_sdk::resource::TelemetryResourceDetector),
],
);

resource.merge(&detected)
])
.with_attributes([
KeyValue::new(semcov::resource::SERVICE_VERSION, crate::VERSION),
KeyValue::new(semcov::resource::PROCESS_RUNTIME_NAME, "rust"),
KeyValue::new(
semcov::resource::PROCESS_RUNTIME_VERSION,
env!("VERGEN_RUSTC_SEMVER"),
),
])
.build()
}
1 change: 1 addition & 0 deletions crates/config/src/sections/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct TracingConfig {
pub endpoint: Option<Url>,

/// List of propagation formats to use for incoming and outgoing requests
#[serde(default)]
pub propagators: Vec<Propagator>,
}

Expand Down
Loading
Loading