Skip to content
Merged
4 changes: 4 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

- The `OTEL_EXPORTER_OTLP_TIMEOUT`, `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`, `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT` and `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT` are changed from seconds to miliseconds.
- Fixed `.with_headers()` in `HttpExporterBuilder` to correctly support multiple key/value pairs. [#2699](https://github.com/open-telemetry/opentelemetry-rust/pull/2699)
- Fixed
[#2770](https://github.com/open-telemetry/opentelemetry-rust/issues/2770)
partially to properly handle `shutdown()` when using `http`. (`tonic` still
does not do proper shutdown)

## 0.28.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 49 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L49

Added line #L49 was not covered by tests
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
})?;
Expand Down
30 changes: 16 additions & 14 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use tokio::sync::Mutex;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use tokio::sync::Mutex;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
inner: Mutex<Option<ClientInner>>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
client: LogsServiceClient<Channel>,
interceptor: Mutex<BoxInterceptor>,
interceptor: BoxInterceptor,
}

impl fmt::Debug for TonicLogsClient {
Expand All @@ -46,23 +46,21 @@
otel_debug!(name: "TonicsLogsClientBuilt");

TonicLogsClient {
inner: Some(ClientInner {
inner: Mutex::new(Some(ClientInner {

Check warning on line 49 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L49

Added line #L49 was not covered by tests
client,
interceptor: Mutex::new(interceptor),
}),
interceptor,
})),

Check warning on line 52 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L51-L52

Added lines #L51 - L52 were not covered by tests
resource: Default::default(),
}
}
}

impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = match &self.inner {
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {

Check warning on line 60 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L60

Added line #L60 was not covered by tests
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
Expand All @@ -86,11 +84,15 @@
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
match self.inner.take() {
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
}
fn shutdown(&self) -> OTelSdkResult {
// TODO: Implement actual shutdown
// Due to the use to tokio::sync::Mutex to guard
// the inner client, we need to lock the mutex
// and that requires async runtime.
// It is possible to fix this by using
// a dedicated thread just to handle shutdown.
// But for now, we just return Ok.
Ok(())

Check warning on line 95 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L87-L95

Added lines #L87 - L95 were not covered by tests
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
9 changes: 9 additions & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,13 @@
SupportedTransportClient::Http(client) => client.set_resource(resource),
}
}

fn shutdown(&self) -> OTelSdkResult {
match &self.client {

Check warning on line 161 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L160-L161

Added lines #L160 - L161 were not covered by tests
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.shutdown(),

Check warning on line 163 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L163

Added line #L163 was not covered by tests
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.shutdown(),

Check warning on line 165 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L165

Added line #L165 was not covered by tests
}
}

Check warning on line 167 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L167

Added line #L167 was not covered by tests
}
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
`LogProcessor` and `LogExporter` traits. `SdkLogger` no longer passes its
`scope` name but instead passes the incoming `name` when invoking
`event_enabled` on processors.
- **Breaking** for custom LogExporter authors: `shutdown()` method in
`LogExporter` trait no longer requires a mutable ref to self. If the exporter
needs to mutate state, it should rely on interior mutability.
[2764](https://github.com/open-telemetry/opentelemetry-rust/pull/2764)

## 0.28.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Shuts down the exporter.
fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 139 in opentelemetry-sdk/src/logs/export.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/export.rs#L139

Added line #L139 was not covered by tests
Ok(())
}
#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl LogExporter for InMemoryLogExporter {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_called
.store(true, std::sync::atomic::Ordering::Relaxed);
if self.should_reset_on_shutdown {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown()
} else {
Err(OTelSdkError::InternalFailure(
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 60 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L60

Added line #L60 was not covered by tests
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())
}
Expand Down