Skip to content

Commit 14b6851

Browse files
authored
fix: Management fallback to ipv4 sockets if ipv6 is unavailable (#25)
Also fix broadcasting of UDP messages which before would abort on a send failure instead of just dumping the message to all known addresses. Now only debug logs and only errors if all sends failed. Improve logging.
1 parent 5807deb commit 14b6851

File tree

4 files changed

+69
-25
lines changed

4 files changed

+69
-25
lines changed

mgmtd/src/bee_msg.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ pub async fn notify_nodes<M: Msg + Serializable>(
196196
) {
197197
log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");
198198

199-
if let Err(err) = async {
200-
for t in node_types {
199+
for t in node_types {
200+
if let Err(err) = async {
201201
let nodes = ctx
202202
.db
203203
.read_tx(move |tx| db::node::get_with_type(tx, *t))
@@ -206,12 +206,12 @@ pub async fn notify_nodes<M: Msg + Serializable>(
206206
ctx.conn
207207
.broadcast_datagram(nodes.into_iter().map(|e| e.uid), msg)
208208
.await?;
209-
}
210209

211-
Ok(()) as Result<_>
212-
}
213-
.await
214-
{
215-
log::error!("Notification could not be sent to all nodes: {err:#}");
210+
Ok(()) as Result<_>
211+
}
212+
.await
213+
{
214+
log::error!("Notification could not be sent to all {t} nodes: {err:#}");
215+
}
216216
}
217217
}

mgmtd/src/grpc.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use sqlite::{TransactionExt, check_affected_rows};
1616
use sqlite_check::sql;
1717
use std::fmt::Debug;
1818
use std::future::Future;
19-
use std::net::SocketAddr;
19+
use std::net::{SocketAddr, TcpListener};
2020
use std::pin::Pin;
2121
use tonic::transport::{Identity, Server, ServerTlsConfig};
2222
use tonic::{Code, Request, Response, Status};
@@ -181,13 +181,12 @@ pub(crate) fn serve(ctx: Context, mut shutdown: RunStateHandle) -> Result<()> {
181181
builder
182182
};
183183

184-
let serve_addr = SocketAddr::new("::".parse()?, ctx.info.user_config.grpc_port);
185-
184+
let ctx2 = ctx.clone();
186185
let service = pm::management_server::ManagementServer::with_interceptor(
187186
ManagementService { ctx: ctx.clone() },
188187
move |req: Request<()>| {
189188
// If authentication is enabled, require the secret passed with every request
190-
if let Some(required_secret) = ctx.info.auth_secret {
189+
if let Some(required_secret) = ctx2.info.auth_secret {
191190
let check = || -> Result<()> {
192191
let Some(request_secret) = req.metadata().get("auth-secret") else {
193192
bail!("Request requires authentication but no secret was provided")
@@ -211,6 +210,20 @@ pub(crate) fn serve(ctx: Context, mut shutdown: RunStateHandle) -> Result<()> {
211210
},
212211
);
213212

213+
let mut serve_addr = SocketAddr::new("::".parse()?, ctx.info.user_config.grpc_port);
214+
215+
// Test for IPv6 available, fall back to IPv4 sockets if not
216+
match TcpListener::bind(serve_addr) {
217+
Ok(_) => {}
218+
Err(err) if err.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
219+
log::debug!("gRPC: IPv6 not available, falling back to IPv4 sockets");
220+
serve_addr = SocketAddr::new("0.0.0.0".parse()?, ctx.info.user_config.grpc_port);
221+
}
222+
Err(err) => {
223+
anyhow::bail!(err);
224+
}
225+
}
226+
214227
log::info!("Serving gRPC requests on {serve_addr}");
215228

216229
tokio::spawn(async move {

mgmtd/src/lib.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use sqlite::TransactionExt;
2727
use sqlite_check::sql;
2828
use std::collections::HashSet;
2929
use std::future::Future;
30-
use std::net::SocketAddr;
30+
use std::net::{SocketAddr, TcpListener};
3131
use std::sync::Arc;
3232
use tokio::net::UdpSocket;
3333
use tokio::sync::mpsc;
@@ -60,14 +60,22 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
6060
// Static configuration which doesn't change at runtime
6161
let info = Box::leak(Box::new(info));
6262

63+
let mut beemsg_serve_addr = SocketAddr::new("::".parse()?, info.user_config.beemsg_port);
64+
65+
// Test for IPv6 available, fall back to IPv4 sockets if not
66+
match TcpListener::bind(beemsg_serve_addr) {
67+
Ok(_) => {}
68+
Err(err) if err.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
69+
log::debug!("BeeMsg: IPv6 not available, falling back to IPv4 sockets");
70+
beemsg_serve_addr = SocketAddr::new("0.0.0.0".parse()?, info.user_config.beemsg_port);
71+
}
72+
Err(err) => {
73+
anyhow::bail!(err);
74+
}
75+
}
76+
6377
// UDP socket for in- and outgoing messages
64-
let udp_socket = Arc::new(
65-
UdpSocket::bind(SocketAddr::new(
66-
"::0".parse()?,
67-
info.user_config.beemsg_port,
68-
))
69-
.await?,
70-
);
78+
let udp_socket = Arc::new(UdpSocket::bind(beemsg_serve_addr).await?);
7179

7280
// Node address store and connection pool
7381
let conn_pool = Pool::new(
@@ -121,8 +129,9 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
121129
);
122130

123131
// Listen for incoming TCP connections
132+
// Fall back to ipv4 socket if ipv6 is not available
124133
incoming::listen_tcp(
125-
SocketAddr::new("::0".parse()?, ctx.info.user_config.beemsg_port),
134+
beemsg_serve_addr,
126135
ctx.clone(),
127136
info.auth_secret.is_some(),
128137
run_state.clone(),

shared/src/conn/outgoing.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ impl Pool {
185185
Ok(())
186186
}
187187

188+
/// Broadcasts a BeeMsg datagram to all given nodes using all their known addresses
189+
///
190+
/// Logs errors if sending failed completely for a node, only fails if serialization fails.
191+
/// Remember that this is UDP and thus no errors only means that the sending was successful,
192+
/// not that the messages reached their destinations.
188193
pub async fn broadcast_datagram<M: Msg + Serializable>(
189194
&self,
190195
peers: impl IntoIterator<Item = Uid>,
@@ -194,12 +199,29 @@ impl Pool {
194199
buf.serialize_msg(msg)?;
195200

196201
for node_uid in peers {
197-
let Some(addrs) = self.store.get_node_addrs(node_uid) else {
198-
bail!("No network address found for node with uid {node_uid:?}");
199-
};
202+
let addrs = self.store.get_node_addrs(node_uid).unwrap_or_default();
203+
204+
if addrs.is_empty() {
205+
log::error!(
206+
"Failed to send datagram to node with uid {node_uid}: No known addresses"
207+
);
208+
continue;
209+
}
200210

211+
let mut errs = vec![];
201212
for addr in addrs.iter() {
202-
buf.send_to_socket(&self.udp_socket, addr).await?;
213+
if let Err(err) = buf.send_to_socket(&self.udp_socket, addr).await {
214+
log::debug!(
215+
"Sending datagram to node with uid {node_uid} using {addr} failed: {err}"
216+
);
217+
errs.push((addr, err));
218+
}
219+
}
220+
221+
if errs.len() == addrs.len() {
222+
log::error!(
223+
"Failed to send datagram to node with uid {node_uid} on all known addresses: {errs:?}"
224+
);
203225
}
204226
}
205227

0 commit comments

Comments
 (0)