Skip to content

Commit 28ad4b0

Browse files
committed
initial commit
1 parent acf16ed commit 28ad4b0

File tree

2 files changed

+200
-0
lines changed

2 files changed

+200
-0
lines changed

opentelemetry-sdk/src/trace/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub use span::Span;
2828
pub use span_limit::SpanLimits;
2929
pub use span_processor::{
3030
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
31+
BatchSpanProcessorDedicatedThread, BatchSpanProcessorDedicatedThreadBuilder,
3132
SimpleSpanProcessor, SpanProcessor,
3233
};
3334
pub use tracer::Tracer;

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
5555
use std::sync::{Arc, Mutex};
5656
use std::{env, fmt, str::FromStr, time::Duration};
5757

58+
use std::sync::atomic::AtomicBool;
59+
use std::thread;
60+
use std::time::Instant;
61+
5862
/// Delay interval between two consecutive exports.
5963
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
6064
/// Default delay interval between two consecutive exports.
@@ -166,6 +170,201 @@ impl SpanProcessor for SimpleSpanProcessor {
166170
}
167171
}
168172

173+
use futures_executor::block_on;
174+
use std::sync::mpsc::sync_channel;
175+
use std::sync::mpsc::RecvTimeoutError;
176+
use std::sync::mpsc::SyncSender;
177+
178+
/// Messages exchanged between the main thread and the background thread.
179+
#[allow(clippy::large_enum_variant)]
180+
#[derive(Debug)]
181+
enum BatchMessageDedicatedThread {
182+
ExportSpan(SpanData),
183+
ForceFlush(SyncSender<TraceResult<()>>),
184+
Shutdown(SyncSender<TraceResult<()>>),
185+
}
186+
187+
/// A batch span processor with a dedicated background thread.
188+
#[derive(Debug)]
189+
pub struct BatchSpanProcessorDedicatedThread {
190+
message_sender: SyncSender<BatchMessageDedicatedThread>,
191+
handle: Mutex<Option<thread::JoinHandle<()>>>,
192+
shutdown_timeout: Duration,
193+
is_shutdown: AtomicBool,
194+
dropped_span_count: Arc<AtomicBool>,
195+
}
196+
197+
impl BatchSpanProcessorDedicatedThread {
198+
/// Creates a new instance of `BatchSpanProcessorDedicatedThread`.
199+
pub fn new<E>(
200+
mut exporter: E,
201+
max_queue_size: usize,
202+
scheduled_delay: Duration,
203+
shutdown_timeout: Duration,
204+
) -> Self
205+
where
206+
E: SpanExporter + Send + 'static,
207+
{
208+
let (message_sender, message_receiver) = sync_channel(max_queue_size);
209+
210+
let handle = thread::Builder::new()
211+
.name("BatchSpanProcessorThread".to_string())
212+
.spawn(move || {
213+
let mut spans = Vec::new();
214+
let mut last_export_time = Instant::now();
215+
216+
loop {
217+
let timeout = scheduled_delay.saturating_sub(last_export_time.elapsed());
218+
match message_receiver.recv_timeout(timeout) {
219+
Ok(message) => match message {
220+
BatchMessageDedicatedThread::ExportSpan(span) => {
221+
spans.push(span);
222+
if spans.len() >= max_queue_size
223+
|| last_export_time.elapsed() >= scheduled_delay
224+
{
225+
if let Err(err) = block_on(exporter.export(spans.split_off(0)))
226+
{
227+
eprintln!("Export error: {:?}", err);
228+
}
229+
last_export_time = Instant::now();
230+
}
231+
}
232+
BatchMessageDedicatedThread::ForceFlush(sender) => {
233+
let result = block_on(exporter.export(spans.split_off(0)));
234+
let _ = sender.send(result);
235+
}
236+
BatchMessageDedicatedThread::Shutdown(sender) => {
237+
let result = block_on(exporter.export(spans.split_off(0)));
238+
let _ = sender.send(result);
239+
break;
240+
}
241+
},
242+
Err(RecvTimeoutError::Timeout) => {
243+
if last_export_time.elapsed() >= scheduled_delay {
244+
if let Err(err) = block_on(exporter.export(spans.split_off(0))) {
245+
eprintln!("Export error: {:?}", err);
246+
}
247+
last_export_time = Instant::now();
248+
}
249+
}
250+
Err(RecvTimeoutError::Disconnected) => {
251+
eprintln!("Channel disconnected, shutting down processor thread.");
252+
break;
253+
}
254+
}
255+
}
256+
})
257+
.expect("Failed to spawn thread");
258+
259+
Self {
260+
message_sender,
261+
handle: Mutex::new(Some(handle)),
262+
shutdown_timeout,
263+
is_shutdown: AtomicBool::new(false),
264+
dropped_span_count: Arc::new(AtomicBool::new(false)),
265+
}
266+
}
267+
268+
/// Handles span end.
269+
pub fn on_end(&self, span: SpanData) {
270+
if self.is_shutdown.load(Ordering::Relaxed) {
271+
eprintln!("Processor is shutdown. Dropping span.");
272+
return;
273+
}
274+
if self
275+
.message_sender
276+
.try_send(BatchMessageDedicatedThread::ExportSpan(span))
277+
.is_err() && !self.dropped_span_count.load(Ordering::Relaxed) {
278+
eprintln!("Queue is full, dropping spans.");
279+
self.dropped_span_count.store(true, Ordering::Relaxed);
280+
}
281+
}
282+
283+
/// Flushes all pending spans.
284+
pub fn force_flush(&self) -> TraceResult<()> {
285+
if self.is_shutdown.load(Ordering::Relaxed) {
286+
return Err(TraceError::Other("Processor already shutdown".into()));
287+
}
288+
let (sender, receiver) = sync_channel(1);
289+
self.message_sender
290+
.try_send(BatchMessageDedicatedThread::ForceFlush(sender))
291+
.map_err(|_| TraceError::Other("Failed to send ForceFlush message".into()))?;
292+
293+
receiver
294+
.recv_timeout(self.shutdown_timeout)
295+
.map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?
296+
}
297+
298+
/// Shuts down the processor.
299+
pub fn shutdown(&self) -> TraceResult<()> {
300+
if self.is_shutdown.swap(true, Ordering::Relaxed) {
301+
return Err(TraceError::Other("Processor already shutdown".into()));
302+
}
303+
let (sender, receiver) = sync_channel(1);
304+
self.message_sender
305+
.try_send(BatchMessageDedicatedThread::Shutdown(sender))
306+
.map_err(|_| TraceError::Other("Failed to send Shutdown message".into()))?;
307+
308+
let result = receiver
309+
.recv_timeout(self.shutdown_timeout)
310+
.map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?;
311+
if let Some(handle) = self.handle.lock().unwrap().take() {
312+
handle.join().expect("Failed to join thread");
313+
}
314+
result
315+
}
316+
}
317+
318+
/// Builder for `BatchSpanProcessorDedicatedThread`.
319+
#[derive(Debug, Default)]
320+
pub struct BatchSpanProcessorDedicatedThreadBuilder {
321+
max_queue_size: usize,
322+
scheduled_delay: Duration,
323+
shutdown_timeout: Duration,
324+
}
325+
326+
impl BatchSpanProcessorDedicatedThreadBuilder {
327+
/// Creates a new builder with default values.
328+
pub fn new() -> Self {
329+
Self {
330+
max_queue_size: 2048,
331+
scheduled_delay: Duration::from_secs(5),
332+
shutdown_timeout: Duration::from_secs(5),
333+
}
334+
}
335+
336+
/// Sets the maximum queue size for spans.
337+
pub fn with_max_queue_size(mut self, size: usize) -> Self {
338+
self.max_queue_size = size;
339+
self
340+
}
341+
342+
/// Sets the delay between exports.
343+
pub fn with_scheduled_delay(mut self, delay: Duration) -> Self {
344+
self.scheduled_delay = delay;
345+
self
346+
}
347+
348+
/// Sets the timeout for shutdown and flush operations.
349+
pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
350+
self.shutdown_timeout = timeout;
351+
self
352+
}
353+
354+
/// Builds the `BatchSpanProcessorDedicatedThread` instance.
355+
pub fn build<E>(self, exporter: E) -> BatchSpanProcessorDedicatedThread
356+
where
357+
E: SpanExporter + Send + 'static,
358+
{
359+
BatchSpanProcessorDedicatedThread::new(
360+
exporter,
361+
self.max_queue_size,
362+
self.scheduled_delay,
363+
self.shutdown_timeout,
364+
)
365+
}
366+
}
367+
169368
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
170369
/// them at a preconfigured interval.
171370
///

0 commit comments

Comments
 (0)