Skip to content

Commit 4400ccf

Browse files
authored
Merge branch 'main' into drewrelmas/split-log-processor-impls
2 parents 6327115 + 00ebc98 commit 4400ccf

File tree

10 files changed

+55
-62
lines changed

10 files changed

+55
-62
lines changed

opentelemetry-otlp/src/metric.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ impl Debug for MetricExporter {
139139
}
140140
}
141141

142-
#[async_trait]
143142
impl PushMetricExporter for MetricExporter {
144143
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
145144
self.client.export(metrics).await

opentelemetry-otlp/tests/integration_test/src/metric_helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::test_utils;
33
use anyhow::Result;
44
use anyhow::{Context, Ok};
55
use opentelemetry_otlp::MetricExporter;
6-
use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider};
6+
use opentelemetry_sdk::metrics::{MeterProviderBuilder, SdkMeterProvider};
77
use opentelemetry_sdk::Resource;
88
use serde_json::Value;
99
use std::fs;

opentelemetry-sdk/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ rust-version = "1.75.0"
1313
opentelemetry = { version = "0.28", path = "../opentelemetry/" }
1414
opentelemetry-http = { version = "0.28", path = "../opentelemetry-http", optional = true }
1515
async-std = { workspace = true, features = ["unstable"], optional = true }
16-
async-trait = { workspace = true, optional = true }
1716
futures-channel = "0.3"
1817
futures-executor = { workspace = true }
1918
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
@@ -47,7 +46,7 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"]
4746
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
4847
logs = ["opentelemetry/logs", "serde_json"]
4948
spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
50-
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
49+
metrics = ["opentelemetry/metrics", "glob"]
5150
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
5251
experimental_async_runtime = []
5352
rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]

opentelemetry-sdk/benches/log_exporter.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use opentelemetry::time::now;
1414
use opentelemetry_sdk::error::OTelSdkResult;
1515
use std::sync::Mutex;
1616

17-
use async_trait::async_trait;
1817
use criterion::{criterion_group, criterion_main, Criterion};
1918

2019
use opentelemetry::logs::{LogRecord as _, Logger, LoggerProvider, Severity};
@@ -29,9 +28,8 @@ use std::fmt::Debug;
2928

