Skip to content

Commit 04992e9

Browse files
authored
Add trace-level logging around discovery updates (#1287)
It's helpful to have more insight into what's going on during endpoint resolution. This change adds trace-level logging messages to various resolver modules.
1 parent 67de0c1 commit 04992e9

File tree

4 files changed

+48
-29
lines changed

4 files changed

+48
-29
lines changed

linkerd/proxy/discover/src/from_resolve.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
task::{Context, Poll},
1010
};
1111
use tower::discover::Change;
12+
use tracing::trace;
1213

1314
#[derive(Clone, Debug)]
1415
pub struct FromResolve<R, E> {
@@ -97,29 +98,36 @@ impl<R: TryStream, E> Discover<R, E> {
9798
impl<R, E> Stream for Discover<R, E>
9899
where
99100
R: TryStream<Ok = Update<E>>,
100-
E: Clone,
101+
E: Clone + std::fmt::Debug,
101102
{
102103
type Item = Result<Change<SocketAddr, E>, R::Error>;
103104

104105
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105106
loop {
106107
let this = self.as_mut().project();
107108
if let Some(change) = this.pending.pop_front() {
109+
trace!(?change, "Changed");
108110
return Poll::Ready(Some(Ok(change)));
109111
}
110112

113+
trace!("poll");
111114
match ready!(this.resolution.try_poll_next(cx)) {
112115
Some(update) => match update? {
113116
Update::Reset(endpoints) => {
114117
let active = endpoints.iter().map(|(a, _)| *a).collect::<HashSet<_>>();
118+
trace!(new = ?active, old = ?this.active, "Reset");
115119
for addr in this.active.iter() {
116120
// If the old addr is not in the new set, remove it.
117121
if !active.contains(addr) {
122+
trace!(%addr, "Scheduling removal");
118123
this.pending.push_back(Change::Remove(*addr));
124+
} else {
125+
trace!(%addr, "Unchanged");
119126
}
120127
}
121128
for (addr, endpoint) in endpoints.into_iter() {
122129
if !this.active.contains(&addr) {
130+
trace!(%addr, "Scheduling addition");
123131
this.pending
124132
.push_back(Change::Insert(addr, endpoint.clone()));
125133
}
@@ -128,18 +136,21 @@ where
128136
}
129137
Update::Add(endpoints) => {
130138
for (addr, endpoint) in endpoints.into_iter() {
139+
trace!(%addr, "Scheduling addition");
131140
this.active.insert(addr);
132141
this.pending.push_back(Change::Insert(addr, endpoint));
133142
}
134143
}
135144
Update::Remove(addrs) => {
136145
for addr in addrs.into_iter() {
137146
if this.active.remove(&addr) {
147+
trace!(%addr, "Scheduling removal");
138148
this.pending.push_back(Change::Remove(addr));
139149
}
140150
}
141151
}
142152
Update::DoesNotExist => {
153+
trace!("Scheduling removals");
143154
this.pending.extend(this.active.drain().map(Change::Remove));
144155
}
145156
},

linkerd/proxy/dns-resolve/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ async fn resolution(dns: dns::Resolver, na: NameAddr) -> Result<UpdateStream, Er
7171
//
7272
// Note: this can't be an async_stream, due to pinniness.
7373
let (addrs, expiry) = dns.resolve_addrs(na.name(), na.port()).await?;
74+
debug!(?addrs, name = %na);
7475
let (tx, rx) = mpsc::channel(1);
7576
tokio::spawn(
7677
async move {
@@ -84,7 +85,7 @@ async fn resolution(dns: dns::Resolver, na: NameAddr) -> Result<UpdateStream, Er
8485
loop {
8586
match dns.resolve_addrs(na.name(), na.port()).await {
8687
Ok((addrs, expiry)) => {
87-
debug!(?addrs);
88+
debug!(?addrs, name = %na);
8889
let eps = addrs.into_iter().map(|a| (a, ())).collect();
8990
if tx.send(Ok(Update::Reset(eps))).await.is_err() {
9091
trace!("Closed");

linkerd/proxy/resolve/src/map_endpoint.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ where
107107
R: TryStream<Ok = resolve::Update<E>>,
108108
R::Error: Into<Error>,
109109
M: MapEndpoint<T, E>,
110+
M::Out: std::fmt::Debug,
110111
{
111112
type Item = Result<resolve::Update<M::Out>, R::Error>;
112113

@@ -135,6 +136,7 @@ where
135136
},
136137
None => return Poll::Ready(None),
137138
};
139+
tracing::trace!(?update);
138140
Poll::Ready(Some(Ok(update)))
139141
}
140142
}

linkerd/proxy/resolve/src/recover.rs

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ where
140140
R: resolve::Resolve<T>,
141141
R::Future: Unpin,
142142
R::Resolution: Unpin,
143+
R::Endpoint: std::fmt::Debug,
143144
E: Recover,
144145
E::Backoff: Unpin,
145146
{
@@ -155,37 +156,41 @@ where
155156
State::Connected {
156157
ref mut resolution,
157158
ref mut is_initial,
158-
} => match ready!(resolution.try_poll_next_unpin(cx)) {
159-
Some(Ok(Update::Remove(_))) if *is_initial => {
160-
debug_assert!(false, "Remove must not be initial update");
161-
tracing::debug!("Ignoring Remove after connection");
162-
// Continue polling until a useful update is received.
163-
}
164-
Some(Ok(update)) => {
165-
let update = if *is_initial {
166-
*is_initial = false;
167-
match update {
168-
Update::Add(eps) => Update::Reset(eps),
169-
up => up,
159+
} => {
160+
tracing::trace!("polling");
161+
match ready!(resolution.try_poll_next_unpin(cx)) {
162+
Some(Ok(Update::Remove(_))) if *is_initial => {
163+
debug_assert!(false, "Remove must not be initial update");
164+
tracing::debug!("Ignoring Remove after connection");
165+
// Continue polling until a useful update is received.
166+
}
167+
Some(Ok(update)) => {
168+
let update = if *is_initial {
169+
*is_initial = false;
170+
match update {
171+
Update::Add(eps) => Update::Reset(eps),
172+
up => up,
173+
}
174+
} else {
175+
update
176+
};
177+
tracing::trace!(?update);
178+
return Poll::Ready(Some(Ok(update)));
179+
}
180+
Some(Err(e)) => {
181+
this.inner.state = State::Recover {
182+
error: Some(e.into()),
183+
backoff: None,
170184
}
171-
} else {
172-
update
173-
};
174-
return Poll::Ready(Some(Ok(update)));
175-
}
176-
Some(Err(e)) => {
177-
this.inner.state = State::Recover {
178-
error: Some(e.into()),
179-
backoff: None,
180185
}
181-
}
182-
None => {
183-
this.inner.state = State::Recover {
184-
error: Some(Eos(()).into()),
185-
backoff: None,
186+
None => {
187+
this.inner.state = State::Recover {
188+
error: Some(Eos(()).into()),
189+
backoff: None,
190+
}
186191
}
187192
}
188-
},
193+
}
189194
_ => {}
190195
}
191196

0 commit comments

Comments
 (0)