Skip to content

Commit 3ed482d

Browse files
committed
futher changes..
1 parent d454536 commit 3ed482d

File tree

6 files changed

+47
-37
lines changed

6 files changed

+47
-37
lines changed

opentelemetry-sdk/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
4444
default = ["trace", "metrics", "logs", "internal-logs"]
4545
trace = ["opentelemetry/trace", "rand", "percent-encoding"]
4646
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
47-
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
47+
logs = ["opentelemetry/logs", "serde_json"]
4848
spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
4949
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
5050
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]

opentelemetry-sdk/src/logs/log_emitter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ impl Builder {
188188
/// The `LogExporter` that this provider should use.
189189
pub fn with_simple_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {
190190
let mut processors = self.processors;
191-
processors.push(Box::new(SimpleLogProcessor::new(Box::new(exporter))));
191+
processors.push(Box::new(SimpleLogProcessor::new(exporter)));
192192

193193
Builder { processors, ..self }
194194
}

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,6 @@ mod tests {
552552
testing::logs::InMemoryLogExporter,
553553
Resource,
554554
};
555-
use async_trait::async_trait;
556555
use opentelemetry::logs::AnyValue;
557556
use opentelemetry::logs::LogRecord as _;
558557
use opentelemetry::logs::{Logger, LoggerProvider as _};
@@ -567,12 +566,11 @@ mod tests {
567566
resource: Arc<Mutex<Option<Resource>>>,
568567
}
569568

570-
#[async_trait]
571569
impl LogExporter for MockLogExporter {
572570
fn export(
573571
&mut self,
574572
_batch: LogBatch<'_>,
575-
) -> impl std::future::Future<Output = LogResult<()>> + Send {
573+
) -> impl std::future::Future<Output = LogResult<()>> + Send + '_ {
576574
async { Ok(()) }
577575
}
578576

@@ -1071,12 +1069,11 @@ mod tests {
10711069
batch: LogBatch<'_>,
10721070
) -> impl std::future::Future<Output = LogResult<()>> + Send {
10731071
// Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
1074-
let export_count = Arc::clone(&self.export_count);
10751072
async move {
10761073
tokio::time::sleep(Duration::from_millis(50)).await;
10771074

10781075
for _ in batch.iter() {
1079-
export_count.fetch_add(1, Ordering::Acquire);
1076+
self.export_count.fetch_add(1, Ordering::Acquire);
10801077
}
10811078
Ok(())
10821079
}

opentelemetry-sdk/src/logs/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ mod tests {
4949
let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
5050
let logger_provider = LoggerProvider::builder()
5151
.with_resource(resource.clone())
52-
.with_log_processor(SimpleLogProcessor::new(Box::new(exporter.clone())))
52+
.with_log_processor(SimpleLogProcessor::new(exporter.clone()))
5353
.build();
5454

5555
// Act

opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use crate::export::logs::{LogBatch, LogExporter};
22
use crate::logs::LogRecord;
33
use crate::logs::{LogError, LogResult};
44
use crate::Resource;
5-
use async_trait::async_trait;
65
use opentelemetry::InstrumentationScope;
76
use std::borrow::Cow;
87
use std::sync::{Arc, Mutex};
@@ -182,18 +181,22 @@ impl InMemoryLogExporter {
182181
}
183182
}
184183

185-
#[async_trait]
186184
impl LogExporter for InMemoryLogExporter {
187-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
188-
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
189-
for (log_record, instrumentation) in batch.iter() {
190-
let owned_log = OwnedLogData {
191-
record: (*log_record).clone(),
192-
instrumentation: (*instrumentation).clone(),
193-
};
194-
logs_guard.push(owned_log);
185+
fn export(
186+
&mut self,
187+
batch: LogBatch<'_>,
188+
) -> impl std::future::Future<Output = LogResult<()>> + Send {
189+
async move {
190+
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
191+
for (log_record, instrumentation) in batch.iter() {
192+
let owned_log = OwnedLogData {
193+
record: (*log_record).clone(),
194+
instrumentation: (*instrumentation).clone(),
195+
};
196+
logs_guard.push(owned_log);
197+
}
198+
Ok(())
195199
}
196-
Ok(())
197200
}
198201

199202
fn shutdown(&mut self) {

opentelemetry-stdout/src/logs/exporter.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use opentelemetry_sdk::export::logs::LogBatch;
55
use opentelemetry_sdk::logs::LogResult;
66
use opentelemetry_sdk::Resource;
77
use std::sync::atomic;
8+
use std::sync::{Arc, Mutex};
89

910
/// An OpenTelemetry exporter that writes Logs to stdout on export.
1011
pub struct LogExporter {
@@ -29,30 +30,39 @@ impl fmt::Debug for LogExporter {
2930
}
3031
}
3132

32-
#[async_trait]
3333
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
3434
/// Export spans to stdout
35-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
36-
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
37-
return Err("exporter is shut down".into());
38-
} else {
39-
println!("Logs");
40-
if self.resource_emitted {
41-
print_logs(batch);
35+
fn export(
36+
&mut self,
37+
batch: LogBatch<'_>,
38+
) -> impl std::future::Future<Output = LogResult<()>> + Send {
39+
let is_shutdown = self.is_shutdown.load(atomic::Ordering::SeqCst);
40+
let resource_emitted_arc = Arc::new(Mutex::new(self.resource_emitted));
41+
let resource_emitted_arc_clone = Arc::clone(&resource_emitted_arc);
42+
let resource = self.resource.clone();
43+
async move {
44+
if is_shutdown {
45+
Err("exporter is shut down".into())
4246
} else {
43-
self.resource_emitted = true;
44-
println!("Resource");
45-
if let Some(schema_url) = self.resource.schema_url() {
46-
println!("\t Resource SchemaUrl: {:?}", schema_url);
47+
println!("Logs");
48+
let mut resource_emitted_guard = resource_emitted_arc_clone.lock().unwrap();
49+
if *resource_emitted_guard {
50+
print_logs(batch);
51+
} else {
52+
println!("Resource");
53+
if let Some(schema_url) = resource.schema_url() {
54+
println!("\t Resource SchemaUrl: {:?}", schema_url);
55+
}
56+
resource.iter().for_each(|(k, v)| {
57+
println!("\t -> {}={:?}", k, v);
58+
});
59+
60+
print_logs(batch);
61+
*resource_emitted_guard = true;
4762
}
48-
self.resource.iter().for_each(|(k, v)| {
49-
println!("\t -> {}={:?}", k, v);
50-
});
5163

52-
print_logs(batch);
64+
Ok(())
5365
}
54-
55-
Ok(())
5666
}
5767
}
5868

0 commit comments

Comments
 (0)