Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,11 @@
struct ReentrantLogExporter;

impl LogExporter for ReentrantLogExporter {
#[allow(clippy::manual_async_fn)]
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
}
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())

Check warning on line 270 in opentelemetry-appender-tracing/src/layer.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-appender-tracing/src/layer.rs#L266-L270

Added lines #L266 - L270 were not covered by tests
}
}

Expand Down
74 changes: 34 additions & 40 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,45 @@
use opentelemetry_sdk::logs::{LogBatch, LogExporter};

impl LogExporter for OtlpHttpClient {
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async move {
let client = self
.client
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))?
.clone()
.ok_or(OTelSdkError::AlreadyShutdown)?;
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let client = self
.client
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))?
.clone()
.ok_or(OTelSdkError::AlreadyShutdown)?;

Check warning on line 14 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L8-L14

Added lines #L8 - L14 were not covered by tests

let (body, content_type) = self
.build_logs_export_body(batch)
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
let (body, content_type) = self
.build_logs_export_body(batch)
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

Check warning on line 18 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L16-L18

Added lines #L16 - L18 were not covered by tests

let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body.into())
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body.into())
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

Check warning on line 25 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L20-L25

Added lines #L20 - L25 were not covered by tests

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}
for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}

Check warning on line 29 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L27-L29

Added lines #L27 - L29 were not covered by tests

let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client
.send_bytes(request)
.await
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
request_uri,
response.status().as_u16(),
response.body()
);
return Err(OTelSdkError::InternalFailure(error));
}
Ok(())
let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client
.send_bytes(request)
.await
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
request_uri,
response.status().as_u16(),
response.body()
);
return Err(OTelSdkError::InternalFailure(error));

Check warning on line 44 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L31-L44

Added lines #L31 - L44 were not covered by tests
}
Ok(())

Check warning on line 46 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L46

Added line #L46 was not covered by tests
}

fn shutdown(&mut self) -> OTelSdkResult {
Expand Down
56 changes: 25 additions & 31 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,40 +56,34 @@
}

impl LogExporter for TonicLogsClient {
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async move {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(OTelSdkError::AlreadyShutdown),
};
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
(inner.client.clone(), m, e)

Check warning on line 69 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L59-L69

Added lines #L59 - L69 were not covered by tests
}
None => return Err(OTelSdkError::AlreadyShutdown),

Check warning on line 71 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L71

Added line #L71 was not covered by tests
};

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);

Check warning on line 74 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L74

Added line #L74 was not covered by tests

otel_debug!(name: "TonicsLogsClient.CallingExport");
otel_debug!(name: "TonicsLogsClient.CallingExport");

Check warning on line 76 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L76

Added line #L76 was not covered by tests

client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
Ok(())
}
client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
Ok(())

Check warning on line 86 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L78-L86

Added lines #L78 - L86 were not covered by tests
}

fn shutdown(&mut self) -> OTelSdkResult {
Expand Down
18 changes: 6 additions & 12 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,12 @@
}

impl opentelemetry_sdk::logs::LogExporter for LogExporter {
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async move {
match &self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.export(batch).await,
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.export(batch).await,
}
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
match &self.client {

Check warning on line 143 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L142-L143

Added lines #L142 - L143 were not covered by tests
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.export(batch).await,

Check warning on line 145 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L145

Added line #L145 was not covered by tests
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.export(batch).await,

Check warning on line 147 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L147

Added line #L147 was not covered by tests
}
}

Expand Down
28 changes: 11 additions & 17 deletions opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,24 +188,18 @@
}

impl LogExporter for InMemoryLogExporter {
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async move {
let mut logs_guard = self.logs.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e))
})?;
for (log_record, instrumentation) in batch.iter() {
let owned_log = OwnedLogData {
record: (*log_record).clone(),
instrumentation: (*instrumentation).clone(),
};
logs_guard.push(owned_log);
}
Ok(())
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let mut logs_guard = self.logs.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e))

Check warning on line 193 in opentelemetry-sdk/src/logs/in_memory_exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/in_memory_exporter.rs#L193

Added line #L193 was not covered by tests
})?;
for (log_record, instrumentation) in batch.iter() {
let owned_log = OwnedLogData {
record: (*log_record).clone(),
instrumentation: (*instrumentation).clone(),
};
logs_guard.push(owned_log);
}
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
Expand Down
48 changes: 21 additions & 27 deletions opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,30 @@
}

impl opentelemetry_sdk::logs::LogExporter for LogExporter {
/// Export spans to stdout
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async move {
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
Err(OTelSdkError::AlreadyShutdown)
/// Export logs to stdout
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
Err(OTelSdkError::AlreadyShutdown)

Check warning on line 36 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L34-L36

Added lines #L34 - L36 were not covered by tests
} else {
println!("Logs");
if self
.resource_emitted
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
print_logs(batch);

Check warning on line 44 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L38-L44

Added lines #L38 - L44 were not covered by tests
} else {
println!("Logs");
if self
.resource_emitted
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
print_logs(batch);
} else {
println!("Resource");
if let Some(schema_url) = self.resource.schema_url() {
println!("\t Resource SchemaUrl: {:?}", schema_url);
}
self.resource.iter().for_each(|(k, v)| {
println!("\t -> {}={:?}", k, v);
});
print_logs(batch);
println!("Resource");
if let Some(schema_url) = self.resource.schema_url() {
println!("\t Resource SchemaUrl: {:?}", schema_url);

Check warning on line 48 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L46-L48

Added lines #L46 - L48 were not covered by tests
}

Ok(())
self.resource.iter().for_each(|(k, v)| {
println!("\t -> {}={:?}", k, v);
});
print_logs(batch);

Check warning on line 53 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L50-L53

Added lines #L50 - L53 were not covered by tests
}

Ok(())

Check warning on line 56 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L56

Added line #L56 was not covered by tests
}
}

Expand Down
7 changes: 2 additions & 5 deletions stress/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ mod throughput;
struct MockLogExporter;

impl LogExporter for MockLogExporter {
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async { Ok(()) }
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
Ok(())
}
}

Expand Down
Loading