Skip to content

Commit a60122c

Browse files
authored
Merge branch 'main' into otel-prometheus-0.30
2 parents 17c7fec + 86dfd4b commit a60122c

File tree

6 files changed

+289
-117
lines changed

6 files changed

+289
-117
lines changed

opentelemetry-otlp/src/exporter/tonic/mod.rs

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ impl HasTonicConfig for TonicExporterBuilder {
361361
/// ```
362362
/// # #[cfg(all(feature = "trace", feature = "grpc-tonic"))]
363363
/// # {
364-
/// use crate::opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
364+
/// use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
365365
/// let exporter_builder = opentelemetry_otlp::SpanExporter::builder()
366366
/// .with_tonic()
367367
/// .with_compression(opentelemetry_otlp::Compression::Gzip);
@@ -373,6 +373,32 @@ pub trait WithTonicConfig {
373373
fn with_tls_config(self, tls_config: ClientTlsConfig) -> Self;
374374

375375
/// Set custom metadata entries to send to the collector.
376+
///
377+
/// **Note**: This method is additive - calling it multiple times will merge
378+
/// the metadata entries. If the same key is provided in multiple calls,
379+
/// the last value will override previous ones.
380+
///
381+
/// # Example
382+
/// ```no_run
383+
/// # #[cfg(feature = "grpc-tonic")]
384+
/// # {
385+
/// use tonic::metadata::MetadataMap;
386+
/// use opentelemetry_otlp::WithTonicConfig;
387+
///
388+
/// let mut metadata1 = MetadataMap::new();
389+
/// metadata1.insert("key1", "value1".parse().unwrap());
390+
///
391+
/// let mut metadata2 = MetadataMap::new();
392+
/// metadata2.insert("key2", "value2".parse().unwrap());
393+
///
394+
/// let exporter = opentelemetry_otlp::SpanExporter::builder()
395+
/// .with_tonic()
396+
/// .with_metadata(metadata1) // Adds key1=value1
397+
/// .with_metadata(metadata2) // Adds key2=value2 (both are present)
398+
/// .build()?;
399+
/// # }
400+
/// # Ok::<(), Box<dyn std::error::Error>>(())
401+
/// ```
376402
fn with_metadata(self, metadata: MetadataMap) -> Self;
377403

378404
/// Set the compression algorithm to use when communicating with the collector.
@@ -387,8 +413,65 @@ pub trait WithTonicConfig {
387413
fn with_channel(self, channel: tonic::transport::Channel) -> Self;
388414

389415
/// Use a custom `interceptor` to modify each outbound request.
390-
/// this can be used to modify the grpc metadata, for example
416+
/// This can be used to modify the gRPC metadata, for example
391417
/// to inject auth tokens.
418+
///
419+
/// **Note**: Calling this method multiple times will replace the previous
420+
/// interceptor. If you need multiple interceptors, chain them together
421+
/// before passing to this method.
422+
///
423+
/// # Examples
424+
///
425+
/// ## Single interceptor
426+
/// ```no_run
427+
/// # #[cfg(feature = "grpc-tonic")]
428+
/// # {
429+
/// use tonic::{Request, Status};
430+
/// use opentelemetry_otlp::WithTonicConfig;
431+
///
432+
/// fn auth_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
433+
/// req.metadata_mut().insert("authorization", "Bearer token".parse().unwrap());
434+
/// Ok(req)
435+
/// }
436+
///
437+
/// let exporter = opentelemetry_otlp::SpanExporter::builder()
438+
/// .with_tonic()
439+
/// .with_interceptor(auth_interceptor)
440+
/// .build()?;
441+
/// # }
442+
/// # Ok::<(), Box<dyn std::error::Error>>(())
443+
/// ```
444+
///
445+
/// ## Multiple interceptors (chaining)
446+
/// ```no_run
447+
/// # #[cfg(feature = "grpc-tonic")]
448+
/// # {
449+
/// use tonic::{Request, Status};
450+
/// use opentelemetry_otlp::WithTonicConfig;
451+
///
452+
/// fn auth_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
453+
/// req.metadata_mut().insert("authorization", "Bearer token".parse().unwrap());
454+
/// Ok(req)
455+
/// }
456+
///
457+
/// fn logging_interceptor(req: Request<()>) -> Result<Request<()>, Status> {
458+
/// println!("Sending gRPC request with metadata: {:?}", req.metadata());
459+
/// Ok(req)
460+
/// }
461+
///
462+
/// // Chain interceptors by wrapping them
463+
/// fn combined_interceptor(req: Request<()>) -> Result<Request<()>, Status> {
464+
/// let req = logging_interceptor(req)?;
465+
/// auth_interceptor(req)
466+
/// }
467+
///
468+
/// let exporter = opentelemetry_otlp::SpanExporter::builder()
469+
/// .with_tonic()
470+
/// .with_interceptor(combined_interceptor)
471+
/// .build()?;
472+
/// # }
473+
/// # Ok::<(), Box<dyn std::error::Error>>(())
474+
/// ```
392475
fn with_interceptor<I>(self, interceptor: I) -> Self
393476
where
394477
I: tonic::service::Interceptor + Clone + Send + Sync + 'static;

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ impl BatchLogProcessor {
360360
logs: &mut Vec<LogsData>,
361361
last_export_time: &mut Instant,
362362
current_batch_size: &AtomicUsize,
363-
config: &BatchConfig,
363+
max_export_size: usize,
364364
) -> OTelSdkResult
365365
where
366366
E: LogExporter + Send + Sync + 'static,
@@ -373,7 +373,7 @@ impl BatchLogProcessor {
373373
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
374374
while let Ok(log) = logs_receiver.try_recv() {
375375
logs.push(log);
376-
if logs.len() == config.max_export_batch_size {
376+
if logs.len() == max_export_size {
377377
break;
378378
}
379379
}
@@ -409,7 +409,7 @@ impl BatchLogProcessor {
409409
&mut logs,
410410
&mut last_export_time,
411411
&current_batch_size,
412-
&config,
412+
max_export_batch_size,
413413
);
414414
}
415415
Ok(BatchMessage::ForceFlush(sender)) => {
@@ -420,7 +420,7 @@ impl BatchLogProcessor {
420420
&mut logs,
421421
&mut last_export_time,
422422
&current_batch_size,
423-
&config,
423+
max_export_batch_size,
424424
);
425425
let _ = sender.send(result);
426426
}
@@ -432,7 +432,7 @@ impl BatchLogProcessor {
432432
&mut logs,
433433
&mut last_export_time,
434434
&current_batch_size,
435-
&config,
435+
max_export_batch_size,
436436
);
437437
let _ = exporter.shutdown();
438438
let _ = sender.send(result);
@@ -460,7 +460,7 @@ impl BatchLogProcessor {
460460
&mut logs,
461461
&mut last_export_time,
462462
&current_batch_size,
463-
&config,
463+
max_export_batch_size,
464464
);
465465
}
466466
Err(RecvTimeoutError::Disconnected) => {

opentelemetry-sdk/src/logs/in_memory_exporter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ impl InMemoryLogExporterBuilder {
132132

133133
/// If set, the records will not be [`InMemoryLogExporter::reset`] on shutdown.
134134
#[cfg(test)]
135+
#[allow(dead_code)]
135136
pub(crate) fn keep_records_on_shutdown(self) -> Self {
136137
Self {
137138
reset_on_shutdown: false,

opentelemetry-sdk/src/trace/in_memory_exporter.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::error::{OTelSdkError, OTelSdkResult};
22
use crate::resource::Resource;
33
use crate::trace::{SpanData, SpanExporter};
44
use crate::InMemoryExporterError;
5-
use std::sync::{Arc, Mutex};
5+
use std::sync::{atomic::AtomicBool, Arc, Mutex};
66
use std::time::Duration;
77

88
/// An in-memory span exporter that stores span data in memory.
@@ -51,6 +51,8 @@ use std::time::Duration;
5151
pub struct InMemorySpanExporter {
5252
spans: Arc<Mutex<Vec<SpanData>>>,
5353
resource: Arc<Mutex<Resource>>,
54+
should_reset_on_shutdown: bool,
55+
shutdown_called: Arc<AtomicBool>,
5456
}
5557

5658
impl Default for InMemorySpanExporter {
@@ -67,7 +69,9 @@ impl Default for InMemorySpanExporter {
6769
/// let exporter = InMemorySpanExporterBuilder::new().build();
6870
/// ```
6971
#[derive(Clone, Debug)]
70-
pub struct InMemorySpanExporterBuilder {}
72+
pub struct InMemorySpanExporterBuilder {
73+
reset_on_shutdown: bool,
74+
}
7175

7276
impl Default for InMemorySpanExporterBuilder {
7377
fn default() -> Self {
@@ -78,19 +82,38 @@ impl Default for InMemorySpanExporterBuilder {
7882
impl InMemorySpanExporterBuilder {
7983
/// Creates a new instance of the `InMemorySpanExporterBuilder`.
8084
pub fn new() -> Self {
81-
Self {}
85+
Self {
86+
reset_on_shutdown: true,
87+
}
8288
}
8389

8490
/// Creates a new instance of the `InMemorySpanExporter`.
8591
pub fn build(&self) -> InMemorySpanExporter {
8692
InMemorySpanExporter {
8793
spans: Arc::new(Mutex::new(Vec::new())),
8894
resource: Arc::new(Mutex::new(Resource::builder().build())),
95+
should_reset_on_shutdown: self.reset_on_shutdown,
96+
shutdown_called: Arc::new(AtomicBool::new(false)),
97+
}
98+
}
99+
100+
/// If set, the records will not be [`InMemorySpanExporter::reset`] on shutdown.
101+
#[cfg(test)]
102+
#[allow(dead_code)]
103+
pub(crate) fn keep_records_on_shutdown(self) -> Self {
104+
Self {
105+
reset_on_shutdown: false,
89106
}
90107
}
91108
}
92109

93110
impl InMemorySpanExporter {
111+
/// Returns true if shutdown was called.
112+
pub fn is_shutdown_called(&self) -> bool {
113+
self.shutdown_called
114+
.load(std::sync::atomic::Ordering::Relaxed)
115+
}
116+
94117
/// Returns the finished span as a vector of `SpanData`.
95118
///
96119
/// # Errors
@@ -140,7 +163,11 @@ impl SpanExporter for InMemorySpanExporter {
140163
}
141164

142165
fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
143-
self.reset();
166+
self.shutdown_called
167+
.store(true, std::sync::atomic::Ordering::Relaxed);
168+
if self.should_reset_on_shutdown {
169+
self.reset();
170+
}
144171
Ok(())
145172
}
146173

0 commit comments

Comments
 (0)