Skip to content

Commit 4c3d706

Browse files
authored
internal: Remove the SetKeepalive trait (#363)
In preparation of decoupling the listener logic from TLS discovery, I'm trying to simplify the transport module. This change is intended to make the `listen` and `connect` modules primarily responsible for preparing a `TCPStream` before any other transformations are applied to the IO stream. The `internal::Io` trait can be narrowed and the various `Io` impls no longer need to proxy this API. In follow-up changes, the `listen` module will be decoupled from identity/TLS; and some of the `transport` core types will be split out into subcrates.
1 parent b7faa70 commit 4c3d706

File tree

11 files changed

+72
-241
lines changed

11 files changed

+72
-241
lines changed

src/app/inbound/mod.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::proxy::http::{
55
strip_header,
66
};
77
use crate::proxy::{accept, Server};
8-
use crate::transport::{self, connect, keepalive, tls, Connection};
8+
use crate::transport::{self, connect, tls, Connection};
99
use crate::{core::listen::ServeConnection, svc, trace_context, Addr};
1010
use linkerd2_reconnect as reconnect;
1111
use opencensus_proto::trace::v1 as oc;
@@ -55,9 +55,8 @@ where
5555

5656
// Establishes connections to the local application (for both
5757
// TCP forwarding and HTTP proxying).
58-
let connect = svc::stack(connect::svc())
58+
let connect = svc::stack(connect::svc(config.inbound_connect_keepalive))
5959
.push(tls::client::layer(local_identity))
60-
.push(keepalive::connect::layer(config.inbound_connect_keepalive))
6160
.push_timeout(config.inbound_connect_timeout)
6261
.push(transport_metrics.connect("inbound"))
6362
.push(rewrite_loopback_addr::layer());
@@ -212,9 +211,7 @@ where
212211

213212
// As the inbound proxy accepts connections, we don't do any
214213
// special transport-level handling.
215-
let accept = accept::builder()
216-
.push(keepalive::accept::layer(config.inbound_accept_keepalive))
217-
.push(transport_metrics.accept("inbound"));
214+
let accept = accept::builder().push(transport_metrics.accept("inbound"));
218215

219216
Server::new(
220217
"out",

src/app/main.rs

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use super::{handle_time, inbound, outbound, tap::serve_tap};
88
use crate::opencensus::SpanExporter;
99
use crate::proxy::{self, http::metrics as http_metrics};
1010
use crate::svc::{self, LayerExt};
11-
use crate::transport::{self, connect, keepalive, tls, GetOriginalDst, Listen};
11+
use crate::transport::{self, connect, tls, GetOriginalDst, Listen};
1212
use crate::{dns, drain, logging, metrics::FmtMetrics, tap, task, telemetry, trace, Conditional};
1313
use futures::{self, future, Future};
1414
use linkerd2_reconnect as reconnect;
@@ -70,29 +70,40 @@ where
7070
let local_identity = identity.as_ref().map(|(l, _)| l.clone());
7171

7272
let control_listener = config.control_listener.as_ref().map(|cl| {
73-
let listener = Listen::bind(cl.listener.addr, local_identity.clone())
74-
.expect("dst_svc listener bind");
73+
let listener = Listen::bind(
74+
cl.listener.addr,
75+
local_identity.clone(),
76+
config.inbound_accept_keepalive,
77+
)
78+
.expect("tap listener bind");
7579

7680
(listener, cl.tap_svc_name.clone())
7781
});
7882

79-
let admin_listener = Listen::bind(config.admin_listener.addr, local_identity.clone())
80-
.expect("metrics listener bind");
83+
let admin_listener = Listen::bind(
84+
config.admin_listener.addr,
85+
local_identity.clone(),
86+
config.inbound_accept_keepalive,
87+
)
88+
.expect("tap listener bind");
8189

8290
let outbound_listener = Listen::bind(
8391
config.outbound_listener.addr,
8492
Conditional::None(tls::ReasonForNoPeerName::Loopback.into()),
93+
config.outbound_accept_keepalive,
8594
)
8695
.expect("outbound listener bind")
8796
.with_original_dst(get_original_dst.clone())
8897
.without_protocol_detection_for(config.outbound_ports_disable_protocol_detection.clone());
8998

90-
let inbound_listener = Listen::bind(config.inbound_listener.addr, local_identity)
91-
.expect("inbound listener bind")
92-
.with_original_dst(get_original_dst.clone())
93-
.without_protocol_detection_for(
94-
config.inbound_ports_disable_protocol_detection.clone(),
95-
);
99+
let inbound_listener = Listen::bind(
100+
config.inbound_listener.addr,
101+
local_identity,
102+
config.inbound_accept_keepalive,
103+
)
104+
.expect("inbound listener bind")
105+
.with_original_dst(get_original_dst.clone())
106+
.without_protocol_detection_for(config.inbound_ports_disable_protocol_detection.clone());
96107

97108
let runtime = runtime.into();
98109

@@ -265,11 +276,10 @@ where
265276
config.outbound_connect_keepalive
266277
};
267278

268-
let svc = svc::stack(connect::svc())
279+
let svc = svc::stack(connect::svc(keepalive))
269280
.push(tls::client::layer(Conditional::Some(
270281
id_config.trust_anchors.clone(),
271282
)))
272-
.push(keepalive::connect::layer(keepalive))
273283
.push_timeout(config.control_connect_timeout)
274284
.push(control::client::layer())
275285
.push(control::resolve::layer(dns_resolver.clone()))
@@ -318,9 +328,8 @@ where
318328
config.outbound_connect_keepalive
319329
};
320330

