Skip to content

Commit 9533feb

Browse files
committed
Introduce load-balanced channel for OpenTelemetry exporters
Add client-side load balancing to OTLP gRPC connections using ginepro. When NL_OTEL_ENDPOINT is set, the telemetry system creates a load-balanced channel that is shared across the log, trace, and metric exporters. This enables better distribution of telemetry traffic across multiple OTLP collector instances and improves overall system resilience.

- Add ginepro dependency for gRPC load balancing
- Upgrade OpenTelemetry dependencies from 0.29 to 0.30
- Change init_tracing() to async to support channel initialization
- Add NL_OTEL_ENDPOINT environment variable for configuration
- Update all OTLP exporters to use shared load-balanced channel
1 parent 23611ca commit 9533feb

File tree

10 files changed

+340
-108
lines changed

10 files changed

+340
-108
lines changed

Cargo.lock

Lines changed: 266 additions & 85 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-scheduler/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ bytes = { version = "1.10.1", default-features = false }
2020
futures = { version = "0.3.31", default-features = false }
2121
lru = { version = "0.16.0", default-features = false }
2222
mock_instant = { version = "0.5.3", default-features = false }
23-
opentelemetry = { version = "0.29.1", default-features = false }
24-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
23+
opentelemetry = { version = "0.30.0", default-features = false }
24+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
2525
"default",
2626
"semconv_experimental",
2727
] }

nativelink-service/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ bytes = { version = "1.10.1", default-features = false }
2020
futures = { version = "0.3.31", default-features = false }
2121
http-body-util = { version = "0.1.3", default-features = false }
2222
hyper = { version = "1.6.0", default-features = false }
23-
opentelemetry = { version = "0.29.1", default-features = false }
24-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
23+
opentelemetry = { version = "0.30.0", default-features = false }
24+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
2525
"default",
2626
"semconv_experimental",
2727
] }

