Skip to content

Commit c18c006

Browse files
committed
add force_flush_with_timeout to processors and exporters
1 parent b64f632 commit c18c006

File tree

10 files changed

+55
-38
lines changed

10 files changed

+55
-38
lines changed

examples/tracing-http-propagator/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
132132
#[derive(Debug)]
133133
struct EnrichWithBaggageSpanProcessor;
134134
impl SpanProcessor for EnrichWithBaggageSpanProcessor {
135-
fn force_flush(&self) -> OTelSdkResult {
135+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
136136
Ok(())
137137
}
138138

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ pub struct BatchLogProcessor {
129129
logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
130130
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
131131
handle: Mutex<Option<thread::JoinHandle<()>>>,
132-
forceflush_timeout: Duration,
133132
export_log_message_sent: Arc<AtomicBool>,
134133
current_batch_size: Arc<AtomicUsize>,
135134
max_export_batch_size: usize,
@@ -221,21 +220,19 @@ impl LogProcessor for BatchLogProcessor {
221220
}
222221
}
223222

224-
fn force_flush(&self) -> OTelSdkResult {
223+
fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
225224
let (sender, receiver) = mpsc::sync_channel(1);
226225
match self
227226
.message_sender
228227
.try_send(BatchMessage::ForceFlush(sender))
229228
{
230-
Ok(_) => receiver
231-
.recv_timeout(self.forceflush_timeout)
232-
.map_err(|err| {
233-
if err == RecvTimeoutError::Timeout {
234-
OTelSdkError::Timeout(self.forceflush_timeout)
235-
} else {
236-
OTelSdkError::InternalFailure(format!("{err}"))
237-
}
238-
})?,
229+
Ok(_) => receiver.recv_timeout(timeout).map_err(|err| {
230+
if err == RecvTimeoutError::Timeout {
231+
OTelSdkError::Timeout(timeout)
232+
} else {
233+
OTelSdkError::InternalFailure(format!("{err}"))
234+
}
235+
})?,
239236
Err(mpsc::TrySendError::Full(_)) => {
240237
// If the control message could not be sent, emit a warning.
241238
otel_debug!(
@@ -489,7 +486,6 @@ impl BatchLogProcessor {
489486
logs_sender,
490487
message_sender,
491488
handle: Mutex::new(Some(handle)),
492-
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
493489
dropped_logs_count: AtomicUsize::new(0),
494490
max_queue_size,
495491
export_log_message_sent: Arc::new(AtomicBool::new(false)),

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,13 @@ pub trait LogProcessor: Send + Sync + Debug {
5353
/// - `instrumentation`: The instrumentation scope associated with the log record.
5454
fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);
5555
/// Force the logs lying in the cache to be exported.
56-
fn force_flush(&self) -> OTelSdkResult;
56+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
57+
Ok(())
58+
}
59+
/// Force the logs lying in the cache to be exported with default timeout.
60+
fn force_flush(&self) -> OTelSdkResult {
61+
self.force_flush_with_timeout(Duration::from_secs(5))
62+
}
5763
/// Shuts down the processor.
5864
/// After shutdown returns the log processor should stop processing any logs.
5965
/// It's up to the implementation on when to drop the LogProcessor.

opentelemetry-sdk/src/logs/logger_provider.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ impl SdkLoggerProvider {
8383
}
8484

8585
/// Force flush all remaining logs in log processors and return results.
86-
pub fn force_flush(&self) -> OTelSdkResult {
86+
pub fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
8787
let result: Vec<_> = self
8888
.log_processors()
8989
.iter()
90-
.map(|processor| processor.force_flush())
90+
.map(|processor| processor.force_flush_with_timeout(timeout))
9191
.collect();
9292
if result.iter().all(|r| r.is_ok()) {
9393
Ok(())
@@ -96,6 +96,11 @@ impl SdkLoggerProvider {
9696
}
9797
}
9898

99+
/// Force flush all remaining logs with default timeout.
100+
pub fn force_flush(&self) -> OTelSdkResult {
101+
self.force_flush_with_timeout(Duration::from_secs(5))
102+
}
103+
99104
/// Shuts down this `LoggerProvider`
100105
pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
101106
otel_debug!(

opentelemetry-sdk/src/trace/export.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,15 @@ pub trait SpanExporter: Send + Sync + Debug {
6666
/// implemented as a blocking API or an asynchronous API which notifies the caller via
6767
/// a callback or an event. OpenTelemetry client authors can decide if they want to
6868
/// make the flush timeout configurable.
69-
fn force_flush(&mut self) -> OTelSdkResult {
69+
fn force_flush_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
7070
Ok(())
7171
}
7272

73+
/// Force flush the exporter with default timeout.
74+
fn force_flush(&mut self) -> OTelSdkResult {
75+
self.force_flush_with_timeout(Duration::from_secs(5))
76+
}
77+
7378
/// Set the resource for the exporter.
7479
fn set_resource(&mut self, _resource: &Resource) {}
7580
}

opentelemetry-sdk/src/trace/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ mod tests {
143143
// let _c = Context::current();
144144
}
145145

146-
fn force_flush(&self) -> crate::error::OTelSdkResult {
146+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
147147
Ok(())
148148
}
149149

opentelemetry-sdk/src/trace/provider.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl SdkTracerProvider {
194194
self.inner.is_shutdown.load(Ordering::Relaxed)
195195
}
196196

197-
/// Force flush all remaining spans in span processors and return results.
197+
/// Force flush all remaining spans in span processors with a default timeout and return results.
198198
///
199199
/// # Examples
200200
///
@@ -228,10 +228,15 @@ impl SdkTracerProvider {
228228
/// }
229229
/// ```
230230
pub fn force_flush(&self) -> OTelSdkResult {
231+
self.force_flush_with_timeout(Duration::from_secs(5))
232+
}
233+
234+
/// force flush processors with a specified timeout
235+
pub fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
231236
let result: Vec<_> = self
232237
.span_processors()
233238
.iter()
234-
.map(|processor| processor.force_flush())
239+
.map(|processor| processor.force_flush_with_timeout(timeout))
235240
.collect();
236241
if result.iter().all(|r| r.is_ok()) {
237242
Ok(())
@@ -530,7 +535,7 @@ mod tests {
530535
// ignore
531536
}
532537

533-
fn force_flush(&self) -> OTelSdkResult {
538+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
534539
if self.success {
535540
Ok(())
536541
} else {
@@ -793,7 +798,7 @@ mod tests {
793798
// No operation needed for this processor
794799
}
795800

796-
fn force_flush(&self) -> OTelSdkResult {
801+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
797802
Ok(())
798803
}
799804

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
8585
/// TODO - This method should take reference to `SpanData`
8686
fn on_end(&self, span: SpanData);
8787
/// Force the spans lying in the cache to be exported.
88-
fn force_flush(&self) -> OTelSdkResult;
88+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult;
89+
/// Force flush the spans with a default timeout.
90+
fn force_flush(&self) -> OTelSdkResult {
91+
self.force_flush_with_timeout(Duration::from_secs(5))
92+
}
8993
/// Shuts down the processor. Called when SDK is shut down. This is an
9094
/// opportunity for processors to do any cleanup required.
9195
///
@@ -153,7 +157,7 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
153157
}
154158
}
155159

156-
fn force_flush(&self) -> OTelSdkResult {
160+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
157161
// Nothing to flush for simple span processor.
158162
Ok(())
159163
}
@@ -286,7 +290,6 @@ pub struct BatchSpanProcessor {
286290
span_sender: SyncSender<SpanData>, // Data channel to store spans
287291
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
288292
handle: Mutex<Option<thread::JoinHandle<()>>>,
289-
forceflush_timeout: Duration,
290293
export_span_message_sent: Arc<AtomicBool>,
291294
current_batch_size: Arc<AtomicUsize>,
292295
max_export_batch_size: usize,
@@ -424,7 +427,6 @@ impl BatchSpanProcessor {
424427
span_sender,
425428
message_sender,
426429
handle: Mutex::new(Some(handle)),
427-
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
428430
dropped_spans_count: AtomicUsize::new(0),
429431
max_queue_size,
430432
export_span_message_sent: Arc::new(AtomicBool::new(false)),
@@ -593,21 +595,19 @@ impl SpanProcessor for BatchSpanProcessor {
593595
}
594596

595597
/// Flushes all pending spans.
596-
fn force_flush(&self) -> OTelSdkResult {
598+
fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
597599
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
598600
match self
599601
.message_sender
600602
.try_send(BatchMessage::ForceFlush(sender))
601603
{
602-
Ok(_) => receiver
603-
.recv_timeout(self.forceflush_timeout)
604-
.map_err(|err| {
605-
if err == std::sync::mpsc::RecvTimeoutError::Timeout {
606-
OTelSdkError::Timeout(self.forceflush_timeout)
607-
} else {
608-
OTelSdkError::InternalFailure(format!("{err}"))
609-
}
610-
})?,
604+
Ok(_) => receiver.recv_timeout(timeout).map_err(|err| {
605+
if err == std::sync::mpsc::RecvTimeoutError::Timeout {
606+
OTelSdkError::Timeout(timeout)
607+
} else {
608+
OTelSdkError::InternalFailure(format!("{err}"))
609+
}
610+
})?,
611611
Err(std::sync::mpsc::TrySendError::Full(_)) => {
612612
// If the control message could not be sent, emit a warning.
613613
otel_debug!(

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
123123
}
124124
}
125125

126-
fn force_flush(&self) -> OTelSdkResult {
126+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
127127
let (res_sender, res_receiver) = oneshot::channel();
128128
self.message_sender
129129
.try_send(BatchMessage::Flush(Some(res_sender)))

stress/src/traces.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl SpanProcessor for NoOpSpanProcessor {
4141
// No-op
4242
}
4343

44-
fn force_flush(&self) -> OTelSdkResult {
44+
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
4545
Ok(())
4646
}
4747

0 commit comments

Comments
 (0)