Skip to content

Commit 261dd53

Browse files
lquereljmacd
andauthored
OTLP Exporter Optimizations (open-telemetry#1474)
Support for multiple simultaneous client connections to improve throughput and the ack/nack system. --------- Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
1 parent 48d70ac commit 261dd53

File tree

7 files changed

+644
-241
lines changed

7 files changed

+644
-241
lines changed

rust/otap-dataflow/Cargo.toml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,16 @@ flume = { version = "0.11.1", default-features = false, features = ["async"] }
6868
futures = "0.3.31"
6969
futures-channel = "0.3"
7070
futures-timer = "3.0"
71-
http = "1.3"
72-
humantime = "2.2.0"
71+
http = "1.4"
72+
humantime = "2.3.0"
7373
humantime-serde = "1.1.1"
7474
itertools = "0.14.0"
75-
linkme = "0.3.33"
75+
linkme = "0.3.35"
7676
local-sync = "0.1.1"
7777
log = "0.4"
7878
miette = { version="7.6.0", features = ["fancy"] }
7979
mimalloc-rust = "0.2.1"
80-
nix = { version = "0.30.0", features = ["process", "signal"] }
80+
nix = { version = "0.30.1", features = ["process", "signal"] }
8181
num_enum = "0.7"
8282
object_store = "0.12.3"
8383
once_cell = "1.20.2"
@@ -102,13 +102,13 @@ replace_with = "0.1.8"
102102
simdutf8 = "0.1.5"
103103
slotmap = "1.0.7"
104104
smallvec = { version = "1.15" , features = ["union"] }
105-
socket2 = { version = "0.6.0", features = ["all"] }
105+
socket2 = { version = "0.6.1", features = ["all"] }
106106
syn = { version = "2.0", features = ["full", "extra-traits"] }
107107
tempfile = "3"
108-
thiserror = "2.0.12"
109-
tokio = { version = "1.46.1", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] }
108+
thiserror = "2.0.17"
109+
tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] }
110110
tokio-stream = "0.1.17"
111-
tokio-util = { version = "0.7.16" }
111+
tokio-util = { version = "0.7.17" }
112112
tonic = { version = "0.14", default-features = false, features = [
113113
"channel",
114114
"codegen",

rust/otap-dataflow/crates/otap/src/metrics.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,47 @@ pub struct ExporterPDataMetrics {
5555

5656
impl ExporterPDataMetrics {
5757
pub fn inc_consumed(&mut self, st: SignalType) {
58+
self.add_consumed(st, 1);
59+
}
60+
61+
pub fn inc_exported(&mut self, st: SignalType) {
62+
self.add_exported(st, 1);
63+
}
64+
65+
pub fn inc_failed(&mut self, st: SignalType) {
66+
self.add_failed(st, 1);
67+
}
68+
69+
pub fn add_consumed(&mut self, st: SignalType, count: u64) {
70+
if count == 0 {
71+
return;
72+
}
5873
match st {
59-
SignalType::Metrics => self.metrics_consumed.inc(),
60-
SignalType::Logs => self.logs_consumed.inc(),
61-
SignalType::Traces => self.traces_consumed.inc(),
74+
SignalType::Metrics => self.metrics_consumed.add(count),
75+
SignalType::Logs => self.logs_consumed.add(count),
76+
SignalType::Traces => self.traces_consumed.add(count),
6277
}
6378
}
6479

65-
pub fn inc_exported(&mut self, st: SignalType) {
80+
pub fn add_exported(&mut self, st: SignalType, count: u64) {
81+
if count == 0 {
82+
return;
83+
}
6684
match st {
67-
SignalType::Metrics => self.metrics_exported.inc(),
68-
SignalType::Logs => self.logs_exported.inc(),
69-
SignalType::Traces => self.traces_exported.inc(),
85+
SignalType::Metrics => self.metrics_exported.add(count),
86+
SignalType::Logs => self.logs_exported.add(count),
87+
SignalType::Traces => self.traces_exported.add(count),
7088
}
7189
}
7290

73-
pub fn inc_failed(&mut self, st: SignalType) {
91+
pub fn add_failed(&mut self, st: SignalType, count: u64) {
92+
if count == 0 {
93+
return;
94+
}
7495
match st {
75-
SignalType::Metrics => self.metrics_failed.inc(),
76-
SignalType::Logs => self.logs_failed.inc(),
77-
SignalType::Traces => self.traces_failed.inc(),
96+
SignalType::Metrics => self.metrics_failed.add(count),
97+
SignalType::Logs => self.logs_failed.add(count),
98+
SignalType::Traces => self.traces_failed.add(count),
7899
}
79100
}
80101
}

rust/otap-dataflow/crates/otap/src/otap_grpc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::{
3232
pdata::{Context, OtapPdata},
3333
};
3434

35+
pub mod client_settings;
3536
pub mod middleware;
3637
pub mod otlp;
3738
pub mod server_settings;

rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct GrpcClientSettings {
1818
pub grpc_endpoint: String,
1919

2020
/// Compression method to use for outbound requests. Defaults to no compression.
21-
#[serde(default)]
21+
#[serde(default, alias = "compression_method")]
2222
pub compression: Option<CompressionMethod>,
2323

2424
/// Maximum number of concurrent in-flight requests allowed by the transport stack.
@@ -162,35 +162,35 @@ const fn default_concurrency_limit() -> usize {
162162
256
163163
}
164164

165-
fn default_connect_timeout() -> Duration {
165+
const fn default_connect_timeout() -> Duration {
166166
Duration::from_secs(3)
167167
}
168168

169169
const fn default_tcp_nodelay() -> bool {
170170
true
171171
}
172172

173-
fn default_tcp_keepalive() -> Option<Duration> {
173+
const fn default_tcp_keepalive() -> Option<Duration> {
174174
Some(Duration::from_secs(45))
175175
}
176176

177-
fn default_initial_stream_window_size() -> Option<u32> {
177+
const fn default_initial_stream_window_size() -> Option<u32> {
178178
Some(8 * 1024 * 1024)
179179
}
180180

181-
fn default_initial_connection_window_size() -> Option<u32> {
181+
const fn default_initial_connection_window_size() -> Option<u32> {
182182
Some(32 * 1024 * 1024)
183183
}
184184

185185
const fn default_http2_adaptive_window() -> bool {
186186
false
187187
}
188188

189-
fn default_http2_keepalive_interval() -> Option<Duration> {
189+
const fn default_http2_keepalive_interval() -> Option<Duration> {
190190
Some(Duration::from_secs(30))
191191
}
192192

193-
fn default_http2_keepalive_timeout() -> Option<Duration> {
193+
const fn default_http2_keepalive_timeout() -> Option<Duration> {
194194
Some(Duration::from_secs(10))
195195
}
196196

rust/otap-dataflow/crates/otap/src/otap_grpc/server_settings.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -253,43 +253,43 @@ const fn default_tcp_nodelay() -> bool {
253253
true
254254
}
255255

256-
fn default_tcp_keepalive() -> Option<Duration> {
256+
const fn default_tcp_keepalive() -> Option<Duration> {
257257
Some(Duration::from_secs(45))
258258
}
259259

260-
fn default_tcp_keepalive_interval() -> Option<Duration> {
260+
const fn default_tcp_keepalive_interval() -> Option<Duration> {
261261
Some(Duration::from_secs(15))
262262
}
263263

264-
fn default_tcp_keepalive_retries() -> Option<u32> {
264+
const fn default_tcp_keepalive_retries() -> Option<u32> {
265265
Some(5)
266266
}
267267

268268
const fn default_load_shed() -> bool {
269269
true
270270
}
271271

272-
fn default_initial_stream_window_size() -> Option<u32> {
272+
const fn default_initial_stream_window_size() -> Option<u32> {
273273
Some(8 * 1024 * 1024)
274274
}
275275

276-
fn default_initial_connection_window_size() -> Option<u32> {
276+
const fn default_initial_connection_window_size() -> Option<u32> {
277277
Some(24 * 1024 * 1024)
278278
}
279279

280-
fn default_max_frame_size() -> Option<u32> {
280+
const fn default_max_frame_size() -> Option<u32> {
281281
Some(16 * 1024)
282282
}
283283

284-
fn default_max_decoding_message_size() -> Option<u32> {
284+
const fn default_max_decoding_message_size() -> Option<u32> {
285285
Some(4 * 1024 * 1024)
286286
}
287287

288-
fn default_http2_keepalive_interval() -> Option<Duration> {
288+
const fn default_http2_keepalive_interval() -> Option<Duration> {
289289
Some(Duration::from_secs(30))
290290
}
291291

292-
fn default_http2_keepalive_timeout() -> Option<Duration> {
292+
const fn default_http2_keepalive_timeout() -> Option<Duration> {
293293
Some(Duration::from_secs(10))
294294
}
295295

0 commit comments

Comments
 (0)