Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::InstrumentationScope;
use opentelemetry_appender_tracing::layer as tracing_layer;
use opentelemetry_sdk::logs::LogResult;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider};
use opentelemetry_sdk::Resource;
Expand All @@ -36,8 +36,8 @@ impl LogExporter for NoopExporter {
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async { LogResult::Ok(()) }
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
async { OTelSdkResult::Ok(()) }
}

fn event_enabled(&self, _: opentelemetry::logs::Severity, _: &str, _: &str) -> bool {
Expand All @@ -61,11 +61,11 @@ impl<E: LogExporter> LogProcessor for NoopProcessor<E> {
// no-op
}

fn force_flush(&self) -> LogResult<()> {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
15 changes: 8 additions & 7 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@
use opentelemetry::trace::TracerProvider;
use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer};
use opentelemetry::{logs::AnyValue, Key};
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::InMemoryLogExporter;
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogResult, SdkLogRecord, SdkLoggerProvider};
use opentelemetry_sdk::logs::{SdkLogRecord, SdkLoggerProvider};
use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider};
use tracing::{error, warn};
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
Expand Down Expand Up @@ -269,7 +270,7 @@
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
) -> impl std::future::Future<Output = OTelSdkResult> + Send {

Check warning on line 273 in opentelemetry-appender-tracing/src/layer.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-appender-tracing/src/layer.rs#L273

Added line #L273 was not covered by tests
async {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
Expand Down Expand Up @@ -345,7 +346,7 @@

// Act
error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
logger_provider.force_flush();
assert!(logger_provider.force_flush().is_ok());

// Assert TODO: move to helper methods
let exported_logs = exporter
Expand Down Expand Up @@ -439,7 +440,7 @@
(trace_id, span_id)
});

logger_provider.force_flush();
assert!(logger_provider.force_flush().is_ok());

// Assert TODO: move to helper methods
let exported_logs = exporter
Expand Down Expand Up @@ -553,7 +554,7 @@
});
});

logger_provider.force_flush();
assert!(logger_provider.force_flush().is_ok());

let logs = exporter.get_emitted_logs().expect("No emitted logs");
assert_eq!(logs.len(), 2);
Expand Down Expand Up @@ -593,7 +594,7 @@

// Act
log::error!(target: "my-system", "log from log crate");
logger_provider.force_flush();
assert!(logger_provider.force_flush().is_ok());

// Assert TODO: move to helper methods
let exported_logs = exporter
Expand Down Expand Up @@ -672,7 +673,7 @@
(trace_id, span_id)
});

logger_provider.force_flush();
assert!(logger_provider.force_flush().is_ok());

// Assert TODO: move to helper methods
let exported_logs = exporter
Expand Down
47 changes: 27 additions & 20 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,67 @@
use std::sync::Arc;

use super::OtlpHttpClient;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogError, LogResult};

use super::OtlpHttpClient;

impl LogExporter for OtlpHttpClient {
#[allow(clippy::manual_async_fn)]
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
) -> impl std::future::Future<Output = OTelSdkResult> + Send {

Check warning on line 12 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L12

Added line #L12 was not covered by tests
async move {
let client = self
.client
.lock()
.map_err(|e| LogError::Other(e.to_string().into()))
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;
.map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))?
.clone()
.ok_or(OTelSdkError::AlreadyShutdown)?;

Check warning on line 19 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L17-L19

Added lines #L17 - L19 were not covered by tests

let (body, content_type) = self
.build_logs_export_body(batch)
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

Check warning on line 23 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L21-L23

Added lines #L21 - L23 were not covered by tests

let (body, content_type) = { self.build_logs_export_body(batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body.into())
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

Check warning on line 30 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L30

Added line #L30 was not covered by tests

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
}

let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client.send_bytes(request).await?;

let response = client
.send_bytes(request)
.await
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;

Check warning on line 41 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L38-L41

Added lines #L38 - L41 were not covered by tests
if !response.status().is_success() {
let error = format!(
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
response.status().as_u16(),
request_uri,
response.status().as_u16(),

Check warning on line 46 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L46

Added line #L46 was not covered by tests
response.body()
);
return Err(LogError::Other(error.into()));
return Err(OTelSdkError::InternalFailure(error));

Check warning on line 49 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L49

Added line #L49 was not covered by tests
}

Ok(())
}
}

fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
fn shutdown(&mut self) -> OTelSdkResult {
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
})?;

Check warning on line 58 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L55-L58

Added lines #L55 - L58 were not covered by tests

if client_guard.take().is_none() {
return Err(OTelSdkError::AlreadyShutdown);
}

Ok(())

Check warning on line 64 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L60-L64

Added lines #L60 - L64 were not covered by tests
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
17 changes: 10 additions & 7 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogError, LogResult};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
Expand Down Expand Up @@ -60,7 +60,7 @@
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
) -> impl std::future::Future<Output = OTelSdkResult> + Send {

Check warning on line 63 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L63

Added line #L63 was not covered by tests
async move {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
Expand All @@ -69,11 +69,11 @@
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?

Check warning on line 72 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L72

Added line #L72 was not covered by tests
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(LogError::Other("exporter is already shut down".into())),
None => return Err(OTelSdkError::AlreadyShutdown),

Check warning on line 76 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L76

Added line #L76 was not covered by tests
};

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
Expand All @@ -87,13 +87,16 @@
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;
.map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;

Check warning on line 90 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L90

Added line #L90 was not covered by tests
Ok(())
}
}

