Skip to content

Commit 86e69d8

Browse files
committed
node: perform DNS lookup with timeout
Passed `hostname_resolution_timeout` down to the functions responsible for DNS resolution. Created a `pub(crate)` error type to distinguish between the errors that can occur during hostname resolution. Note: it is `pub(crate)` because the users of this API only emit logs in case of an error; the errors are not passed to the public API. Created a `lookup_host_with_timeout` function and extracted some of the lookup logic into it. The purpose is to avoid introducing complex branching into the original function.
1 parent ee813e4 commit 86e69d8

File tree

3 files changed

+50
-15
lines changed

3 files changed

+50
-15
lines changed

scylla/src/transport/connection_pool.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -867,6 +867,7 @@ impl PoolRefiller {
867867
mut endpoint: UntranslatedEndpoint,
868868
) -> impl Future<Output = UntranslatedEndpoint> {
869869
let cloud_config = self.pool_config.connection_config.cloud_config.clone();
870+
let hostname_resolution_timeout = self.pool_config.hostname_resolution_timeout;
870871
async move {
871872
if let Some(cloud_config) = cloud_config {
872873
// If we operate in the serverless Cloud, then we substitute every node's address
@@ -881,7 +882,9 @@ impl PoolRefiller {
881882
if let Some(dc) = datacenter.as_deref() {
882883
if let Some(dc_config) = cloud_config.get_datacenters().get(dc) {
883884
let hostname = dc_config.get_server();
884-
if let Ok(resolved) = resolve_hostname(hostname).await {
885+
if let Ok(resolved) =
886+
resolve_hostname(hostname, hostname_resolution_timeout).await
887+
{
885888
*address = NodeAddr::Untranslatable(resolved)
886889
} else {
887890
warn!(

scylla/src/transport/node.rs

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use itertools::Itertools;
2-
use tokio::net::lookup_host;
2+
use thiserror::Error;
3+
use tokio::net::{lookup_host, ToSocketAddrs};
34
use tracing::warn;
45
use uuid::Uuid;
56

@@ -13,6 +14,7 @@ use crate::transport::errors::{ConnectionPoolError, QueryError};
1314
use std::fmt::Display;
1415
use std::io;
1516
use std::net::IpAddr;
17+
use std::time::Duration;
1618
use std::{
1719
hash::{Hash, Hasher},
1820
net::SocketAddr,
@@ -267,27 +269,53 @@ pub(crate) struct ResolvedContactPoint {
267269
pub(crate) datacenter: Option<String>,
268270
}
269271

272+
#[derive(Error, Debug)]
273+
pub(crate) enum DnsLookupError {
274+
#[error("Failed to perform DNS lookup within {0}ms")]
275+
Timeout(u128),
276+
#[error("Empty address list returned by DNS for {0}")]
277+
EmptyAddressListForHost(String),
278+
#[error(transparent)]
279+
IoError(#[from] io::Error),
280+
}
281+
282+
/// Performs a DNS lookup with provided optional timeout.
283+
async fn lookup_host_with_timeout<T: ToSocketAddrs>(
284+
host: T,
285+
hostname_resolution_timeout: Option<Duration>,
286+
) -> Result<impl Iterator<Item = SocketAddr>, DnsLookupError> {
287+
if let Some(timeout) = hostname_resolution_timeout {
288+
match tokio::time::timeout(timeout, lookup_host(host)).await {
289+
Ok(res) => res.map_err(Into::into),
290+
// Elapsed error from tokio library does not provide any context.
291+
Err(_) => Err(DnsLookupError::Timeout(timeout.as_millis())),
292+
}
293+
} else {
294+
lookup_host(host).await.map_err(Into::into)
295+
}
296+
}
297+
270298
// Resolve the given hostname using a DNS lookup if necessary.
271299
// The resolution may return multiple IPs and the function returns one of them.
272300
// It prefers to return IPv4s first, and only if there are none, IPv6s.
273-
pub(crate) async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, io::Error> {
274-
let addrs = match lookup_host(hostname).await {
301+
pub(crate) async fn resolve_hostname(
302+
hostname: &str,
303+
hostname_resolution_timeout: Option<Duration>,
304+
) -> Result<SocketAddr, DnsLookupError> {
305+
let addrs = match lookup_host_with_timeout(hostname, hostname_resolution_timeout).await {
275306
Ok(addrs) => itertools::Either::Left(addrs),
276307
// Use a default port in case of error, but propagate the original error on failure
277308
Err(e) => {
278-
let addrs = lookup_host((hostname, 9042)).await.or(Err(e))?;
309+
let addrs = lookup_host_with_timeout((hostname, 9042), hostname_resolution_timeout)
310+
.await
311+
.or(Err(e))?;
279312
itertools::Either::Right(addrs)
280313
}
281314
};
282315

283316
addrs
284317
.find_or_last(|addr| matches!(addr, SocketAddr::V4(_)))
285-
.ok_or_else(|| {
286-
io::Error::new(
287-
io::ErrorKind::Other,
288-
format!("Empty address list returned by DNS for {}", hostname),
289-
)
290-
})
318+
.ok_or_else(|| DnsLookupError::EmptyAddressListForHost(hostname.to_owned()))
291319
}
292320

293321
/// Transforms the given [`InternalKnownNode`]s into [`ContactPoint`]s.
@@ -296,6 +324,7 @@ pub(crate) async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, io::E
296324
/// In case of a plain IP address, parses it and uses straight.
297325
pub(crate) async fn resolve_contact_points(
298326
known_nodes: &[InternalKnownNode],
327+
hostname_resolution_timeout: Option<Duration>,
299328
) -> (Vec<ResolvedContactPoint>, Vec<String>) {
300329
// Find IP addresses of all known nodes passed in the config
301330
let mut initial_peers: Vec<ResolvedContactPoint> = Vec::with_capacity(known_nodes.len());
@@ -323,7 +352,7 @@ pub(crate) async fn resolve_contact_points(
323352
let resolve_futures = to_resolve
324353
.into_iter()
325354
.map(|(hostname, datacenter)| async move {
326-
match resolve_hostname(hostname).await {
355+
match resolve_hostname(hostname, hostname_resolution_timeout).await {
327356
Ok(address) => Some(ResolvedContactPoint {
328357
address,
329358
datacenter,

scylla/src/transport/topology.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ impl MetadataReader {
481481
host_filter: &Option<Arc<dyn HostFilter>>,
482482
) -> Result<Self, NewSessionError> {
483483
let (initial_peers, resolved_hostnames) =
484-
resolve_contact_points(&initial_known_nodes).await;
484+
resolve_contact_points(&initial_known_nodes, hostname_resolution_timeout).await;
485485
// Ensure there is at least one resolved node
486486
if initial_peers.is_empty() {
487487
return Err(NewSessionError::FailedToResolveAnyHostname(
@@ -574,8 +574,11 @@ impl MetadataReader {
574574
// If no known peer is reachable, try falling back to initial contact points, in hope that
575575
// there are some hostnames there which will resolve to reachable new addresses.
576576
warn!("Failed to establish control connection and fetch metadata on all known peers. Falling back to initial contact points.");
577-
let (initial_peers, _hostnames) =
578-
resolve_contact_points(&self.initial_known_nodes).await;
577+
let (initial_peers, _hostnames) = resolve_contact_points(
578+
&self.initial_known_nodes,
579+
self.hostname_resolution_timeout,
580+
)
581+
.await;
579582
result = self
580583
.retry_fetch_metadata_on_nodes(
581584
initial,

0 commit comments

Comments
 (0)