Skip to content

Commit a7373cf

Browse files
authored
app: Block proxy initialization on identity readiness (#1202)
Currently, the proxy may start serving connections even before identity is provisioned. This won't be feasible once we introduce policy discovery--the proxy will have to establish its inbound policy before serving inbound connections. This change modifies proxy initialization to block proxy initialization on identity. When identity isn't established, warnings are logged every 15s. While this is a behavior change from the proxy's point of view, this shouldn't be a user-facing change, as container initialization is blocked on the proxy's readiness, which is dependent on identity initialization.
1 parent a066fa2 commit a7373cf

File tree

5 files changed

+43
-7
lines changed

5 files changed

+43
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,6 +1168,7 @@ dependencies = [
11681168
"linkerd-tls",
11691169
"linkerd2-proxy-api",
11701170
"pin-project",
1171+
"thiserror",
11711172
"tokio",
11721173
"tonic",
11731174
"tracing",

linkerd/app/src/identity.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ use linkerd_app_core::{
88
metrics::ControlHttp as Metrics,
99
Error,
1010
};
11-
use std::future::Future;
12-
use std::pin::Pin;
11+
use std::{future::Future, pin::Pin};
1312
use tracing::Instrument;
1413

1514
// The Disabled case is extraordinarily rare.
@@ -39,6 +38,8 @@ struct Recover(ExponentialBackoff);
3938

4039
pub type Task = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
4140

41+
// === impl Config ===
42+
4243
impl Config {
4344
pub fn build(self, dns: dns::Resolver, metrics: Metrics) -> Result<Identity, Error> {
4445
match self {
@@ -55,7 +56,7 @@ impl Config {
5556
Box::pin(
5657
daemon
5758
.run(svc)
58-
.instrument(tracing::debug_span!("identity_daemon", peer.addr = %addr)),
59+
.instrument(tracing::debug_span!("identity", server.addr = %addr)),
5960
)
6061
};
6162

@@ -65,6 +66,8 @@ impl Config {
6566
}
6667
}
6768

69+
// === impl Identity ===
70+
6871
impl Identity {
6972
pub fn local(&self) -> Option<LocalCrtKey> {
7073
match self {
@@ -88,6 +91,8 @@ impl Identity {
8891
}
8992
}
9093

94+
// === impl Recover ===
95+
9196
impl<E: Into<Error>> linkerd_error::Recover<E> for Recover {
9297
type Backoff = ExponentialBackoffStream;
9398

linkerd/app/src/lib.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,12 +203,15 @@ impl Config {
203203

204204
// Build a task that initializes and runs the proxy stacks.
205205
let start_proxy = {
206+
let identity = identity.local();
206207
let inbound_addr = inbound_addr;
207208
let profiles = dst.profiles;
208209
let resolve = dst.resolve;
209210

210211
Box::pin(async move {
211-
// TODO(ver): Block on identity.
212+
Self::await_identity(identity)
213+
.await
214+
.expect("failed to initialize identity");
212215

213216
tokio::spawn(
214217
outbound
@@ -238,6 +241,30 @@ impl Config {
238241
tap,
239242
})
240243
}
244+
245+
/// Waits for the proxy's identity to be certified.
246+
///
247+
/// If this does not complete in a timely fashion, warnings are logged every 15s
248+
async fn await_identity(id: Option<identity::LocalCrtKey>) -> Result<(), Error> {
249+
let id = match id {
250+
Some(id) => id,
251+
None => return Ok(()),
252+
};
253+
254+
tokio::pin! {
255+
let fut = id.await_crt();
256+
}
257+
258+
const TIMEOUT: time::Duration = time::Duration::from_secs(15);
259+
loop {
260+
tokio::select! {
261+
res = (&mut fut) => return res.map(|_| ()).map_err(Into::into),
262+
_ = time::sleep(TIMEOUT) => {
263+
tracing::warn!("Waiting for identity to be initialized...");
264+
}
265+
}
266+
}
267+
}
241268
}
242269

243270
impl App {

linkerd/proxy/identity/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ linkerd-identity = { path = "../../identity" }
1717
linkerd-metrics = { path = "../../metrics" }
1818
linkerd-stack = { path = "../../stack" }
1919
linkerd-tls = { path = "../../tls" }
20+
thiserror = "1"
2021
tokio = { version = "1", features = ["time", "sync"] }
2122
tonic = { version = "0.5", default-features = false }
2223
tracing = "0.1.26"

linkerd/proxy/identity/src/certify.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use pin_project::pin_project;
99
use std::convert::TryFrom;
1010
use std::sync::Arc;
1111
use std::time::{Duration, SystemTime, UNIX_EPOCH};
12+
use thiserror::Error;
1213
use tokio::sync::watch;
1314
use tokio::time::{self, Sleep};
1415
use tonic::{self as grpc, body::BoxBody, client::GrpcService};
@@ -42,8 +43,9 @@ pub struct LocalCrtKey {
4243
#[derive(Debug)]
4344
pub struct AwaitCrt(Option<LocalCrtKey>);
4445

45-
#[derive(Copy, Clone, Debug)]
46-
pub struct LostDaemon;
46+
#[derive(Copy, Clone, Debug, Error)]
47+
#[error("identity initialization failed")]
48+
pub struct LostDaemon(());
4749

4850
pub type CrtKeySender = watch::Sender<Option<id::CrtKey>>;
4951

@@ -187,7 +189,7 @@ impl LocalCrtKey {
187189
while self.crt_key.borrow().is_none() {
188190
// If the sender is dropped, the daemon task has ended.
189191
if self.crt_key.changed().await.is_err() {
190-
return Err(LostDaemon);
192+
return Err(LostDaemon(()));
191193
}
192194
}
193195
Ok(self)

0 commit comments

Comments
 (0)