fn shutdown(&mut self) {
let _ = self.inner.take();
fn shutdown(&mut self) -> OTelSdkResult {
match self.inner.take() {
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.

Check warning on line 98 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L95-L98

Added lines #L95 - L98 were not covered by tests
}
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
6 changes: 2 additions & 4 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
use opentelemetry::otel_debug;
use std::fmt::Debug;

use opentelemetry_sdk::logs::LogResult;

use opentelemetry_sdk::logs::LogBatch;
use opentelemetry_sdk::{error::OTelSdkResult, logs::LogBatch};

use crate::{HasExportConfig, NoExporterBuilderSet};

Expand Down Expand Up @@ -145,7 +143,7 @@
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
) -> impl std::future::Future<Output = OTelSdkResult> + Send {

Check warning on line 146 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L146

Added line #L146 was not covered by tests
async move {
match &self.client {
#[cfg(feature = "grpc-tonic")]
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn logs_tokio_helper(
.await
.unwrap();
} else {
let _ = logger_provider.shutdown();
assert!(logger_provider.shutdown().is_ok());
}
tokio::time::sleep(Duration::from_secs(5)).await;
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Expand Down Expand Up @@ -126,7 +126,7 @@ fn logs_non_tokio_helper(is_simple: bool, init_logs_inside_rt: bool) -> Result<(
);
}

let _ = logger_provider.shutdown();
assert!(logger_provider.shutdown().is_ok());
std::thread::sleep(Duration::from_secs(5));
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@
use opentelemetry::logs::LoggerProvider;
use opentelemetry::time::now;
use opentelemetry::InstrumentationScope;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::{LogResult, SdkLoggerProvider};
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource};

#[derive(Debug)]
Expand All @@ -235,11 +236,11 @@
impl LogProcessor for MockProcessor {
fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {}

fn force_flush(&self) -> LogResult<()> {
fn force_flush(&self) -> OTelSdkResult {

Check warning on line 239 in opentelemetry-proto/src/transform/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/logs.rs#L239

Added line #L239 was not covered by tests
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
}
Expand Down
21 changes: 19 additions & 2 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ let processor = BatchSpanProcessor::builder(exporter)
- The `opentelemetry_otlp` API, via `.with_tonic().with_timeout()` or `.with_http().with_timeout()`.

- **Breaking**
- The public API changes in the Tracing:
- The public API changes in the Trace SDK:
- Before:
```rust
fn SpanExporter::export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult>;
Expand All @@ -400,7 +400,7 @@ let processor = BatchSpanProcessor::builder(exporter)
fn SpanExporter::export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult>;
fn SpanExporter::shutdown(&mut self) -> OTelSdkResult;
fn SpanExporter::force_flush(&mut self) -> BoxFuture<'static, OTelSdkResult>
fn TraerProvider::shutdown(&self) -> OTelSdkResult;
fn TracerProvider::shutdown(&self) -> OTelSdkResult;
fn TracerProvider::force_flush(&self) -> OTelSdkResult;
```
- **Breaking** Renamed `LoggerProvider`, `Logger` and `LogRecord' to
Expand All @@ -413,6 +413,23 @@ let processor = BatchSpanProcessor::builder(exporter)
type-aliased to `SdkTracer` to keep back-compat with tracing-opentelemetry.
[#2614](https://github.com/open-telemetry/opentelemetry-rust/pull/2614)

- **Breaking**
- The public API changes in the Logs SDK:
- Before:
```rust
fn LogExporter::export(&self, _batch: LogBatch<'_>,) -> impl std::future::Future<Output = LogResult<()>> + Send
fn LogExporter::shutdown(&mut self);
fn LoggerProvider::shutdown(&self) -> LogResult<()>
fn LoggerProvider::force_flush(&self) -> Vec<LogResult<()>>
```
- After:
```rust
fn LogExporter::export(&self, _batch: LogBatch<'_>,) -> impl std::future::Future<Output = OTelSdkResult<()>> + Send
fn LogExporter::shutdown(&mut self) -> OTelSdkResult;
fn LoggerProvider::shutdown(&self) -> OTelSdkResult;
fn LoggerProvider::force_flush(&self) -> OTelSdkResult;
```

## 0.27.1

Released 2024-Nov-27
Expand Down
9 changes: 4 additions & 5 deletions opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use opentelemetry::logs::{AnyValue, LogRecord as _, Logger, LoggerProvider, Seve
use opentelemetry::trace::Tracer;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{InstrumentationScope, Key};
use opentelemetry_sdk::logs::{
LogProcessor, LogResult, SdkLogRecord, SdkLogger, SdkLoggerProvider,
};
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLogger, SdkLoggerProvider};
use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider};

#[derive(Debug)]
Expand All @@ -35,11 +34,11 @@ struct NoopProcessor;
impl LogProcessor for NoopProcessor {
fn emit(&self, _data: &mut SdkLogRecord, _scope: &InstrumentationScope) {}

fn force_flush(&self) -> LogResult<()> {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
}
Expand Down
Loading