3029
// Run this benchmark with:
3130
// cargo bench --bench log_exporter
32-
#[async_trait]
3331
pub trait LogExporterWithFuture: Send + Sync + Debug {
34-
async fn export(&mut self, batch: LogBatch<'_>);
32+
fn export(&mut self, batch: LogBatch<'_>) -> impl std::future::Future<Output = ()> + Send;
3533
}
3634

3735
pub trait LogExporterWithoutFuture: Send + Sync + Debug {
@@ -41,7 +39,6 @@ pub trait LogExporterWithoutFuture: Send + Sync + Debug {
4139
#[derive(Debug)]
4240
struct NoOpExporterWithFuture {}
4341

44-
#[async_trait]
4542
impl LogExporterWithFuture for NoOpExporterWithFuture {
4643
async fn export(&mut self, _batch: LogBatch<'_>) {}
4744
}

opentelemetry-sdk/src/metrics/exporter.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
//! Interfaces for exporting metrics
2-
use async_trait::async_trait;
3-
42
use crate::error::OTelSdkResult;
53

64
use crate::metrics::data::ResourceMetrics;
@@ -10,17 +8,19 @@ use super::Temporality;
108
/// Exporter handles the delivery of metric data to external receivers.
119
///
1210
/// This is the final component in the metric push pipeline.
13-
#[async_trait]
1411
pub trait PushMetricExporter: Send + Sync + 'static {
1512
/// Export serializes and transmits metric data to a receiver.
1613
///
1714
/// All retry logic must be contained in this function. The SDK does not
1815
/// implement any retry logic. All errors returned by this function are
1916
/// considered unrecoverable and will be logged.
20-
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
17+
fn export(
18+
&self,
19+
metrics: &mut ResourceMetrics,
20+
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
2121

2222
/// Flushes any metric data held by an exporter.
23-
async fn force_flush(&self) -> OTelSdkResult;
23+
fn force_flush(&self) -> impl std::future::Future<Output = OTelSdkResult> + Send;
2424

2525
/// Releases any held computational resources.
2626
///

opentelemetry-sdk/src/metrics/in_memory_exporter.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use crate::metrics::exporter::PushMetricExporter;
55
use crate::metrics::MetricError;
66
use crate::metrics::MetricResult;
77
use crate::metrics::Temporality;
8-
use async_trait::async_trait;
98
use std::collections::VecDeque;
109
use std::fmt;
1110
use std::sync::{Arc, Mutex};
@@ -263,7 +262,6 @@ impl InMemoryMetricExporter {
263262
}
264263
}
265264

266-
#[async_trait]
267265
impl PushMetricExporter for InMemoryMetricExporter {
268266
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
269267
self.metrics

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ where
5959
}
6060

6161
/// Create a [PeriodicReader] with the given config.
62-
pub fn build(self) -> PeriodicReader {
62+
pub fn build(self) -> PeriodicReader<E> {
6363
PeriodicReader::new(self.exporter, self.interval)
6464
}
6565
}
@@ -124,24 +124,25 @@ where
124124
/// # drop(reader);
125125
/// # }
126126
/// ```
127-
#[derive(Clone)]
128-
pub struct PeriodicReader {
129-
inner: Arc<PeriodicReaderInner>,
127+
pub struct PeriodicReader<E: PushMetricExporter> {
128+
inner: Arc<PeriodicReaderInner<E>>,
130129
}
131130

132-
impl PeriodicReader {
131+
impl<E: PushMetricExporter> Clone for PeriodicReader<E> {
132+
fn clone(&self) -> Self {
133+
Self {
134+
inner: Arc::clone(&self.inner),
135+
}
136+
}
137+
}
138+
139+
impl<E: PushMetricExporter> PeriodicReader<E> {
133140
/// Configuration options for a periodic reader with own thread
134-
pub fn builder<E>(exporter: E) -> PeriodicReaderBuilder<E>
135-
where
136-
E: PushMetricExporter,
137-
{
141+
pub fn builder(exporter: E) -> PeriodicReaderBuilder<E> {
138142
PeriodicReaderBuilder::new(exporter)
139143
}
140144

141-
fn new<E>(exporter: E, interval: Duration) -> Self
142-
where
143-
E: PushMetricExporter,
144-
{
145+
fn new(exporter: E, interval: Duration) -> Self {
145146
let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
146147
mpsc::channel();
147148
let exporter_arc = Arc::new(exporter);
@@ -333,19 +334,19 @@ impl PeriodicReader {
333334
}
334335
}
335336

336-
impl fmt::Debug for PeriodicReader {
337+
impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
337338
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338339
f.debug_struct("PeriodicReader").finish()
339340
}
340341
}
341342

342-
struct PeriodicReaderInner {
343-
exporter: Arc<dyn PushMetricExporter>,
343+
struct PeriodicReaderInner<E: PushMetricExporter> {
344+
exporter: Arc<E>,
344345
message_sender: mpsc::Sender<Message>,
345346
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
346347
}
347348

348-
impl PeriodicReaderInner {
349+
impl<E: PushMetricExporter> PeriodicReaderInner<E> {
349350
fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
350351
let mut inner = self.producer.lock().expect("lock poisoned");
351352
*inner = Some(producer);
@@ -472,7 +473,7 @@ enum Message {
472473
Shutdown(Sender<bool>),
473474
}
474475

475-
impl MetricReader for PeriodicReader {
476+
impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
476477
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
477478
self.inner.register_pipeline(pipeline);
478479
}
@@ -516,7 +517,6 @@ mod tests {
516517
},
517518
Resource,
518519
};
519-
use async_trait::async_trait;
520520
use opentelemetry::metrics::MeterProvider;
521521
use std::{
522522
sync::{
@@ -548,7 +548,6 @@ mod tests {
548548
}
549549
}
550550

551-
#[async_trait]
552551
impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
553552
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
554553
if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
@@ -576,7 +575,6 @@ mod tests {
576575
is_shutdown: Arc<AtomicBool>,
577576
}
578577

579-
#[async_trait]
580578
impl PushMetricExporter for MockMetricExporter {
581579
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
582580
Ok(())

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,10 @@ where
103103
}
104104

105105
/// Create a [PeriodicReader] with the given config.
106-
pub fn build(self) -> PeriodicReader {
106+
pub fn build(self) -> PeriodicReader<E> {
107107
let (message_sender, message_receiver) = mpsc::channel(256);
108108

109-
let worker = move |reader: &PeriodicReader| {
109+
let worker = move |reader: &PeriodicReader<E>| {
110110
let runtime = self.runtime.clone();
111111
let reader = reader.clone();
112112
self.runtime.spawn(Box::pin(async move {
@@ -184,33 +184,40 @@ where
184184
/// # drop(reader);
185185
/// # }
186186
/// ```
187-
#[derive(Clone)]
188-
pub struct PeriodicReader {
189-
exporter: Arc<dyn PushMetricExporter>,
190-
inner: Arc<Mutex<PeriodicReaderInner>>,
187+
pub struct PeriodicReader<E: PushMetricExporter> {
188+
exporter: Arc<E>,
189+
inner: Arc<Mutex<PeriodicReaderInner<E>>>,
191190
}
192191

193-
impl PeriodicReader {
192+
impl<E: PushMetricExporter> Clone for PeriodicReader<E> {
193+
fn clone(&self) -> Self {
194+
Self {
195+
exporter: Arc::clone(&self.exporter),
196+
inner: Arc::clone(&self.inner),
197+
}
198+
}
199+
}
200+
201+
impl<E: PushMetricExporter> PeriodicReader<E> {
194202
/// Configuration options for a periodic reader
195-
pub fn builder<E, RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
203+
pub fn builder<RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
196204
where
197-
E: PushMetricExporter,
198205
RT: Runtime,
199206
{
200207
PeriodicReaderBuilder::new(exporter, runtime)
201208
}
202209
}
203210

204-
impl fmt::Debug for PeriodicReader {
211+
impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
205212
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206213
f.debug_struct("PeriodicReader").finish()
207214
}
208215
}
209216

210-
struct PeriodicReaderInner {
217+
struct PeriodicReaderInner<E: PushMetricExporter> {
211218
message_sender: mpsc::Sender<Message>,
212219
is_shutdown: bool,
213-
sdk_producer_or_worker: ProducerOrWorker,
220+
sdk_producer_or_worker: ProducerOrWorker<E>,
214221
}
215222

216223
#[derive(Debug)]
@@ -220,19 +227,20 @@ enum Message {
220227
Shutdown(oneshot::Sender<OTelSdkResult>),
221228
}
222229

223-
enum ProducerOrWorker {
230+
enum ProducerOrWorker<E: PushMetricExporter> {
224231
Producer(Weak<dyn SdkProducer>),
225-
Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
232+
#[allow(clippy::type_complexity)]
233+
Worker(Box<dyn FnOnce(&PeriodicReader<E>) + Send + Sync>),
226234
}
227235

228-
struct PeriodicReaderWorker<RT: Runtime> {
229-
reader: PeriodicReader,
236+
struct PeriodicReaderWorker<E: PushMetricExporter, RT: Runtime> {
237+
reader: PeriodicReader<E>,
230238
timeout: Duration,
231239
runtime: RT,
232240
rm: ResourceMetrics,
233241
}
234242

235-
impl<RT: Runtime> PeriodicReaderWorker<RT> {
243+
impl<E: PushMetricExporter, RT: Runtime> PeriodicReaderWorker<E, RT> {
236244
async fn collect_and_export(&mut self) -> OTelSdkResult {
237245
self.reader
238246
.collect(&mut self.rm)
@@ -323,7 +331,7 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
323331
}
324332
}
325333

326-
impl MetricReader for PeriodicReader {
334+
impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
327335
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
328336
let mut inner = match self.inner.lock() {
329337
Ok(guard) => guard,

opentelemetry-stdout/Cargo.toml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,19 @@ rustdoc-args = ["--cfg", "docsrs"]
2222
[features]
2323
default = ["trace", "metrics", "logs"]
2424
trace = ["opentelemetry/trace", "opentelemetry_sdk/trace", "futures-util"]
25-
metrics = ["async-trait", "opentelemetry/metrics", "opentelemetry_sdk/metrics"]
26-
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "async-trait", "thiserror", "opentelemetry_sdk/spec_unstable_logs_enabled"]
25+
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"]
26+
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "opentelemetry_sdk/spec_unstable_logs_enabled"]
2727

2828
[dependencies]
29-
async-trait = { workspace = true, optional = true }
3029
chrono = { version = "0.4.34", default-features = false, features = ["now"] }
31-
thiserror = { workspace = true, optional = true }
3230
futures-util = { workspace = true, optional = true }
3331
opentelemetry = { version = "0.28", path = "../opentelemetry" }
3432
opentelemetry_sdk = { version = "0.28", path = "../opentelemetry-sdk" }
35-
serde = { workspace = true, features = ["derive"] }
3633

3734
[dev-dependencies]
3835
opentelemetry = { path = "../opentelemetry", features = ["metrics"] }
3936
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["rt-tokio", "metrics"] }
4037
opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"}
41-
opentelemetry-semantic-conventions = { path = "../opentelemetry-semantic-conventions" }
4238
tracing = { workspace = true, features = ["std"]}
4339
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
4440
tokio = { workspace = true, features = ["full"] }

opentelemetry-stdout/src/metrics/exporter.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use async_trait::async_trait;
21
use chrono::{DateTime, Utc};
32
use core::{f64, fmt};
43
use opentelemetry_sdk::metrics::Temporality;
@@ -39,7 +38,6 @@ impl fmt::Debug for MetricExporter {
3938
}
4039
}
4140

42-
#[async_trait]
4341
impl PushMetricExporter for MetricExporter {
4442
/// Write Metrics to stdout
4543
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {

0 commit comments

Comments
 (0)