321-
svc::stack(connect::svc())
331+
svc::stack(connect::svc(keepalive))
322332
.push(tls::client::layer(local_identity.clone()))
323-
.push(keepalive::connect::layer(keepalive))
324333
.push_timeout(config.control_connect_timeout)
325334
.push(control::client::layer())
326335
.push(control::resolve::layer(dns_resolver.clone()))
@@ -406,9 +415,13 @@ where
406415
);
407416

408417
let trace_collector_svc = config.trace_collector_addr.as_ref().map(|addr| {
409-
svc::stack(connect::svc())
418+
let keepalive = if addr.addr.is_loopback() {
419+
config.inbound_connect_keepalive
420+
} else {
421+
config.outbound_connect_keepalive
422+
};
423+
svc::stack(connect::svc(keepalive))
410424
.push(tls::client::layer(local_identity.clone()))
411-
.push(keepalive::connect::layer(config.outbound_connect_keepalive))
412425
.push_timeout(config.control_connect_timeout)
413426
// TODO: perhaps rename from "control" to "grpc"
414427
.push(control::client::layer())

src/app/outbound/mod.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::proxy::http::{
88
};
99
use crate::proxy::{self, accept, Server};
1010
use crate::transport::Connection;
11-
use crate::transport::{self, connect, keepalive, tls};
11+
use crate::transport::{self, connect, tls};
1212
use crate::{svc, trace_context, Addr};
1313
use linkerd2_proxy_discover as discover;
1414
use linkerd2_reconnect as reconnect;
@@ -72,9 +72,8 @@ where
7272

7373
// Establishes connections to remote peers (for both TCP
7474
// forwarding and HTTP proxying).
75-
let connect = svc::stack(connect::svc())
75+
let connect = svc::stack(connect::svc(config.outbound_connect_keepalive))
7676
.push(tls::client::layer(local_identity))
77-
.push(keepalive::connect::layer(config.outbound_connect_keepalive))
7877
.push_timeout(config.outbound_connect_timeout)
7978
.push(transport_metrics.connect("outbound"));
8079

@@ -285,9 +284,7 @@ where
285284

286285
// Instantiated for each TCP connection received from the local
287286
// application (including HTTP connections).
288-
let accept = accept::builder()
289-
.push(keepalive::accept::layer(config.outbound_accept_keepalive))
290-
.push(transport_metrics.accept("outbound"));
287+
let accept = accept::builder().push(transport_metrics.accept("outbound"));
291288

292289
Server::new(
293290
"out",

src/transport/connect.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::svc::{mk, Service};
22
use futures::{try_ready, Future, Poll};
3-
use std::{io, net::SocketAddr};
3+
use std::{io, net::SocketAddr, time::Duration};
44
use tokio::net::{tcp, TcpStream};
55
use tracing::debug;
66

@@ -9,15 +9,17 @@ pub trait HasPeerAddr {
99
}
1010

1111
pub fn svc<T>(
12+
keepalive: Option<Duration>,
1213
) -> impl Service<T, Response = TcpStream, Error = io::Error, Future = ConnectFuture> + Clone
1314
where
1415
T: HasPeerAddr,
1516
{
16-
mk(|target: T| {
17+
mk(move |target: T| {
1718
let addr = target.peer_addr();
1819
debug!("connecting to {}", addr);
1920
ConnectFuture {
2021
addr,
22+
keepalive,
2123
future: TcpStream::connect(&addr),
2224
}
2325
})
@@ -26,6 +28,7 @@ where
2628
#[derive(Debug)]
2729
pub struct ConnectFuture {
2830
addr: SocketAddr,
31+
keepalive: Option<Duration>,
2932
future: tcp::ConnectFuture,
3033
}
3134

@@ -48,6 +51,7 @@ impl Future for ConnectFuture {
4851
}));
4952
debug!("connection established to {}", self.addr);
5053
super::set_nodelay_or_warn(&io);
54+
super::set_keepalive_or_warn(&io, self.keepalive);
5155
Ok(io.into())
5256
}
5357
}

src/transport/io.rs

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use futures::Poll;
66
use tokio::io::{AsyncRead, AsyncWrite};
77

88
use self::internal::Io;
9-
use super::{AddrInfo, SetKeepalive};
9+
use super::AddrInfo;
1010

1111
/// A public wrapper around a `Box<Io>`.
1212
///
@@ -76,18 +76,8 @@ impl AddrInfo for BoxedIo {
7676
}
7777
}
7878

79-
impl SetKeepalive for BoxedIo {
80-
fn keepalive(&self) -> io::Result<Option<::std::time::Duration>> {
81-
self.0.keepalive()
82-
}
83-
84-
fn set_keepalive(&mut self, ka: Option<::std::time::Duration>) -> io::Result<()> {
85-
self.0.set_keepalive(ka)
86-
}
87-
}
88-
8979
pub(super) mod internal {
90-
use super::{AddrInfo, AsyncRead, AsyncWrite, Buf, Poll, SetKeepalive, Shutdown};
80+
use super::{AddrInfo, AsyncRead, AsyncWrite, Buf, Poll, Shutdown};
9181
use std::io;
9282
use tokio::net::TcpStream;
9383

@@ -96,7 +86,7 @@ pub(super) mod internal {
9686
/// writes.
9787
///
9888
/// Instead, used the concrete `BoxedIo` type.
99-
pub trait Io: AddrInfo + AsyncRead + AsyncWrite + SetKeepalive + Send {
89+
pub trait Io: AddrInfo + AsyncRead + AsyncWrite + Send {
10090
fn shutdown_write(&mut self) -> io::Result<()>;
10191

10292
/// This method is to allow using `Async::write_buf` even through a
@@ -163,16 +153,6 @@ mod tests {
163153
}
164154
}
165155

166-
impl SetKeepalive for WriteBufDetector {
167-
fn keepalive(&self) -> io::Result<Option<::std::time::Duration>> {
168-
unreachable!("not called in test")
169-
}
170-
171-
fn set_keepalive(&mut self, _: Option<::std::time::Duration>) -> io::Result<()> {
172-
unreachable!("not called in test")
173-
}
174-
}
175-
176156
impl Io for WriteBufDetector {
177157
fn shutdown_write(&mut self) -> Result<(), io::Error> {
178158
unreachable!("not called in test")

src/transport/keepalive.rs

Lines changed: 0 additions & 124 deletions
This file was deleted.

0 commit comments

Comments
 (0)