Skip to content

Commit b15b124

Browse files
committed
fix
1 parent 28ad4b0 commit b15b124

File tree

1 file changed

+151
-7
lines changed

1 file changed

+151
-7
lines changed

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 151 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ pub struct BatchSpanProcessorDedicatedThread {
191191
handle: Mutex<Option<thread::JoinHandle<()>>>,
192192
shutdown_timeout: Duration,
193193
is_shutdown: AtomicBool,
194-
dropped_span_count: Arc<AtomicBool>,
194+
dropped_span_count: Arc<AtomicUsize>,
195195
}
196196

197197
impl BatchSpanProcessorDedicatedThread {
@@ -261,7 +261,7 @@ impl BatchSpanProcessorDedicatedThread {
261261
handle: Mutex::new(Some(handle)),
262262
shutdown_timeout,
263263
is_shutdown: AtomicBool::new(false),
264-
dropped_span_count: Arc::new(AtomicBool::new(false)),
264+
dropped_span_count: Arc::new(AtomicUsize::new(0)),
265265
}
266266
}
267267

@@ -271,13 +271,19 @@ impl BatchSpanProcessorDedicatedThread {
271271
eprintln!("Processor is shutdown. Dropping span.");
272272
return;
273273
}
274-
if self
274+
let result = self
275275
.message_sender
276-
.try_send(BatchMessageDedicatedThread::ExportSpan(span))
277-
.is_err() && !self.dropped_span_count.load(Ordering::Relaxed) {
278-
eprintln!("Queue is full, dropping spans.");
279-
self.dropped_span_count.store(true, Ordering::Relaxed);
276+
.try_send(BatchMessageDedicatedThread::ExportSpan(span));
277+
278+
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
279+
if result.is_err() {
280+
// Increment dropped logs count. The first time we have to drop a log,
281+
// emit a warning.
282+
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
283+
otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted",
284+
message = "BatchSpanProcessorDedicatedThread dropped a Span due to queue full/internal errors. No further span will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
280285
}
286+
}
281287
}
282288

283289
/// Flushes all pending spans.
@@ -1285,4 +1291,142 @@ mod tests {
12851291
let shutdown_res = processor.shutdown();
12861292
assert!(shutdown_res.is_ok());
12871293
}
1294+
1295+
// Helper function to create a default test span
1296+
fn create_test_span(name: &str) -> SpanData {
1297+
SpanData {
1298+
span_context: SpanContext::empty_context(),
1299+
parent_span_id: SpanId::INVALID,
1300+
span_kind: SpanKind::Internal,
1301+
name: name.to_string().into(),
1302+
start_time: opentelemetry::time::now(),
1303+
end_time: opentelemetry::time::now(),
1304+
attributes: Vec::new(),
1305+
dropped_attributes_count: 0,
1306+
events: SpanEvents::default(),
1307+
links: SpanLinks::default(),
1308+
status: Status::Unset,
1309+
instrumentation_scope: Default::default(),
1310+
}
1311+
}
1312+
1313+
use crate::trace::BatchSpanProcessorDedicatedThread;
1314+
use futures_util::future::BoxFuture;
1315+
use futures_util::FutureExt;
1316+
use std::sync::Arc;
1317+
use std::sync::Mutex;
1318+
1319+
// Mock exporter to test functionality
1320+
#[derive(Debug)]
1321+
struct MockSpanExporter {
1322+
exported_spans: Arc<Mutex<Vec<SpanData>>>,
1323+
}
1324+
1325+
impl MockSpanExporter {
1326+
fn new() -> Self {
1327+
Self {
1328+
exported_spans: Arc::new(Mutex::new(Vec::new())),
1329+
}
1330+
}
1331+
}
1332+
1333+
impl SpanExporter for MockSpanExporter {
1334+
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
1335+
let exported_spans = self.exported_spans.clone();
1336+
async move {
1337+
exported_spans.lock().unwrap().extend(batch);
1338+
Ok(())
1339+
}
1340+
.boxed()
1341+
}
1342+
1343+
fn shutdown(&mut self) {}
1344+
}
1345+
1346+
#[test]
1347+
fn batchspanprocessor_dedicatedthread_handles_on_end() {
1348+
let exporter = MockSpanExporter::new();
1349+
let exporter_shared = exporter.exported_spans.clone();
1350+
let processor = BatchSpanProcessorDedicatedThread::new(
1351+
exporter,
1352+
10, // max queue size
1353+
Duration::from_secs(5),
1354+
Duration::from_secs(2),
1355+
);
1356+
1357+
let test_span = create_test_span("test_span");
1358+
processor.on_end(test_span.clone());
1359+
1360+
// Wait for flush interval to ensure the span is processed
1361+
std::thread::sleep(Duration::from_secs(6));
1362+
1363+
let exported_spans = exporter_shared.lock().unwrap();
1364+
assert_eq!(exported_spans.len(), 1);
1365+
assert_eq!(exported_spans[0].name, "test_span");
1366+
}
1367+
1368+
#[test]
1369+
fn batchspanprocessor_deficatedthread_force_flush() {
1370+
let exporter = MockSpanExporter::new();
1371+
let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
1372+
let processor = BatchSpanProcessorDedicatedThread::new(
1373+
exporter,
1374+
10, // max queue size
1375+
Duration::from_secs(5),
1376+
Duration::from_secs(2),
1377+
);
1378+
1379+
// Create a test span and send it to the processor
1380+
let test_span = create_test_span("force_flush_span");
1381+
processor.on_end(test_span.clone());
1382+
1383+
// Call force_flush to immediately export the spans
1384+
let flush_result = processor.force_flush();
1385+
assert!(flush_result.is_ok(), "Force flush failed unexpectedly");
1386+
1387+
// Verify the exported spans in the mock exporter
1388+
let exported_spans = exporter_shared.lock().unwrap();
1389+
assert_eq!(
1390+
exported_spans.len(),
1391+
1,
1392+
"Unexpected number of exported spans"
1393+
);
1394+
assert_eq!(exported_spans[0].name, "force_flush_span");
1395+
}
1396+
1397+
#[test]
1398+
fn batchspanprocessor_shutdown() {
1399+
let exporter = MockSpanExporter::new();
1400+
let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
1401+
let processor = BatchSpanProcessorDedicatedThread::new(
1402+
exporter,
1403+
10, // max queue size
1404+
Duration::from_secs(5),
1405+
Duration::from_secs(2),
1406+
);
1407+
1408+
// Create a test span and send it to the processor
1409+
let test_span = create_test_span("shutdown_span");
1410+
processor.on_end(test_span.clone());
1411+
1412+
// Call shutdown to flush and export all pending spans
1413+
let shutdown_result = processor.shutdown();
1414+
assert!(shutdown_result.is_ok(), "Shutdown failed unexpectedly");
1415+
1416+
// Verify the exported spans in the mock exporter
1417+
let exported_spans = exporter_shared.lock().unwrap();
1418+
assert_eq!(
1419+
exported_spans.len(),
1420+
1,
1421+
"Unexpected number of exported spans"
1422+
);
1423+
assert_eq!(exported_spans[0].name, "shutdown_span");
1424+
1425+
// Ensure further calls to shutdown are idempotent
1426+
let second_shutdown_result = processor.shutdown();
1427+
assert!(
1428+
second_shutdown_result.is_err(),
1429+
"Shutdown should fail when called a second time"
1430+
);
1431+
}
12881432
}

0 commit comments

Comments
 (0)