Skip to content
Merged
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 49 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L49

Added line #L49 was not covered by tests
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
})?;
Expand Down
24 changes: 13 additions & 11 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use tokio::sync::Mutex;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use tokio::sync::Mutex;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
inner: Mutex<Option<ClientInner>>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
client: LogsServiceClient<Channel>,
interceptor: Mutex<BoxInterceptor>,
interceptor: BoxInterceptor,
}

impl fmt::Debug for TonicLogsClient {
Expand All @@ -46,23 +46,21 @@
otel_debug!(name: "TonicsLogsClientBuilt");

TonicLogsClient {
inner: Some(ClientInner {
inner: Mutex::new(Some(ClientInner {

Check warning on line 49 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L49

Added line #L49 was not covered by tests
client,
interceptor: Mutex::new(interceptor),
}),
interceptor,
})),

Check warning on line 52 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L51-L52

Added lines #L51 - L52 were not covered by tests
resource: Default::default(),
}
}
}

impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = match &self.inner {
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {

Check warning on line 60 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L60

Added line #L60 was not covered by tests
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
Expand All @@ -86,8 +84,12 @@
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
match self.inner.take() {
fn shutdown(&self) -> OTelSdkResult {
let handle = tokio::runtime::Handle::try_current()
.unwrap_or_else(|_| tokio::runtime::Runtime::new().unwrap().handle().clone());

let mut inner = handle.block_on(self.inner.lock());
match inner.take() {

Check warning on line 92 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L87-L92

Added lines #L87 - L92 were not covered by tests
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Shuts down the exporter.
fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 139 in opentelemetry-sdk/src/logs/export.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/export.rs#L139

Added line #L139 was not covered by tests
Ok(())
}
#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl LogExporter for InMemoryLogExporter {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_called
.store(true, std::sync::atomic::Ordering::Relaxed);
if self.should_reset_on_shutdown {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown()
} else {
Err(OTelSdkError::InternalFailure(
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 60 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L60

Added line #L60 was not covered by tests
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())
}
Expand Down