Skip to content

Commit 12011fb

Browse files
authored
Make SkipDetect more generic as stack::MakeSwitch (#657)
SkipDetect is needlessly coupled to its `listen::Addrs` target type and the Accept stack's shape. In reality, it's pretty similar to the fallback module, except that decisions are made on the target rather than the response future. This change replaces the `core::proxy::skip_detct` module with `stack::switch`. Above all, this allows us to use other targets with this module, which is needed to do discovery before protoocl detection (and to use the results of discovery to inform detection).
1 parent b59172a commit 12011fb

File tree

8 files changed

+108
-114
lines changed

8 files changed

+108
-114
lines changed

linkerd/app/core/src/config.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
pub use crate::exp_backoff::ExponentialBackoff;
22
pub use crate::proxy::http::h2;
33
pub use crate::transport::{Bind, DefaultOrigDstAddr, NoOrigDstAddr, OrigDstAddr};
4-
use indexmap::IndexSet;
5-
use std::sync::Arc;
64
use std::time::Duration;
75

86
#[derive(Clone, Debug)]
@@ -25,7 +23,7 @@ pub struct ProxyConfig {
2523
pub connect: ConnectConfig,
2624
pub buffer_capacity: usize,
2725
pub cache_max_idle_age: Duration,
28-
pub disable_protocol_detection_for_ports: Arc<IndexSet<u16>>,
26+
pub disable_protocol_detection_for_ports: crate::SkipByPort,
2927
pub dispatch_timeout: Duration,
3028
pub max_in_flight_requests: usize,
3129
pub detect_protocol_timeout: Duration,

linkerd/app/core/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,21 @@ impl From<Addr> for DiscoveryRejected {
121121
Self::new()
122122
}
123123
}
124+
125+
#[derive(Clone, Debug)]
126+
pub struct SkipByPort(std::sync::Arc<indexmap::IndexSet<u16>>);
127+
128+
impl From<indexmap::IndexSet<u16>> for SkipByPort {
129+
fn from(ports: indexmap::IndexSet<u16>) -> Self {
130+
SkipByPort(ports.into())
131+
}
132+
}
133+
134+
impl<T> linkerd2_stack::Switch<T> for SkipByPort
135+
where
136+
for<'t> &'t T: Into<std::net::SocketAddr>,
137+
{
138+
fn use_primary(&self, addrs: &T) -> bool {
139+
!self.0.contains(&addrs.into().port())
140+
}
141+
}

linkerd/app/core/src/proxy/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,3 @@ pub use linkerd2_proxy_identity as identity;
99
pub use linkerd2_proxy_resolve as resolve;
1010
pub use linkerd2_proxy_tap as tap;
1111
pub use linkerd2_proxy_tcp as tcp;
12-
13-
mod skip_detect;
14-
15-
pub use self::skip_detect::SkipDetect;

linkerd/app/core/src/proxy/skip_detect.rs

Lines changed: 0 additions & 103 deletions
This file was deleted.

linkerd/app/inbound/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use linkerd2_app_core::{
1717
profiles,
1818
proxy::{
1919
http::{self, orig_proto, strip_header, DetectHttp},
20-
identity, tap, tcp, SkipDetect,
20+
identity, tap, tcp,
2121
},
2222
reconnect, router, serve,
2323
spans::SpanConverter,
@@ -392,7 +392,7 @@ impl Config {
392392
.push_map_target(TcpEndpoint::from)
393393
.push(metrics.transport.layer_accept(TransportLabels))
394394
.into_inner();
395-
let accept = SkipDetect::new(skip_detect, tls, accept_fwd);
395+
let accept = svc::stack::MakeSwitch::new(skip_detect, tls, accept_fwd);
396396

397397
info!(addr = %listen_addr, "Serving");
398398
serve::serve(listen, accept, drain.signal()).await

linkerd/app/outbound/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use linkerd2_app_core::{
1616
profiles,
1717
proxy::{
1818
self, core::resolve::Resolve, discover, http, identity, resolve::map_endpoint, tap, tcp,
19-
SkipDetect,
2019
},
2120
reconnect, retry, router, serve,
2221
spans::SpanConverter,
@@ -500,7 +499,7 @@ impl Config {
500499
.push(admit::AdmitLayer::new(prevent_loop))
501500
.push_map_target(TcpEndpoint::from);
502501

503-
let accept = svc::stack(SkipDetect::new(skip_detect, http, tcp_forward))
502+
let accept = svc::stack(svc::stack::MakeSwitch::new(skip_detect, http, tcp_forward))
504503
.push(metrics.transport.layer_accept(TransportLabels));
505504

506505
info!(addr = %listen_addr, "Serving");

linkerd/stack/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod new_service;
1313
pub mod on_response;
1414
mod oneshot;
1515
mod proxy;
16+
mod switch;
1617

1718
pub use self::fallback::{Fallback, FallbackLayer};
1819
pub use self::future_service::FutureService;
@@ -24,3 +25,4 @@ pub use self::new_service::NewService;
2425
pub use self::on_response::{OnResponse, OnResponseLayer};
2526
pub use self::oneshot::{Oneshot, OneshotLayer};
2627
pub use self::proxy::{Proxy, ProxyService};
28+
pub use self::switch::{MakeSwitch, Switch};

linkerd/stack/src/switch.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
//! A middleware that switches between two underlying stacks, depending on the
2+
//! target type.
3+
4+
use futures::{future, prelude::*};
5+
use linkerd2_error::Error;
6+
use std::task::{Context, Poll};
7+
use tower::util::ServiceExt;
8+
9+
/// Determines whether the primary stack should be used.
10+
pub trait Switch<T> {
11+
fn use_primary(&self, target: &T) -> bool;
12+
}
13+
14+
/// Makes either the primary or fallback stack, as determined by an `S`-typed
15+
/// `Switch`.
16+
pub struct MakeSwitch<S, P, F> {
17+
switch: S,
18+
primary: P,
19+
fallback: F,
20+
}
21+
22+
impl<S, P, F> MakeSwitch<S, P, F> {
23+
pub fn new(switch: S, primary: P, fallback: F) -> Self {
24+
MakeSwitch {
25+
switch,
26+
primary,
27+
fallback,
28+
}
29+
}
30+
31+
pub fn layer(switch: S, fallback: F) -> impl super::layer::Layer<P, Service = Self> + Clone
32+
where
33+
S: Clone,
34+
F: Clone,
35+
{
36+
super::layer::mk(move |primary| Self::new(switch.clone(), primary, fallback.clone()))
37+
}
38+
}
39+
40+
impl<T, S, P, F> tower::Service<T> for MakeSwitch<S, P, F>
41+
where
42+
S: Switch<T>,
43+
P: tower::Service<T> + Clone,
44+
P::Error: Into<Error>,
45+
F: tower::Service<T> + Clone,
46+
F::Error: Into<Error>,
47+
{
48+
type Response = tower::util::Either<P::Response, F::Response>;
49+
type Error = Error;
50+
type Future = future::Either<
51+
future::MapOk<
52+
future::ErrInto<tower::util::Oneshot<P, T>, Error>,
53+
fn(P::Response) -> tower::util::Either<P::Response, F::Response>,
54+
>,
55+
future::MapOk<
56+
future::ErrInto<tower::util::Oneshot<F, T>, Error>,
57+
fn(F::Response) -> tower::util::Either<P::Response, F::Response>,
58+
>,
59+
>;
60+
61+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Error>> {
62+
Poll::Ready(Ok(()))
63+
}
64+
65+
fn call(&mut self, target: T) -> Self::Future {
66+
if self.switch.use_primary(&target) {
67+
future::Either::Left(
68+
self.primary
69+
.clone()
70+
.oneshot(target)
71+
.err_into::<Error>()
72+
.map_ok(tower::util::Either::A),
73+
)
74+
} else {
75+
future::Either::Right(
76+
self.fallback
77+
.clone()
78+
.oneshot(target)
79+
.err_into::<Error>()
80+
.map_ok(tower::util::Either::B),
81+
)
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)