Skip to content

Commit 2a87f89

Browse files
authored
Introduce the tonic-watch crate (#1034)
The service-profiles client includes some complicated manual-polling machinery for publish a `tokio::sync::watch::Receiver` from a streaming gRPC endpoint. This change moves this logic into a `linkerd-tonic-watch` crate that is generic over the watch type and the underlying gRPC lookup functionality. Furthermore, this is code is ported to use async/await syntax to dramatically simplify state management. This is being done in anticipation of upcoming inbound-discovery infrastructure, where a similar watcher pattern will be used.
1 parent 2057ba9 commit 2a87f89

File tree

14 files changed

+740
-485
lines changed

14 files changed

+740
-485
lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,6 +1314,7 @@ dependencies = [
13141314
"linkerd-error",
13151315
"linkerd-proxy-api-resolve",
13161316
"linkerd-stack",
1317+
"linkerd-tonic-watch",
13171318
"linkerd2-proxy-api",
13181319
"pin-project",
13191320
"prost-types",
@@ -1423,6 +1424,22 @@ dependencies = [
14231424
"webpki",
14241425
]
14251426

1427+
[[package]]
1428+
name = "linkerd-tonic-watch"
1429+
version = "0.1.0"
1430+
dependencies = [
1431+
"futures",
1432+
"linkerd-error",
1433+
"linkerd-stack",
1434+
"linkerd-tracing",
1435+
"tokio",
1436+
"tokio-stream",
1437+
"tokio-test",
1438+
"tonic",
1439+
"tower-test",
1440+
"tracing",
1441+
]
1442+
14261443
[[package]]
14271444
name = "linkerd-trace-context"
14281445
version = "0.1.0"

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ members = [
5353
"linkerd/stack/tracing",
5454
"linkerd/system",
5555
"linkerd/timeout",
56+
"linkerd/tonic-watch",
5657
"linkerd/tls",
5758
"linkerd/tracing",
5859
"linkerd2-proxy",

linkerd/app/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ linkerd-app-core = { path = "./core" }
2323
linkerd-app-gateway = { path = "./gateway" }
2424
linkerd-app-inbound = { path = "./inbound" }
2525
linkerd-app-outbound = { path = "./outbound" }
26-
linkerd-opencensus = { path = "../opencensus" }
2726
linkerd-error = { path = "../error" }
27+
linkerd-opencensus = { path = "../opencensus" }
2828
regex = "1.0.0"
2929
thiserror = "1.0"
3030
tokio = { version = "1", features = ["rt"] }

linkerd/app/core/src/svc.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use linkerd_error::Recover;
77
use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream};
88
pub use linkerd_reconnect::NewReconnect;
99
pub use linkerd_stack::{
10-
self as stack, layer, BoxNewService, BoxService, BoxServiceLayer, Fail, Filter, MapTargetLayer,
11-
NewRouter, NewService, Param, Predicate, UnwrapOr,
10+
self as stack, layer, BoxNewService, BoxService, BoxServiceLayer, Either, Fail, Filter,
11+
MapErrLayer, MapTargetLayer, NewRouter, NewService, Param, Predicate, UnwrapOr,
1212
};
1313
pub use linkerd_stack_tracing::{NewInstrument, NewInstrumentLayer};
1414
pub use linkerd_timeout::{self as timeout, FailFast};
@@ -21,13 +21,7 @@ use tower::{
2121
layer::util::{Identity, Stack as Pair},
2222
make::MakeService,
2323
};
24-
pub use tower::{
25-
layer::Layer,
26-
service_fn as mk,
27-
spawn_ready::SpawnReady,
28-
util::{Either, MapErrLayer},
29-
Service, ServiceExt,
30-
};
24+
pub use tower::{layer::Layer, service_fn as mk, spawn_ready::SpawnReady, Service, ServiceExt};
3125

3226
#[derive(Copy, Clone, Debug)]
3327
pub struct AlwaysReconnect(ExponentialBackoff);

linkerd/app/inbound/fuzz/Cargo.lock

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,6 +1143,7 @@ dependencies = [
11431143
"linkerd-error",
11441144
"linkerd-proxy-api-resolve",
11451145
"linkerd-stack",
1146+
"linkerd-tonic-watch",
11461147
"linkerd2-proxy-api",
11471148
"pin-project",
11481149
"rand",
@@ -1235,6 +1236,18 @@ dependencies = [
12351236
"webpki",
12361237
]
12371238

1239+
[[package]]
1240+
name = "linkerd-tonic-watch"
1241+
version = "0.1.0"
1242+
dependencies = [
1243+
"futures",
1244+
"linkerd-error",
1245+
"linkerd-stack",
1246+
"tokio",
1247+
"tonic",
1248+
"tracing",
1249+
]
1250+
12381251
[[package]]
12391252
name = "linkerd-trace-context"
12401253
version = "0.1.0"

linkerd/app/src/dst.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub struct Dst {
2121
pub addr: control::ControlAddr,
2222

2323
/// Resolves profiles.
24-
pub profiles: profiles::Client<control::Client<BoxBody>, BackoffUnlessInvalidArgument>,
24+
pub profiles: profiles::Client<BackoffUnlessInvalidArgument, control::Client<BoxBody>>,
2525

2626
/// Resolves endpoints.
2727
pub resolve:
@@ -46,7 +46,7 @@ impl Config {
4646

4747
Ok(Dst {
4848
addr,
49-
profiles: profiles::Client::new(svc.clone(), backoff, self.context.clone()),
49+
profiles: profiles::Client::new(backoff, svc.clone(), self.context.clone()),
5050
resolve: recover::Resolve::new(backoff, api::Resolve::new(svc, self.context)),
5151
})
5252
}
@@ -66,3 +66,20 @@ impl Recover<Error> for BackoffUnlessInvalidArgument {
6666
Ok(self.0.stream())
6767
}
6868
}
69+
70+
impl Recover<tonic::Status> for BackoffUnlessInvalidArgument {
71+
type Backoff = ExponentialBackoffStream;
72+
73+
fn recover(&self, status: tonic::Status) -> Result<Self::Backoff, tonic::Status> {
74+
// Address is not resolvable
75+
if status.code() == tonic::Code::InvalidArgument
76+
// Unexpected cluster state
77+
|| status.code() == tonic::Code::FailedPrecondition
78+
{
79+
return Err(status);
80+
}
81+
82+
tracing::trace!(%status, "Recovering");
83+
Ok(self.0.stream())
84+
}
85+
}

linkerd/error/src/recover.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ pub trait Recover<E: Into<Error> = Error> {
99
///
1010
/// If it is, a backoff stream is returned. When the backoff becomes ready,
1111
/// it signals that the caller should retry its operation. If the backoff is
12-
/// polled agian, it is assumed that the operation failed and a new (possibly
13-
/// longer) backoff is initated.
12+
/// polled again, it is assumed that the operation failed and a new (possibly
13+
/// longer) backoff is initiated.
1414
///
1515
/// If the error is not recoverable, it is returned immediately.
1616
fn recover(&self, err: E) -> Result<Self::Backoff, E>;

linkerd/service-profiles/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ linkerd-error = { path = "../error" }
2121
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", branch = "main", features = ["destination", "client"] }
2222
linkerd-proxy-api-resolve = { path = "../proxy/api-resolve" }
2323
linkerd-stack = { path = "../stack" }
24+
linkerd-tonic-watch = { path = "../tonic-watch" }
2425
rand = { version = "0.8", features = ["small_rng"] }
2526
regex = "1.0.0"
2627
tokio = { version = "1", features = ["macros", "rt", "sync", "time"] }

0 commit comments

Comments
 (0)