nativelink-store/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ mongodb = { version = "3", features = [
6464
"compat-3-0-0",
6565
"rustls-tls",
6666
], default-features = false }
67-
opentelemetry = { version = "0.29.1", default-features = false }
67+
opentelemetry = { version = "0.30.0", default-features = false }
6868
parking_lot = { version = "0.12.3", features = [
6969
"arc_lock",
7070
"send_guard",

nativelink-util/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ rust_library(
5555
"@crates//:blake3",
5656
"@crates//:bytes",
5757
"@crates//:futures",
58+
"@crates//:ginepro",
5859
"@crates//:hex",
5960
"@crates//:humantime",
6061
"@crates//:hyper-1.7.0",
@@ -84,6 +85,7 @@ rust_library(
8485
"@crates//:tracing",
8586
"@crates//:tracing-opentelemetry",
8687
"@crates//:tracing-subscriber",
88+
"@crates//:url",
8789
"@crates//:uuid",
8890
"@crates//:walkdir",
8991
],

nativelink-util/Cargo.toml

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@ hyper-util = { version = "0.1.11", default-features = false }
2727
libc = { version = "0.2.177", default-features = false }
2828
lru = { version = "0.16.0", default-features = false }
2929
mock_instant = { version = "0.5.3", default-features = false }
30-
opentelemetry = { version = "0.29.0", default-features = false }
31-
opentelemetry-appender-tracing = { version = "0.29.1", default-features = false }
32-
opentelemetry-http = { version = "0.29.0", default-features = false }
33-
opentelemetry-otlp = { version = "0.29.0", default-features = false, features = [
30+
opentelemetry = { version = "0.30.0", default-features = false }
31+
opentelemetry-appender-tracing = { version = "0.30.0", default-features = false }
32+
opentelemetry-http = { version = "0.30.0", default-features = false }
33+
opentelemetry-otlp = { version = "0.30.0", default-features = false, features = [
3434
"grpc-tonic",
3535
"logs",
3636
"metrics",
3737
"trace",
3838
"zstd-tonic",
3939
] }
40-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
40+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
4141
"default",
4242
"semconv_experimental",
4343
] }
44-
opentelemetry_sdk = { version = "0.29.0", default-features = false }
44+
opentelemetry_sdk = { version = "0.30.0", default-features = false }
4545
parking_lot = { version = "0.12.3", features = [
4646
"arc_lock",
4747
"send_guard",
@@ -76,7 +76,7 @@ tonic = { version = "0.13.0", features = [
7676
], default-features = false }
7777
tower = { version = "0.5.2", default-features = false }
7878
tracing = { version = "0.1.41", default-features = false }
79-
tracing-opentelemetry = { version = "0.30.0", default-features = false, features = [
79+
tracing-opentelemetry = { version = "0.31.0", default-features = false, features = [
8080
"metrics",
8181
] }
8282
tracing-subscriber = { version = "0.3.19", features = [
@@ -86,6 +86,8 @@ tracing-subscriber = { version = "0.3.19", features = [
8686
], default-features = false }
8787
tracing-test = { version = "0.2.5", default-features = false, features = [] }
8888

89+
ginepro = "0.9.0"
90+
url = "2.5.7"
8991
uuid = { version = "1.16.0", default-features = false, features = [
9092
"serde",
9193
"v4",

nativelink-util/src/telemetry.rs

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::OnceLock;
1818

1919
use base64::Engine;
2020
use base64::prelude::BASE64_STANDARD_NO_PAD;
21+
use ginepro::LoadBalancedChannel;
2122
use hyper::http::Response;
2223
use nativelink_error::{Code, ResultExt, make_err};
2324
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
@@ -26,7 +27,9 @@ use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider};
2627
use opentelemetry::{KeyValue, global};
2728
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
2829
use opentelemetry_http::HeaderExtractor;
29-
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig};
30+
use opentelemetry_otlp::{
31+
LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig, WithTonicConfig,
32+
};
3033
use opentelemetry_sdk::Resource;
3134
use opentelemetry_sdk::logs::SdkLoggerProvider;
3235
use opentelemetry_sdk::metrics::SdkMeterProvider;
@@ -103,7 +106,7 @@ fn tracing_stdout_layer() -> impl Layer<Registry> {
103106
///
104107
/// Returns `Err` if logging was already initialized or if the exporters can't
105108
/// be initialized.
106-
pub fn init_tracing() -> Result<(), nativelink_error::Error> {
109+
pub async fn init_tracing() -> Result<(), nativelink_error::Error> {
107110
static INITIALIZED: OnceLock<()> = OnceLock::new();
108111

109112
if INITIALIZED.get().is_some() {
@@ -128,13 +131,18 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
128131
]);
129132
global::set_text_map_propagator(propagator);
130133

134+
let maybe_channel = maybe_load_balanced_channel().await;
135+
131136
// Logs
137+
let mut log_exporter_builder = LogExporter::builder().with_tonic();
138+
if let Some(channel) = maybe_channel.clone() {
139+
log_exporter_builder = log_exporter_builder.with_channel(channel.into());
140+
}
132141
let otlp_log_layer = OpenTelemetryTracingBridge::new(
133142
&SdkLoggerProvider::builder()
134143
.with_resource(resource.clone())
135144
.with_batch_exporter(
136-
LogExporter::builder()
137-
.with_tonic()
145+
log_exporter_builder
138146
.with_protocol(Protocol::Grpc)
139147
.build()
140148
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -145,13 +153,16 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
145153
.with_filter(otlp_filter());
146154

147155
// Traces
156+
let mut span_exporter_builder = SpanExporter::builder().with_tonic();
157+
if let Some(channel) = maybe_channel.clone() {
158+
span_exporter_builder = span_exporter_builder.with_channel(channel.into());
159+
}
148160
let otlp_trace_layer = layer()
149161
.with_tracer(
150162
SdkTracerProvider::builder()
151163
.with_resource(resource.clone())
152164
.with_batch_exporter(
153-
SpanExporter::builder()
154-
.with_tonic()
165+
span_exporter_builder
155166
.with_protocol(Protocol::Grpc)
156167
.build()
157168
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -163,11 +174,14 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
163174
.with_filter(otlp_filter());
164175

165176
// Metrics
177+
let mut metric_exporter_builder = MetricExporter::builder().with_tonic();
178+
if let Some(channel) = maybe_channel {
179+
metric_exporter_builder = metric_exporter_builder.with_channel(channel.into());
180+
}
166181
let meter_provider = SdkMeterProvider::builder()
167182
.with_resource(resource)
168183
.with_periodic_exporter(
169-
MetricExporter::builder()
170-
.with_tonic()
184+
metric_exporter_builder
171185
.with_protocol(Protocol::Grpc)
172186
.build()
173187
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -191,6 +205,38 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
191205
Ok(())
192206
}
193207

208+
const NL_OTEL_ENDPOINT: &str = "NL_OTEL_ENDPOINT";
209+
210+
async fn maybe_load_balanced_channel() -> Option<LoadBalancedChannel> {
211+
match env::var(NL_OTEL_ENDPOINT) {
212+
Ok(endpoint) => {
213+
let url = Url::parse(endpoint.as_str())
214+
.map_err(|e| {
215+
make_err!(Code::Internal, "Unable to parse endpoint {endpoint}: {e:?}")
216+
})
217+
.unwrap();
218+
219+
let host = url
220+
.host()
221+
.err_tip(|| format!("Unable to get host from endpoint {endpoint}"))
222+
.unwrap();
223+
let port = url
224+
.port()
225+
.err_tip(|| format!("Unable to get port from endpoint {endpoint}"))
226+
.unwrap();
227+
228+
Some(
229+
LoadBalancedChannel::builder((host.to_string(), port))
230+
.channel()
231+
.await
232+
.map_err(|e| make_err!(Code::Internal, "Invalid hostname '{endpoint}': {e}"))
233+
.unwrap(),
234+
)
235+
}
236+
Err(_) => None,
237+
}
238+
}
239+
194240
/// Custom metadata key field for Bazel metadata.
195241
const BAZEL_METADATA_KEY: &str = "bazel.metadata";
196242

@@ -201,6 +247,7 @@ const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requ
201247

202248
use opentelemetry::baggage::BaggageExt;
203249
use opentelemetry::context::FutureExt;
250+
use url::Url;
204251

205252
#[derive(Debug, Clone)]
206253
pub struct OtlpMiddleware<S> {

nativelink-worker/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ bytes = { version = "1.10.1", default-features = false }
2222
filetime = { version = "0.2.25", default-features = false }
2323
formatx = { version = "0.2.3", default-features = false }
2424
futures = { version = "0.3.31", default-features = false }
25-
opentelemetry = { version = "0.29.1", default-features = false }
25+
opentelemetry = { version = "0.30.0", default-features = false }
2626
parking_lot = { version = "0.12.3", default-features = false }
2727
prost = { version = "0.13.5", default-features = false }
2828
relative-path = { version = "2.0.0", default-features = false, features = [

src/bin/nativelink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ fn main() -> Result<(), Box<dyn core::error::Error>> {
719719
// The OTLP exporters need to run in a Tokio context
720720
// Do this first so all the other logging works
721721
#[expect(clippy::disallowed_methods, reason = "tracing init on main runtime")]
722-
runtime.block_on(async { tokio::spawn(async { init_tracing() }).await? })?;
722+
runtime.block_on(async { tokio::spawn(async { init_tracing().await }).await? })?;
723723

724724
let mut cfg = get_config()?;
725725

src/bin/redis_store_tester.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ fn main() -> Result<(), Box<dyn core::error::Error>> {
305305
.unwrap()
306306
.block_on(async {
307307
// The OTLP exporters need to run in a Tokio context.
308-
spawn!("init tracing", async { init_tracing() })
308+
spawn!("init tracing", async { init_tracing().await })
309309
.await?
310310
.expect("Init tracing should work");
311311

0 commit comments

Comments
 (0)