Skip to content

Commit a2aa648

Browse files
committed
changes..
1 parent 3ed482d commit a2aa648

File tree

4 files changed

+31
-35
lines changed

4 files changed

+31
-35
lines changed

opentelemetry-sdk/src/export/logs/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,10 @@ pub trait LogExporter: Send + Sync + Debug {
8181
/// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed.
8282
///
8383
fn export<'a>(
84-
&mut self,
85-
batch: LogBatch<'a>,
86-
) -> impl std::future::Future<Output = LogResult<()>> + Send;
84+
&'a mut self,
85+
batch: &'a LogBatch<'a>,
86+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a;
87+
8788
/// Shuts down the exporter.
8889
fn shutdown(&mut self) {}
8990
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
108108
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
109109
.and_then(|mut exporter| {
110110
let log_tuple = &[(record as &LogRecord, instrumentation)];
111-
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
111+
let log_batch = LogBatch::new(log_tuple);
112+
futures_executor::block_on(exporter.export(&log_batch))
112113
});
113114
// Handle errors with specific static names
114115
match result {
@@ -217,7 +218,6 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
217218
{
218219
let (message_sender, message_receiver) =
219220
runtime.batch_message_channel(config.max_queue_size);
220-
//let exporter = Arc::new(Mutex::new(exporter));
221221
let inner_runtime = runtime.clone();
222222

223223
// Spawn worker process via user-defined spawn function.
@@ -340,7 +340,8 @@ where
340340
.iter()
341341
.map(|log_data| (&log_data.0, &log_data.1))
342342
.collect();
343-
let export = exporter.export(LogBatch::new(log_vec.as_slice()));
343+
let log_batch = LogBatch::new(log_vec.as_slice());
344+
let export = exporter.export(&log_batch);
344345
let timeout = runtime.delay(time_out);
345346
pin_mut!(export);
346347
pin_mut!(timeout);
@@ -567,10 +568,10 @@ mod tests {
567568
}
568569

569570
impl LogExporter for MockLogExporter {
570-
fn export(
571-
&mut self,
572-
_batch: LogBatch<'_>,
573-
) -> impl std::future::Future<Output = LogResult<()>> + Send + '_ {
571+
fn export<'a>(
572+
&'a mut self,
573+
_batch: &'a LogBatch<'a>,
574+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
574575
async { Ok(()) }
575576
}
576577

@@ -1064,10 +1065,10 @@ mod tests {
10641065
}
10651066

10661067
impl LogExporter for LogExporterThatRequiresTokio {
1067-
fn export(
1068-
&mut self,
1069-
batch: LogBatch<'_>,
1070-
) -> impl std::future::Future<Output = LogResult<()>> + Send {
1068+
fn export<'a>(
1069+
&'a mut self,
1070+
batch: &'a LogBatch<'a>,
1071+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
10711072
// Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
10721073
async move {
10731074
tokio::time::sleep(Duration::from_millis(50)).await;

opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,10 @@ impl InMemoryLogExporter {
182182
}
183183

184184
impl LogExporter for InMemoryLogExporter {
185-
fn export(
186-
&mut self,
187-
batch: LogBatch<'_>,
188-
) -> impl std::future::Future<Output = LogResult<()>> + Send {
185+
fn export<'a>(
186+
&'a mut self,
187+
batch: &'a LogBatch<'a>,
188+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
189189
async move {
190190
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
191191
for (log_record, instrumentation) in batch.iter() {

opentelemetry-stdout/src/logs/exporter.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,33 +32,27 @@ impl fmt::Debug for LogExporter {
3232

3333
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
3434
/// Export spans to stdout
35-
fn export(
36-
&mut self,
37-
batch: LogBatch<'_>,
38-
) -> impl std::future::Future<Output = LogResult<()>> + Send {
39-
let is_shutdown = self.is_shutdown.load(atomic::Ordering::SeqCst);
40-
let resource_emitted_arc = Arc::new(Mutex::new(self.resource_emitted));
41-
let resource_emitted_arc_clone = Arc::clone(&resource_emitted_arc);
42-
let resource = self.resource.clone();
35+
fn export<'a>(
36+
&'a mut self,
37+
batch: &'a LogBatch<'a>,
38+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
4339
async move {
44-
if is_shutdown {
45-
Err("exporter is shut down".into())
40+
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
41+
return Err("exporter is shut down".into());
4642
} else {
4743
println!("Logs");
48-
let mut resource_emitted_guard = resource_emitted_arc_clone.lock().unwrap();
49-
if *resource_emitted_guard {
44+
if self.resource_emitted {
5045
print_logs(batch);
5146
} else {
47+
self.resource_emitted = true;
5248
println!("Resource");
53-
if let Some(schema_url) = resource.schema_url() {
49+
if let Some(schema_url) = self.resource.schema_url() {
5450
println!("\t Resource SchemaUrl: {:?}", schema_url);
5551
}
56-
resource.iter().for_each(|(k, v)| {
52+
self.resource.iter().for_each(|(k, v)| {
5753
println!("\t -> {}={:?}", k, v);
5854
});
59-
6055
print_logs(batch);
61-
*resource_emitted_guard = true;
6256
}
6357

6458
Ok(())
@@ -75,7 +69,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
7569
}
7670
}
7771

78-
fn print_logs(batch: LogBatch<'_>) {
72+
fn print_logs(batch: &LogBatch<'_>) {
7973
for (i, log) in batch.iter().enumerate() {
8074
println!("Log #{}", i);
8175
let (record, _library) = log;

0 commit comments

Comments
 (0)