Skip to content

Commit 2f7097a

Browse files
authored
feat(metrics): add support for metrics to the fake-opentelemetry-collector (#302)
1 parent 822de32 commit 2f7097a

File tree

5 files changed

+708
-5
lines changed

5 files changed

+708
-5
lines changed

fake-opentelemetry-collector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ opentelemetry-otlp = { workspace = true, features = [
1818
"grpc-tonic",
1919
"logs",
2020
"trace",
21+
"metrics",
2122
] }
2223
opentelemetry-proto = { workspace = true, features = [
2324
"gen-tonic",

fake-opentelemetry-collector/src/lib.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
11
mod common;
22
mod logs;
3+
mod metrics;
34
mod trace;
4-
pub use logs::ExportedLog;
5-
pub use trace::ExportedSpan;
65

76
use logs::*;
7+
use metrics::*;
88
use trace::*;
99

10-
use std::net::SocketAddr;
11-
use std::time::{Duration, Instant};
10+
pub use logs::ExportedLog;
11+
pub use metrics::ExportedMetric;
12+
pub use trace::ExportedSpan;
1213

1314
use futures::StreamExt;
14-
use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig};
15+
use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
1516
use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
17+
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
1618
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
19+
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
20+
use std::net::SocketAddr;
21+
use std::time::{Duration, Instant};
1722
use tokio::sync::mpsc;
1823
use tokio::sync::mpsc::Receiver;
1924
use tokio_stream::wrappers::TcpListenerStream;
@@ -23,6 +28,7 @@ pub struct FakeCollectorServer {
2328
address: SocketAddr,
2429
req_rx: mpsc::Receiver<ExportedSpan>,
2530
log_rx: mpsc::Receiver<ExportedLog>,
31+
metrics_rx: mpsc::Receiver<ExportedMetric>,
2632
handle: tokio::task::JoinHandle<()>,
2733
}
2834

@@ -40,13 +46,16 @@ impl FakeCollectorServer {
4046

4147
let (req_tx, req_rx) = mpsc::channel::<ExportedSpan>(64);
4248
let (log_tx, log_rx) = mpsc::channel::<ExportedLog>(64);
49+
let (metrics_tx, metrics_rx) = mpsc::channel::<ExportedMetric>(64);
4350
let trace_service = TraceServiceServer::new(FakeTraceService::new(req_tx));
4451
let logs_service = LogsServiceServer::new(FakeLogsService::new(log_tx));
52+
let metrics_service = MetricsServiceServer::new(FakeMetricsService::new(metrics_tx));
4553
let handle = tokio::task::spawn(async move {
4654
debug!("start FakeCollectorServer http://{addr}"); //Devskim: ignore DS137138)
4755
tonic::transport::Server::builder()
4856
.add_service(trace_service)
4957
.add_service(logs_service)
58+
.add_service(metrics_service)
5059
.serve_with_incoming(stream)
5160
.await
5261
.expect("Server failed");
@@ -56,6 +65,7 @@ impl FakeCollectorServer {
5665
address: addr,
5766
req_rx,
5867
log_rx,
68+
metrics_rx,
5969
handle,
6070
})
6171
}
@@ -80,6 +90,14 @@ impl FakeCollectorServer {
8090
recv_many(&mut self.log_rx, at_least, timeout).await
8191
}
8292

93+
pub async fn exported_metrics(
94+
&mut self,
95+
at_least: usize,
96+
timeout: Duration,
97+
) -> Vec<ExportedMetric> {
98+
recv_many(&mut self.metrics_rx, at_least, timeout).await
99+
}
100+
83101
pub fn abort(self) {
84102
self.handle.abort()
85103
}
@@ -125,3 +143,17 @@ pub async fn setup_logger_provider(
125143
)
126144
.build()
127145
}
146+
147+
pub async fn setup_meter_provider(
148+
fake_server: &FakeCollectorServer,
149+
) -> opentelemetry_sdk::metrics::SdkMeterProvider {
150+
let exporter = MetricExporter::builder()
151+
.with_tonic()
152+
.with_endpoint(fake_server.endpoint())
153+
.build()
154+
.expect("failed to install metrics");
155+
156+
let reader = PeriodicReader::builder(exporter).build();
157+
158+
SdkMeterProvider::builder().with_reader(reader).build()
159+
}

0 commit comments

Comments
 (0)