Skip to content

Commit 9d351b6

Browse files
authored
Move metrics::Io to io::SensorIo (#610)
This change decouples the I/O stream wrapper from the metrics implementation so that it can implement the Io trait (so that it can be boxed).
1 parent eee08f9 commit 9d351b6

File tree

7 files changed

+133
-109
lines changed

7 files changed

+133
-109
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,7 @@ version = "0.1.0"
11221122
dependencies = [
11231123
"bytes 0.5.4",
11241124
"futures 0.3.5",
1125+
"linkerd2-errno",
11251126
"pin-project",
11261127
"tokio",
11271128
"tokio-rustls",

linkerd/app/core/src/proxy/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ where
144144
impl<L, F, H, B> Service<Connection> for Server<L, F, H, B>
145145
where
146146
L: TransportLabels<Protocol, Labels = TransportKey>,
147-
F: Accept<(tls::accept::Meta, transport::metrics::Io<BoxedIo>)> + Clone + Send + 'static,
147+
F: Accept<(tls::accept::Meta, transport::metrics::SensorIo<BoxedIo>)> + Clone + Send + 'static,
148148
F::Future: Send + 'static,
149149
F::ConnectionFuture: Send + 'static,
150150
H: NewService<tls::accept::Meta> + Send + 'static,

linkerd/io/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ General I/O primitives.
1111
[dependencies]
1212
futures = "0.3"
1313
bytes = "0.5"
14+
linkerd2-errno = { path = "../errno" }
1415
tokio = { version = "0.2", features = ["io-util", "net", "macros"] }
1516
tokio-rustls = "0.13"
1617
pin-project = "0.4"

linkerd/io/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
mod boxed;
22
mod peek;
33
mod prefixed;
4+
mod sensor;
5+
46
pub use self::{
57
boxed::BoxedIo,
68
peek::{Peek, Peekable},
79
prefixed::PrefixedIo,
10+
sensor::{Sensor, SensorIo},
811
};
912
pub use std::io::{Error, Read, Result, Write};
1013
pub use tokio::io::{AsyncRead, AsyncWrite};

linkerd/io/src/sensor.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use crate::{internal::Io, Poll};
2+
use bytes::{Buf, BufMut};
3+
use futures::ready;
4+
use linkerd2_errno::Errno;
5+
use pin_project::pin_project;
6+
use std::mem::MaybeUninit;
7+
use std::pin::Pin;
8+
use std::task::Context;
9+
use tokio::io::{AsyncRead, AsyncWrite};
10+
11+
pub trait Sensor {
12+
fn record_read(&mut self, sz: usize);
13+
fn record_write(&mut self, sz: usize);
14+
fn record_close(&mut self, eos: Option<Errno>);
15+
fn record_error<T>(&mut self, op: Poll<T>) -> Poll<T>;
16+
}
17+
18+
/// Wraps a transport with telemetry.
19+
#[pin_project]
20+
#[derive(Debug)]
21+
pub struct SensorIo<T, S> {
22+
#[pin]
23+
io: T,
24+
25+
sensor: S,
26+
}
27+
28+
// === impl SensorIo ===
29+
30+
impl<T, S: Sensor> SensorIo<T, S> {
31+
pub fn new(io: T, sensor: S) -> Self {
32+
Self { io, sensor }
33+
}
34+
}
35+
36+
impl<T: AsyncRead + AsyncWrite, S: Sensor> AsyncRead for SensorIo<T, S> {
37+
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<usize> {
38+
let this = self.project();
39+
let bytes = ready!(this.sensor.record_error(this.io.poll_read(cx, buf)))?;
40+
this.sensor.record_read(bytes);
41+
Poll::Ready(Ok(bytes))
42+
}
43+
44+
fn poll_read_buf<B: BufMut>(
45+
self: Pin<&mut Self>,
46+
cx: &mut Context<'_>,
47+
buf: &mut B,
48+
) -> Poll<usize> {
49+
let this = self.project();
50+
let bytes = ready!(this.sensor.record_error(this.io.poll_read_buf(cx, buf)))?;
51+
this.sensor.record_read(bytes);
52+
Poll::Ready(Ok(bytes))
53+
}
54+
55+
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
56+
self.io.prepare_uninitialized_buffer(buf)
57+
}
58+
}
59+
60+
impl<T: AsyncRead + AsyncWrite, S: Sensor> AsyncWrite for SensorIo<T, S> {
61+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
62+
let this = self.project();
63+
this.sensor.record_error(this.io.poll_shutdown(cx))
64+
}
65+
66+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
67+
let this = self.project();
68+
this.sensor.record_error(this.io.poll_flush(cx))
69+
}
70+
71+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize> {
72+
let this = self.project();
73+
let bytes = ready!(this.sensor.record_error(this.io.poll_write(cx, buf)))?;
74+
this.sensor.record_write(bytes);
75+
Poll::Ready(Ok(bytes))
76+
}
77+
78+
fn poll_write_buf<B: Buf>(
79+
self: Pin<&mut Self>,
80+
cx: &mut Context<'_>,
81+
buf: &mut B,
82+
) -> Poll<usize> {
83+
let this = self.project();
84+
let bytes = ready!(this.sensor.record_error(this.io.poll_write_buf(cx, buf)))?;
85+
this.sensor.record_write(bytes);
86+
Poll::Ready(Ok(bytes))
87+
}
88+
}
89+
90+
impl<T: Io, S: Sensor + Send> Io for SensorIo<T, S> {
91+
fn poll_write_buf_erased(
92+
self: Pin<&mut Self>,
93+
cx: &mut Context<'_>,
94+
mut buf: &mut dyn Buf,
95+
) -> Poll<usize> {
96+
self.poll_write_buf(cx, &mut buf)
97+
}
98+
99+
fn poll_read_buf_erased(
100+
self: Pin<&mut Self>,
101+
cx: &mut Context<'_>,
102+
mut buf: &mut dyn BufMut,
103+
) -> Poll<usize> {
104+
self.poll_read_buf(cx, &mut buf)
105+
}
106+
}

