Skip to content

Commit 67088e5

Browse files
authored
Simplify tap server (#582)
## Description Remove the Tap daemon that is responsible for handling service registrations and tap subscriptions. There is now a registry that contains all active taps. The tap service has access to this registry and uses it to retrieve an updated set of active taps for every request. This set is used to determine whether the request should be tapped. New taps are added when the gRPC server receives an observe request. If the request is valid, the server adds the tap to the registry. Inactive taps are cleared by a background task. This background task is much simpler than the one before; it loops on an interval and checks whether each tap still has an actively used response stream and whether it has not yet reached its limit. ### Changes The primary change is the addition of the `Registry` in `linkerd/proxy/tap/src/registry.rs`. The tap service holds on to a `Registry` and uses the `taps_recv` field to get an updated list of taps for each request. The tap service is not responsible for adding or removing any taps from the registry. The gRPC service also holds on to a `Registry` and uses the `inner` field to register new taps with the registry. Locking this field gives access to an `Inner`. With this, the gRPC server can add new taps to the registry and then send updates with the `taps_send` sender. Neither the tap service nor the gRPC server is responsible for clearing out inactive taps. To solve this, we spawn a background task with the `async fn clean` method, which loops on an interval and checks that each tap still has an actively used response stream. ### Traits The `tap::iface` module now only contains the `Tap`, `TapPayload`, and `TapResponse` traits. These are helpful for traits that are implemented in `tap::grpc`. The `Subscribe` and `Register` traits have been removed. Services no longer *register* with the tap server. 
Taps still subscribe to the registry (the operation is now called `register`), but the `Subscribe` trait was only implemented by one object, so it neither made the implementation easier to understand nor saved any duplicated code. Signed-off-by: Kevin Leimkuhler <[email protected]>
1 parent 022ecfc commit 67088e5

File tree

9 files changed

+149
-363
lines changed

9 files changed

+149
-363
lines changed

linkerd/app/src/env.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
547547
let tap = tap?
548548
.map(|(addr, ids)| super::tap::Config::Enabled {
549549
permitted_peer_identities: ids,
550-
server: ServerConfig {
550+
config: ServerConfig {
551551
bind: listen::Bind::new(addr, inbound.proxy.server.bind.keepalive()),
552552
h2_settings,
553553
},

linkerd/app/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
//! Configures and executes the proxy
2-
32
#![recursion_limit = "256"]
43
//#![deny(warnings, rust_2018_idioms)]
54

@@ -25,6 +24,7 @@ use linkerd2_app_inbound as inbound;
2524
use linkerd2_app_outbound as outbound;
2625
use std::net::SocketAddr;
2726
use std::pin::Pin;
27+
use tokio::time::Duration;
2828
use tracing::{debug, error, info, info_span};
2929
use tracing_futures::Instrument;
3030

@@ -343,8 +343,15 @@ impl App {
343343
// Spawn the DNS resolver background task.
344344
tokio::spawn(dns.instrument(info_span!("dns")));
345345

346-
if let tap::Tap::Enabled { daemon, serve, .. } = tap {
347-
tokio::spawn(daemon.instrument(info_span!("tap")));
346+
if let tap::Tap::Enabled {
347+
registry, serve, ..
348+
} = tap
349+
{
350+
tokio::spawn(
351+
registry
352+
.clean(tokio::time::interval(Duration::from_secs(60)))
353+
.instrument(info_span!("tap_clean")),
354+
);
348355
tokio::spawn(
349356
serve
350357
.map_err(|error| error!(%error, "server died"))

linkerd/app/src/tap.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::pin::Pin;
1414
pub enum Config {
1515
Disabled,
1616
Enabled {
17-
server: ServerConfig,
17+
config: ServerConfig,
1818
permitted_peer_identities: IndexSet<identity::Name>,
1919
},
2020
}
@@ -26,7 +26,7 @@ pub enum Tap {
2626
Enabled {
2727
listen_addr: SocketAddr,
2828
layer: tap::Layer,
29-
daemon: tap::Daemon,
29+
registry: tap::Registry,
3030
serve: Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + Send + 'static>>,
3131
},
3232
}
@@ -37,31 +37,30 @@ impl Config {
3737
identity: tls::Conditional<identity::Local>,
3838
drain: drain::Watch,
3939
) -> Result<Tap, Error> {
40-
let (layer, grpc, daemon) = tap::new();
40+
let (registry, layer, server) = tap::new();
4141
match self {
4242
Config::Disabled => {
43-
drop((grpc, daemon));
43+
drop((registry, server));
4444
Ok(Tap::Disabled { layer })
4545
}
46-
4746
Config::Enabled {
48-
server,
47+
config,
4948
permitted_peer_identities,
5049
} => {
51-
let (listen_addr, listen) = server.bind.bind()?;
50+
let (listen_addr, listen) = config.bind.bind()?;
5251

5352
let accept = tls::AcceptTls::new(
5453
identity,
55-
tap::AcceptPermittedClients::new(permitted_peer_identities.into(), grpc),
54+
tap::AcceptPermittedClients::new(permitted_peer_identities.into(), server),
5655
);
5756

5857
let serve = Box::pin(serve::serve(listen, accept, drain.signal()));
5958

6059
Ok(Tap::Enabled {
60+
listen_addr,
6161
layer,
62-
daemon,
62+
registry,
6363
serve,
64-
listen_addr,
6564
})
6665
}
6766
}

linkerd/proxy/tap/src/accept.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::grpc::Server;
12
use futures::future;
23
use indexmap::IndexSet;
34
use linkerd2_error::Error;
@@ -17,13 +18,13 @@ use tower::Service;
1718
#[derive(Clone, Debug)]
1819
pub struct AcceptPermittedClients {
1920
permitted_client_ids: Arc<IndexSet<identity::Name>>,
20-
server: super::Server,
21+
server: Server,
2122
}
2223

2324
pub type ServeFuture = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
2425

2526
impl AcceptPermittedClients {
26-
pub fn new(permitted_client_ids: Arc<IndexSet<identity::Name>>, server: super::Server) -> Self {
27+
pub fn new(permitted_client_ids: Arc<IndexSet<identity::Name>>, server: Server) -> Self {
2728
Self {
2829
permitted_client_ids,
2930
server,

linkerd/proxy/tap/src/daemon.rs

Lines changed: 0 additions & 217 deletions
This file was deleted.

linkerd/proxy/tap/src/grpc/server.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::match_::Match;
2-
use crate::{iface, Inspect};
2+
use crate::{iface, Inspect, Registry};
33
use bytes::Buf;
44
use futures::ready;
55
use hyper::body::HttpBody;
@@ -20,9 +20,9 @@ use tonic::{self as grpc, Response};
2020
use tracing::{debug, trace, warn};
2121

2222
#[derive(Clone, Debug)]
23-
pub struct Server<T> {
24-
subscribe: T,
23+
pub struct Server {
2524
base_id: Arc<AtomicUsize>,
25+
registry: Registry,
2626
}
2727

2828
#[pin_project]
@@ -94,10 +94,10 @@ enum ExtractKind {
9494

9595
// === impl Server ===
9696

97-
impl<T: iface::Subscribe<Tap>> Server<T> {
98-
pub(in crate) fn new(subscribe: T) -> Self {
97+
impl Server {
98+
pub(in crate) fn new(registry: Registry) -> Self {
9999
let base_id = Arc::new(0.into());
100-
Self { base_id, subscribe }
100+
Self { base_id, registry }
101101
}
102102

103103
fn invalid_arg(message: String) -> grpc::Status {
@@ -106,11 +106,7 @@ impl<T: iface::Subscribe<Tap>> Server<T> {
106106
}
107107

108108
#[tonic::async_trait]
109-
impl<T> api::tap_server::Tap for Server<T>
110-
where
111-
T: iface::Subscribe<Tap> + Clone + Send + Sync + 'static,
112-
T::Future: Send,
113-
{
109+
impl api::tap_server::Tap for Server {
114110
type ObserveStream = ResponseStream;
115111

116112
#[tracing::instrument(skip(self), level = "debug")]
@@ -178,10 +174,8 @@ where
178174
shared: Arc::downgrade(&shared),
179175
};
180176

181-
// Ensure that tap registers successfully.
182-
self.subscribe.subscribe(tap).await.map_err(|_| {
183-
grpc::Status::new(grpc::Code::ResourceExhausted, "Too many active taps")
184-
})?;
177+
// Register the tap with the server's tap registry
178+
self.registry.register(tap);
185179

186180
let rsp = ResponseStream {
187181
shared: Some(shared),

0 commit comments

Comments
 (0)