Skip to content

Commit e6beab7

Browse files
authored
dns: Run DNS resolutions on the main runtime (#637)
DNS resolutions are run on the admin runtime. This requires an unnecessary layer of indirection around the resolver, including an MPSC. Now that we allow the main runtime to use more than one thread, it's preferable to do this discovery on the main runtime and we can simplify the implementation.
1 parent fc7ae2d commit e6beab7

File tree

4 files changed

+25
-89
lines changed

4 files changed

+25
-89
lines changed

linkerd/app/core/src/dns.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@ pub struct Config {
1111

1212
pub struct Dns {
1313
pub resolver: Resolver,
14-
pub task: Task,
1514
}
1615

1716
// === impl Config ===
1817

1918
impl Config {
2019
pub fn build(self) -> Dns {
21-
let (resolver, task) =
20+
let resolver =
2221
Resolver::from_system_config_with(&self).expect("system DNS config must be valid");
23-
Dns { resolver, task }
22+
Dns { resolver }
2423
}
2524
}
2625

linkerd/app/src/lib.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ pub struct Config {
5757
pub struct App {
5858
admin: admin::Admin,
5959
drain: drain::Signal,
60-
dns: dns::Task,
6160
dst: ControlAddr,
6261
identity: identity::Identity,
6362
inbound_addr: SocketAddr,
@@ -223,7 +222,6 @@ impl Config {
223222
admin,
224223
dst: dst_addr,
225224
drain: drain_tx,
226-
dns: dns.task,
227225
identity,
228226
inbound_addr,
229227
oc_collector,
@@ -283,7 +281,6 @@ impl App {
283281
let App {
284282
admin,
285283
drain,
286-
dns,
287284
identity,
288285
oc_collector,
289286
start_proxy,
@@ -340,9 +337,6 @@ impl App {
340337
admin.latch.release()
341338
}
342339

343-
// Spawn the DNS resolver background task.
344-
tokio::spawn(dns.instrument(info_span!("dns")));
345-
346340
if let tap::Tap::Enabled {
347341
registry, serve, ..
348342
} = tap

linkerd/dns/src/lib.rs

Lines changed: 18 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,19 @@ pub use linkerd2_dns_name::{InvalidName, Name, Suffix};
77
use std::future::Future;
88
use std::pin::Pin;
99
use std::{fmt, net};
10-
use tokio::sync::{mpsc, oneshot};
11-
use tracing::{info_span, trace, Span};
10+
use tracing::{info_span, trace};
1211
use tracing_futures::Instrument;
13-
pub use trust_dns_resolver::config::ResolverOpts;
14-
pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
15-
use trust_dns_resolver::lookup_ip::LookupIp;
16-
use trust_dns_resolver::{config::ResolverConfig, system_conf, AsyncResolver};
12+
use trust_dns_resolver::{
13+
config::ResolverConfig, lookup_ip::LookupIp, system_conf, AsyncResolver, TokioAsyncResolver,
14+
};
15+
pub use trust_dns_resolver::{
16+
config::ResolverOpts,
17+
error::{ResolveError, ResolveErrorKind},
18+
};
1719

1820
#[derive(Clone)]
1921
pub struct Resolver {
20-
tx: mpsc::UnboundedSender<ResolveRequest>,
22+
dns: TokioAsyncResolver,
2123
}
2224

2325
pub trait ConfigureResolver {
@@ -28,19 +30,10 @@ pub trait ConfigureResolver {
2830
pub enum Error {
2931
NoAddressesFound,
3032
ResolutionFailed(ResolveError),
31-
TaskLost,
3233
}
3334

34-
pub type Task = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
35-
3635
pub type IpAddrFuture = Pin<Box<dyn Future<Output = Result<net::IpAddr, Error>> + Send + 'static>>;
3736

38-
struct ResolveRequest {
39-
name: Name,
40-
result_tx: oneshot::Sender<Result<LookupIp, ResolveError>>,
41-
span: tracing::Span,
42-
}
43-
4437
impl Resolver {
4538
/// Construct a new `Resolver` from environment variables and system
4639
/// configuration.
@@ -51,61 +44,24 @@ impl Resolver {
5144
/// could not be parsed.
5245
///
5346
/// TODO: This should be infallible like it is in the `domain` crate.
54-
pub fn from_system_config_with<C: ConfigureResolver>(
55-
c: &C,
56-
) -> Result<(Self, Task), ResolveError> {
47+
pub fn from_system_config_with<C: ConfigureResolver>(c: &C) -> Result<Self, ResolveError> {
5748
let (config, mut opts) = system_conf::read_system_conf()?;
5849
c.configure_resolver(&mut opts);
5950
trace!("DNS config: {:?}", &config);
6051
trace!("DNS opts: {:?}", &opts);
61-
Self::new(config, opts)
52+
Ok(Self::new(config, opts))
6253
}
6354

64-
pub fn new(
65-
config: ResolverConfig,
66-
mut opts: ResolverOpts,
67-
) -> Result<(Self, Task), ResolveError> {
55+
pub fn new(config: ResolverConfig, mut opts: ResolverOpts) -> Self {
6856
// Disable Trust-DNS's caching.
6957
opts.cache_size = 0;
70-
71-
// XXX(eliza): figure out an appropriate bound for the channel...
72-
let (tx, mut rx) = mpsc::unbounded_channel();
73-
let task = Box::pin(async move {
74-
let resolver = match AsyncResolver::tokio(config, opts) {
75-
Ok(resolver) => resolver,
76-
Err(e) => unreachable!("constructing resolver should not fail: {}", e),
77-
};
78-
while let Some(ResolveRequest {
79-
name,
80-
result_tx,
81-
span,
82-
}) = rx.recv().await
83-
{
84-
let resolver = resolver.clone();
85-
tokio::spawn(
86-
async move {
87-
let res = resolver.lookup_ip(name.as_ref()).await;
88-
if result_tx.send(res).is_err() {
89-
tracing::debug!("resolution canceled");
90-
}
91-
}
92-
.instrument(span),
93-
);
94-
}
95-
tracing::debug!("all resolver handles dropped; terminating.");
96-
});
97-
Ok((Resolver { tx }, task))
58+
let dns = AsyncResolver::tokio(config, opts).expect("Resolver must be valid");
59+
Resolver { dns }
9860
}
9961

100-
async fn lookup_ip(&self, name: Name, span: Span) -> Result<LookupIp, Error> {
101-
let (result_tx, rx) = oneshot::channel();
102-
self.tx.send(ResolveRequest {
103-
name,
104-
result_tx,
105-
span,
106-
})?;
107-
let ips = rx.await??;
108-
Ok(ips)
62+
async fn lookup_ip(&self, name: Name) -> Result<LookupIp, Error> {
63+
let lookup = self.dns.lookup_ip(name.as_ref()).await?;
64+
Ok(lookup)
10965
}
11066

11167
pub fn resolve_one_ip(
@@ -116,7 +72,7 @@ impl Resolver {
11672
let resolver = self.clone();
11773
Box::pin(async move {
11874
let span = info_span!("resolve_one_ip", %name);
119-
let ips = resolver.lookup_ip(name, span).await?;
75+
let ips = resolver.lookup_ip(name).instrument(span).await?;
12076
ips.iter().next().ok_or_else(|| Error::NoAddressesFound)
12177
})
12278
}
@@ -137,18 +93,6 @@ impl fmt::Debug for Resolver {
13793
}
13894
}
13995

140-
impl<T> From<mpsc::error::SendError<T>> for Error {
141-
fn from(_: mpsc::error::SendError<T>) -> Self {
142-
Self::TaskLost
143-
}
144-
}
145-
146-
impl From<oneshot::error::RecvError> for Error {
147-
fn from(_: oneshot::error::RecvError) -> Self {
148-
Self::TaskLost
149-
}
150-
}
151-
15296
impl From<ResolveError> for Error {
15397
fn from(e: ResolveError) -> Self {
15498
Self::ResolutionFailed(e)
@@ -160,7 +104,6 @@ impl fmt::Display for Error {
160104
match self {
161105
Self::NoAddressesFound => f.pad("no addresses found"),
162106
Self::ResolutionFailed(e) => fmt::Display::fmt(e, f),
163-
Self::TaskLost => f.pad("background task terminated unexpectedly"),
164107
}
165108
}
166109
}

linkerd/dns/src/refine.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::net::IpAddr;
99
use std::pin::Pin;
1010
use std::task::{Context, Poll};
1111
use std::time::Instant;
12+
use tracing_futures::Instrument;
1213
use trust_dns_resolver::lookup_ip::LookupIp;
1314

1415
/// A `MakeService` that produces a `Refine` for a given name.
@@ -54,12 +55,11 @@ impl tower::Service<()> for Refine {
5455
loop {
5556
self.state = match self.state {
5657
State::Init => {
57-
let resolver = self.resolver.clone();
5858
let name = self.name.clone();
59-
let span = tracing::Span::current();
60-
State::Pending(Box::pin(
61-
async move { resolver.lookup_ip(name, span).await },
62-
))
59+
let dns = self.resolver.clone();
60+
State::Pending(Box::pin(async move {
61+
dns.lookup_ip(name).in_current_span().await
62+
}))
6363
}
6464
State::Pending(ref mut fut) => {
6565
let lookup = ready!(fut.as_mut().poll(cx))?;

0 commit comments

Comments
 (0)