Skip to content

Commit d1f3cbb

Browse files
committed
resolve conflicts
1 parent 5c8c644 commit d1f3cbb

File tree

1 file changed

+16
-11
lines changed

1 file changed

+16
-11
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
99
use opentelemetry::logs::Severity;
1010
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
1111

12-
use std::sync::atomic::AtomicBool;
12+
use std::sync::atomic::Ordering;
13+
use std::sync::atomic::{AtomicBool, AtomicUsize};
1314
use std::{cmp::min, env, sync::Mutex};
1415
use std::{
1516
fmt::{self, Debug, Formatter},
@@ -318,7 +319,10 @@ impl LogProcessor for BatchLogProcessor {
318319
}
319320

320321
impl BatchLogProcessor {
321-
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig) -> Self {
322+
pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
323+
where
324+
E: LogExporter + Send + Sync + 'static,
325+
{
322326
let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
323327
let max_queue_size = config.max_queue_size;
324328

@@ -346,7 +350,7 @@ impl BatchLogProcessor {
346350
{
347351
let _ = export_with_timeout_sync(
348352
remaining_time,
349-
exporter.as_mut(),
353+
&mut exporter,
350354
logs.split_off(0),
351355
&mut last_export_time,
352356
);
@@ -355,7 +359,7 @@ impl BatchLogProcessor {
355359
Ok(BatchMessage::ForceFlush(sender)) => {
356360
let result = export_with_timeout_sync(
357361
remaining_time,
358-
exporter.as_mut(),
362+
&mut exporter,
359363
logs.split_off(0),
360364
&mut last_export_time,
361365
);
@@ -364,7 +368,7 @@ impl BatchLogProcessor {
364368
Ok(BatchMessage::Shutdown(sender)) => {
365369
let result = export_with_timeout_sync(
366370
remaining_time,
367-
exporter.as_mut(),
371+
&mut exporter,
368372
logs.split_off(0),
369373
&mut last_export_time,
370374
);
@@ -381,7 +385,7 @@ impl BatchLogProcessor {
381385
Err(RecvTimeoutError::Timeout) => {
382386
let _ = export_with_timeout_sync(
383387
remaining_time,
384-
exporter.as_mut(),
388+
&mut exporter,
385389
logs.split_off(0),
386390
&mut last_export_time,
387391
);
@@ -442,7 +446,8 @@ where
442446
.iter()
443447
.map(|log_data| (&log_data.0, &log_data.1))
444448
.collect();
445-
let export = exporter.export(LogBatch::new(log_vec.as_slice()));
449+
let log_batch = LogBatch::new(log_vec.as_slice());
450+
let export = exporter.export(&log_batch);
446451
let export_result = futures_executor::block_on(export);
447452

448453
match export_result {
@@ -477,7 +482,7 @@ where
477482

478483
/// Build a batch processor
479484
pub fn build(self) -> BatchLogProcessor {
480-
BatchLogProcessor::new(Box::new(self.exporter), self.config)
485+
BatchLogProcessor::new(self.exporter, self.config)
481486
}
482487
}
483488

@@ -569,7 +574,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessorWithAsyncRuntime<R> {
569574

570575
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
571576
impl<R: RuntimeChannel> BatchLogProcessorWithAsyncRuntime<R> {
572-
pub(crate) fn new<E>(mut exporter: E, config: BatchConfig, runtime: R) -> Self
577+
pub(crate) fn new<E>(mut exporter: E, config: BatchConfig, runtime: R) -> Self
573578
where
574579
E: LogExporter + Send + Sync + 'static,
575580
{
@@ -1125,7 +1130,7 @@ mod tests {
11251130
let exporter = MockLogExporter {
11261131
resource: Arc::new(Mutex::new(None)),
11271132
};
1128-
let processor = BatchLogProcessor::new(exporter.clone(),, BatchConfig::default());
1133+
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
11291134
let provider = LoggerProvider::builder()
11301135
.with_log_processor(processor)
11311136
.with_resource(
@@ -1195,7 +1200,7 @@ mod tests {
11951200
#[tokio::test(flavor = "current_thread")]
11961201
async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
11971202
let exporter = InMemoryLogExporterBuilder::default().build();
1198-
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default());
1203+
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
11991204

12001205
processor.shutdown().unwrap();
12011206
}

0 commit comments

Comments
 (0)