Skip to content

Commit 4cba9b1

Browse files
committed
Upgrade opentelemetry libs to 0.29
1 parent a09b421 commit 4cba9b1

File tree

7 files changed

+128
-144
lines changed

7 files changed

+128
-144
lines changed

server/Cargo.lock

Lines changed: 22 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/svix-server/Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ lapin = "3.7.2"
5858
num_enum = "0.7.2"
5959
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "4b1f3a056a0391c2d18bb85cd16a75ee9eddd640", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel", "beta"] }
6060
openssl = "0.10.72"
61-
opentelemetry = { version = "0.26.0", features = ["metrics"] }
62-
opentelemetry-http = "0.26.0"
63-
opentelemetry-otlp = { version = "0.26.0", features = ["metrics"] }
64-
opentelemetry-semantic-conventions = "0.26.0"
65-
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"] }
61+
opentelemetry = { version = "0.29.0", features = ["metrics"] }
62+
opentelemetry-http = "0.29.0"
63+
opentelemetry-otlp = { version = "0.29.0", default-features = false, features = ["grpc-tonic", "internal-logs", "metrics", "trace"] }
64+
opentelemetry-semantic-conventions = "0.29.0"
65+
opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio", "experimental_metrics_periodicreader_with_async_runtime", "experimental_trace_batch_span_processor_with_async_runtime"] }
6666
rand = "0.8.5"
6767
redis = { version = "0.32.5", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay", "connection-manager", "sentinel"] }
6868
regex = "1.5.5"
@@ -84,7 +84,7 @@ tokio-util = "0.7"
8484
tower = "0.5.1"
8585
tower-http = { version = "0.6.1", features = ["trace", "cors", "normalize-path", "request-id"] }
8686
tracing = "0.1.35"
87-
tracing-opentelemetry = "0.27.0"
87+
tracing-opentelemetry = "0.30.0"
8888
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
8989
url = { version = "2.2.2", features = ["serde"] }
9090
urlencoding = "2.1.2"

server/svix-server/src/lib.rs

Lines changed: 74 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@ use std::{borrow::Cow, sync::LazyLock, time::Duration};
88

99
use aide::axum::ApiRouter;
1010
use cfg::ConfigurationInner;
11-
use opentelemetry::trace::TracerProvider as _;
12-
use opentelemetry_otlp::WithExportConfig;
13-
use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime::Tokio};
11+
use opentelemetry::{InstrumentationScope, trace::TracerProvider as _};
12+
use opentelemetry_otlp::WithExportConfig as _;
13+
use opentelemetry_sdk::{
14+
metrics::{SdkMeterProvider, periodic_reader_with_async_runtime::PeriodicReader},
15+
runtime,
16+
trace::{SdkTracerProvider, span_processor_with_async_runtime::BatchSpanProcessor},
17+
};
1418
use queue::TaskQueueProducer;
1519
use redis::RedisManager;
1620
use sea_orm::DatabaseConnection;
@@ -100,7 +104,7 @@ async fn graceful_shutdown_handler() {
100104
}
101105

102106
pub async fn run(cfg: Configuration) {
103-
let _metrics = setup_metrics(&cfg);
107+
setup_metrics(&cfg);
104108
run_with_prefix(cfg.queue_prefix.clone(), cfg, None).await
105109
}
106110