linkerd/proxy/transport/src/metrics/mod.rs renamed to linkerd/proxy/transport/src/metrics.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use futures::{ready, TryFuture};
22
use indexmap::IndexMap;
33
use linkerd2_errno::Errno;
4+
use linkerd2_io as io;
45
use linkerd2_metrics::{
56
latency, metrics, Counter, FmtLabels, FmtMetric, FmtMetrics, Gauge, Histogram, Metric,
67
};
@@ -15,10 +16,6 @@ use std::time::Instant;
1516
use tokio::io::{AsyncRead, AsyncWrite};
1617
use tracing::debug;
1718

18-
mod io;
19-
20-
pub use self::io::Io;
21-
2219
metrics! {
2320
tcp_open_total: Counter { "Total count of opened connections" },
2421
tcp_open_connections: Gauge { "Number of currently-open connections" },
@@ -68,9 +65,6 @@ pub struct Connecting<F> {
6865
}
6966

7067
/// Stores a class of transport's metrics.
71-
///
72-
/// TODO We should probaby use AtomicUsize for most of these counters so that
73-
/// simple increments don't require a lock. Especially for read|write_bytes_total.
7468
#[derive(Debug, Default)]
7569
struct Metrics {
7670
open_total: Counter,
@@ -98,11 +92,13 @@ struct EosMetrics {
9892

9993
/// Tracks the state of a single instance of `Io` throughout its lifetime.
10094
#[derive(Debug)]
101-
struct Sensor {
95+
pub struct Sensor {
10296
metrics: Option<Arc<Metrics>>,
10397
opened_at: Instant,
10498
}
10599

100+
pub type SensorIo<T> = io::SensorIo<T, Sensor>;
101+
106102
/// Lazily builds instances of `Sensor`.
107103
#[derive(Clone, Debug)]
108104
struct NewSensor(Arc<Metrics>);
@@ -182,14 +178,18 @@ impl<K: Eq + Hash + FmtLabels> Registry<K> {
182178
ConnectLayer::new(label, self.0.clone())
183179
}
184180

185-
pub fn wrap_server_transport<T: AsyncRead + AsyncWrite>(&self, labels: K, io: T) -> Io<T> {
181+
pub fn wrap_server_transport<T: AsyncRead + AsyncWrite>(
182+
&self,
183+
labels: K,
184+
io: T,
185+
) -> SensorIo<T> {
186186
let metrics = self
187187
.0
188188
.lock()
189189
.expect("metrics registry poisoned")
190190
.get_or_default(labels)
191191
.clone();
192-
Io::new(io, Sensor::open(metrics))
192+
SensorIo::new(io, Sensor::open(metrics))
193193
}
194194
}
195195

@@ -239,7 +239,7 @@ where
239239
L: TransportLabels<T>,
240240
M: tower::make::MakeConnection<T>,
241241
{
242-
type Response = Io<M::Connection>;
242+
type Response = SensorIo<M::Connection>;
243243
type Error = M::Error;
244244
type Future = Connecting<M::Future>;
245245

@@ -270,7 +270,7 @@ where
270270
F: TryFuture,
271271
F::Ok: AsyncRead + AsyncWrite,
272272
{
273-
type Output = Result<Io<F::Ok>, F::Error>;
273+
type Output = Result<SensorIo<F::Ok>, F::Error>;
274274

275275
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
276276
let this = self.project();
@@ -282,7 +282,7 @@ where
282282
.take()
283283
.expect("future must not be polled after ready")
284284
.new_sensor();
285-
let t = Io::new(io, sensor);
285+
let t = SensorIo::new(io, sensor);
286286
Poll::Ready(Ok(t))
287287
}
288288
}
@@ -321,28 +321,30 @@ impl<K: Eq + Hash + FmtLabels> FmtMetrics for Report<K> {
321321
// ===== impl Sensor =====
322322

323323
impl Sensor {
324-
pub fn open(metrics: Arc<Metrics>) -> Self {
324+
fn open(metrics: Arc<Metrics>) -> Self {
325325
metrics.open_total.incr();
326326
metrics.open_connections.incr();
327327
Self {
328328
metrics: Some(metrics),
329329
opened_at: Instant::now(),
330330
}
331331
}
332+
}
332333

333-
pub fn record_read(&mut self, sz: usize) {
334+
impl io::Sensor for Sensor {
335+
fn record_read(&mut self, sz: usize) {
334336
if let Some(ref m) = self.metrics {
335337
m.read_bytes_total.add(sz as u64);
336338
}
337339
}
338340

339-
pub fn record_write(&mut self, sz: usize) {
341+
fn record_write(&mut self, sz: usize) {
340342
if let Some(ref m) = self.metrics {
341343
m.write_bytes_total.add(sz as u64);
342344
}
343345
}
344346

345-
pub fn record_close(&mut self, eos: Option<Errno>) {
347+
fn record_close(&mut self, eos: Option<Errno>) {
346348
let duration = self.opened_at.elapsed();
347349
// When closed, the metrics structure is dropped so that no further
348350
// updates can occur (i.e. so that an additional close won't be recorded
@@ -361,7 +363,7 @@ impl Sensor {
361363
///
362364
/// If the transport operation results in a non-recoverable error, record a
363365
/// transport closure.
364-
fn sense_err<T>(&mut self, op: Poll<std::io::Result<T>>) -> Poll<std::io::Result<T>> {
366+
fn record_error<T>(&mut self, op: Poll<std::io::Result<T>>) -> Poll<std::io::Result<T>> {
365367
match op {
366368
Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
367369
Poll::Ready(Err(e)) => {
@@ -379,7 +381,7 @@ impl Sensor {
379381

380382
impl Drop for Sensor {
381383
fn drop(&mut self) {
382-
self.record_close(None)
384+
io::Sensor::record_close(self, None)
383385
}
384386
}
385387

linkerd/proxy/transport/src/metrics/io.rs

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments
 (0)