@@ -246,7 +250,11 @@ pub async fn run_with_prefix(
246250
pub fn setup_tracing(
247251
cfg: &ConfigurationInner,
248252
for_test: bool,
249-
) -> (tracing::Dispatch, sentry::ClientInitGuard) {
253+
) -> (
254+
tracing::Dispatch,
255+
sentry::ClientInitGuard,
256+
Option<SdkTracerProvider>,
257+
) {
250258
let filter_directives = std::env::var("RUST_LOG").unwrap_or_else(|e| {
251259
if let std::env::VarError::NotUnicode(_) = e {
252260
eprintln!("RUST_LOG environment variable has non-utf8 contents, ignoring!");
@@ -265,45 +273,47 @@ pub fn setup_tracing(
265273
var.join(",")
266274
});
267275

268-
let otel_layer = cfg.opentelemetry_address.as_ref().map(|addr| {
276+
let mapped = cfg.opentelemetry_address.as_ref().map(|addr| {
269277
// Configure the OpenTelemetry tracing layer
270278
opentelemetry::global::set_text_map_propagator(
271279
opentelemetry_sdk::propagation::TraceContextPropagator::new(),
272280
);
273281

274-
let exporter = opentelemetry_otlp::new_exporter()
275-
.tonic()
276-
.with_endpoint(addr);
277-
278-
let provider = opentelemetry_otlp::new_pipeline()
279-
.tracing()
280-
.with_exporter(exporter)
281-
.with_trace_config(
282-
opentelemetry_sdk::trace::Config::default()
283-
.with_sampler(
284-
cfg.opentelemetry_sample_ratio
285-
.map(opentelemetry_sdk::trace::Sampler::TraceIdRatioBased)
286-
.unwrap_or(opentelemetry_sdk::trace::Sampler::AlwaysOn),
287-
)
288-
.with_resource(opentelemetry_sdk::Resource::new(vec![
289-
opentelemetry::KeyValue::new(
290-
"service.name",
291-
cfg.opentelemetry_service_name.clone(),
292-
),
293-
])),
294-
)
295-
.install_batch(Tokio)
282+
let exporter = opentelemetry_otlp::SpanExporter::builder()
283+
.with_tonic()
284+
.with_endpoint(addr)
285+
.build()
296286
.unwrap();
297287

298-
// Based on the private `build_batch_with_exporter` method from opentelemetry-otlp
299-
let tracer = provider
300-
.tracer_builder("opentelemetry-otlp")
301-
.with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
288+
let provider = SdkTracerProvider::builder()
289+
.with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
290+
.with_sampler(
291+
cfg.opentelemetry_sample_ratio
292+
.map(opentelemetry_sdk::trace::Sampler::TraceIdRatioBased)
293+
.unwrap_or(opentelemetry_sdk::trace::Sampler::AlwaysOn),
294+
)
295+
.with_resource(
296+
opentelemetry_sdk::Resource::builder()
297+
.with_service_name(cfg.opentelemetry_service_name.clone())
298+
.build(),
299+
)
302300
.build();
303301

304-
tracing_opentelemetry::layer().with_tracer(tracer)
302+
// Based on the private `build_batch_with_exporter` method from opentelemetry-otlp
303+
let layer = tracing_opentelemetry::layer().with_tracer(
304+
provider.tracer_with_scope(
305+
InstrumentationScope::builder("opentelemetry-otlp")
306+
.with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
307+
.build(),
308+
),
309+
);
310+
311+
_ = opentelemetry::global::set_tracer_provider(provider.clone());
312+
(layer, provider)
305313
});
306314

315+
let (otel_layer, otel_tracer_provider) = mapped.unzip();
316+
307317
let sentry_guard = sentry::init(sentry::ClientOptions {
308318
dsn: cfg.sentry_dsn.clone(),
309319
environment: Some(Cow::Owned(cfg.environment.to_string())),
@@ -336,40 +346,46 @@ pub fn setup_tracing(
336346
}
337347
};
338348

339-
let registry = tracing_subscriber::Registry::default()
349+
let dispatch = tracing_subscriber::Registry::default()
340350
.with(otel_layer)
341351
.with(sentry_layer)
342352
.with(stdout_layer)
343353
.with(tracing_subscriber::EnvFilter::new(filter_directives))
344354
.into();
345355

346-
(registry, sentry_guard)
356+
(dispatch, sentry_guard, otel_tracer_provider)
347357
}
348358

349-
pub fn setup_metrics(cfg: &ConfigurationInner) -> Option<SdkMeterProvider> {
350-
cfg.opentelemetry_address.as_ref().map(|addr| {
351-
let exporter = opentelemetry_otlp::new_exporter()
352-
.tonic()
353-
.with_endpoint(addr);
354-
355-
opentelemetry_otlp::new_pipeline()
356-
.metrics(Tokio)
357-
.with_delta_temporality()
358-
.with_exporter(exporter)
359-
.with_resource(opentelemetry_sdk::Resource::new(vec![
360-
opentelemetry::KeyValue::new(
361-
"service.name",
362-
cfg.opentelemetry_service_name.clone(),
363-
),
364-
opentelemetry::KeyValue::new("instance_id", INSTANCE_ID.to_owned()),
365-
opentelemetry::KeyValue::new(
366-
"service.version",
367-
option_env!("GITHUB_SHA").unwrap_or("unknown"),
368-
),
369-
]))
359+
pub fn setup_metrics(cfg: &ConfigurationInner) {
360+
if let Some(addr) = &cfg.opentelemetry_address {
361+
let exporter = opentelemetry_otlp::MetricExporter::builder()
362+
.with_tonic()
363+
.with_endpoint(addr)
364+
.with_temporality(opentelemetry_sdk::metrics::Temporality::Delta)
370365
.build()
371-
.unwrap()
372-
})
366+
.unwrap();
367+
368+
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
369+
370+
let provider = SdkMeterProvider::builder()
371+
.with_reader(reader)
372+
.with_resource(
373+
opentelemetry_sdk::Resource::builder()
374+
.with_service_name(cfg.opentelemetry_service_name.clone())
375+
.with_attribute(opentelemetry::KeyValue::new(
376+
"instance_id",
377+
INSTANCE_ID.as_str(),
378+
))
379+
.with_attribute(opentelemetry::KeyValue::new(
380+
"service.version",
381+
option_env!("GITHUB_SHA").unwrap_or("unknown"),
382+
))
383+
.build(),
384+
)
385+
.build();
386+
387+
opentelemetry::global::set_meter_provider(provider);
388+
}
373389
}
374390

375391
pub fn setup_tracing_for_tests() {

server/svix-server/src/main.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ async fn main() -> anyhow::Result<()> {
141141

142142
let cfg = cfg::load()?;
143143

144-
let (tracing_subscriber, _guard) = setup_tracing(&cfg, /* for_test = */ false);
144+
let (tracing_subscriber, _guard, otel_tracer_provider) =
145+
setup_tracing(&cfg, /* for_test = */ false);
145146
tracing_subscriber.init();
146147

147148
if let Some(wait_for_seconds) = args.wait_for {
@@ -232,6 +233,12 @@ async fn main() -> anyhow::Result<()> {
232233
}
233234
};
234235

235-
opentelemetry::global::shutdown_tracer_provider();
236+
if let Some(provider) = otel_tracer_provider {
237+
_ = tokio::task::spawn_blocking(move || {
238+
_ = provider.shutdown();
239+
})
240+
.await;
241+
}
242+
236243
Ok(())
237244
}
Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,3 @@
11
mod redis;
22

3-
pub fn init_metric<T, E: std::fmt::Debug>(result: Result<T, E>) -> Option<T> {
4-
match result {
5-
Ok(t) => Some(t),
6-
Err(e) => {
7-
tracing::error!(error = ?e, "Failed to initialize metric");
8-
None
9-
}
10-
}
11-
}
12-
133
pub use self::redis::{RedisQueueMetrics, RedisQueueType};

0 commit comments

Comments
 (0)