Skip to content

Commit bbed2d8

Browse files
committed
refactor: move BeeMsg handlers to their own files
1 parent 4dd0151 commit bbed2d8

29 files changed

+985
-920
lines changed

mgmtd/src/bee_msg.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,33 @@ use sqlite_check::sql;
1818
use std::collections::HashMap;
1919
use std::fmt::Display;
2020

21-
mod buddy_group;
22-
mod misc;
23-
mod node;
24-
mod quota;
25-
mod storage_pool;
26-
mod target;
21+
mod common;
22+
23+
mod ack;
24+
mod authenticate_channel;
25+
mod change_target_consistency_states;
26+
mod get_mirror_buddy_groups;
27+
mod get_node_capacity_pools;
28+
mod get_nodes;
29+
mod get_states_and_buddy_groups;
30+
mod get_storage_pools;
31+
mod get_target_mappings;
32+
mod get_target_states;
33+
mod heartbeat;
34+
mod heartbeat_request;
35+
mod map_targets;
36+
mod map_targets_resp;
37+
mod peer_info;
38+
mod refresh_capacity_pools;
39+
mod register_node;
40+
mod register_target;
41+
mod remove_node;
42+
mod remove_node_resp;
43+
mod request_exceeded_quota;
44+
mod set_channel_direct;
45+
mod set_mirror_buddy_groups_resp;
46+
mod set_storage_target_info;
47+
mod set_target_consistency_states;
2748

2849
/// Msg request handler for requests where no response is expected.
2950
/// To handle a message, implement this and add it to the dispatch list with `=> _`.

mgmtd/src/bee_msg/ack.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use super::*;
2+
use shared::bee_msg::misc::*;
3+
4+
impl HandleNoResponse for Ack {
5+
async fn handle(self, _ctx: &Context, req: &mut impl Request) -> Result<()> {
6+
log::debug!("Ignoring Ack from {:?}: Id: {:?}", req.addr(), self.ack_id);
7+
Ok(())
8+
}
9+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use super::*;
2+
use shared::bee_msg::misc::*;
3+
4+
impl HandleNoResponse for AuthenticateChannel {
5+
async fn handle(self, ctx: &Context, req: &mut impl Request) -> Result<()> {
6+
if let Some(ref secret) = ctx.info.auth_secret {
7+
if secret == &self.auth_secret {
8+
req.authenticate_connection();
9+
} else {
10+
log::error!(
11+
"Peer {:?} tried to authenticate stream with wrong secret",
12+
req.addr()
13+
);
14+
}
15+
} else {
16+
log::debug!(
17+
"Peer {:?} tried to authenticate stream, but authentication is not required",
18+
req.addr()
19+
);
20+
}
21+
22+
Ok(())
23+
}
24+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use super::*;
2+
use common::update_last_contact_times;
3+
use shared::bee_msg::target::*;
4+
5+
impl HandleWithResponse for ChangeTargetConsistencyStates {
6+
type Response = ChangeTargetConsistencyStatesResp;
7+
8+
fn error_response() -> Self::Response {
9+
ChangeTargetConsistencyStatesResp {
10+
result: OpsErr::INTERNAL,
11+
}
12+
}
13+
14+
async fn handle(self, ctx: &Context, __req: &mut impl Request) -> Result<Self::Response> {
15+
fail_on_pre_shutdown(ctx)?;
16+
17+
// self.old_states is currently completely ignored. If something reports a non-GOOD state, I
18+
// see no apparent reason to that the old state matches before setting. We have the
19+
// authority, whatever nodes think their old state was doesn't matter.
20+
21+
let changed = ctx
22+
.db
23+
.write_tx(move |tx| {
24+
let node_type = self.node_type.try_into()?;
25+
26+
// Check given target Ids exist
27+
db::target::validate_ids(tx, &self.target_ids, node_type)?;
28+
29+
// Old management updates contact time while handling this message (comes usually in
30+
// every 30 seconds), so we do it as well
31+
update_last_contact_times(tx, &self.target_ids, node_type)?;
32+
33+
let affected = db::target::update_consistency_states(
34+
tx,
35+
self.target_ids
36+
.into_iter()
37+
.zip(self.new_states.iter().copied()),
38+
node_type,
39+
)?;
40+
41+
Ok(affected > 0)
42+
})
43+
.await?;
44+
45+
log::debug!(
46+
"Updated target consistency states for {:?} nodes",
47+
self.node_type
48+
);
49+
50+
if changed {
51+
notify_nodes(
52+
ctx,
53+
&[NodeType::Meta, NodeType::Storage, NodeType::Client],
54+
&RefreshTargetStates { ack_id: "".into() },
55+
)
56+
.await;
57+
}
58+
59+
Ok(ChangeTargetConsistencyStatesResp {
60+
result: OpsErr::SUCCESS,
61+
})
62+
}
63+
}

mgmtd/src/bee_msg/common.rs

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
use super::*;
2+
use crate::db::node_nic::ReplaceNic;
3+
use db::misc::MetaRoot;
4+
use rusqlite::Transaction;
5+
use shared::bee_msg::node::*;
6+
use shared::bee_msg::target::*;
7+
use shared::types::{NodeId, TargetId};
8+
use std::net::SocketAddr;
9+
use std::sync::Arc;
10+
use std::time::Duration;
11+
12+
/// Processes incoming node information. Registers new nodes if config allows it
13+
pub(super) async fn update_node(msg: RegisterNode, ctx: &Context) -> Result<NodeId> {
14+
let nics = msg.nics.clone();
15+
let requested_node_id = msg.node_id;
16+
let info = ctx.info;
17+
18+
let licensed_machines = match ctx.license.get_num_machines() {
19+
Ok(n) => n,
20+
Err(err) => {
21+
log::debug!(
22+
"Could not obtain number of licensed machines, defaulting to unlimited: {err:#}"
23+
);
24+
u32::MAX
25+
}
26+
};
27+
28+
let (node, meta_root, is_new) = ctx
29+
.db
30+
.write_tx_no_sync(move |tx| {
31+
let node = if msg.node_id == 0 {
32+
// No node ID given => new node
33+
None
34+
} else {
35+
// If Some is returned, the node with node_id already exists
36+
try_resolve_num_id(tx, EntityType::Node, msg.node_type, msg.node_id)?
37+
};
38+
39+
let machine_uuid = if matches!(msg.node_type, NodeType::Meta | NodeType::Storage)
40+
&& !msg.machine_uuid.is_empty()
41+
{
42+
Some(std::str::from_utf8(&msg.machine_uuid)?)
43+
} else {
44+
None
45+
};
46+
47+
if let Some(machine_uuid) = machine_uuid {
48+
if db::node::count_machines(tx, machine_uuid, node.as_ref().map(|n| n.uid))?
49+
>= licensed_machines
50+
{
51+
bail!("Licensed machine limit reached. Node registration denied.");
52+
}
53+
}
54+
55+
let (node, is_new) = if let Some(node) = node {
56+
// Existing node, update data
57+
db::node::update(tx, node.uid, msg.port, machine_uuid)?;
58+
59+
(node, false)
60+
} else {
61+
// New node, do additional checks and insert data
62+
63+
// Check node registration is allowed. This should ignore registering client
64+
// nodes.
65+
if msg.node_type != NodeType::Client && info.user_config.registration_disable {
66+
bail!("Registration of new nodes is not allowed");
67+
}
68+
69+
let new_alias = if msg.node_type == NodeType::Client {
70+
// In versions prior to 8.0 the string node ID generated by the client
71+
// started with a number which is not allowed by the new alias schema.
72+
// As part of BeeGFS 8 the nodeID generated for each client mount was
73+
// updated to no longer start with a number, thus it is unlikely this
74+
// would happen unless BeeGFS 8 was mounted by a BeeGFS 7 client.
75+
76+
let new_alias = String::from_utf8(msg.node_alias)
77+
.ok()
78+
.and_then(|s| Alias::try_from(s).ok());
79+
80+
if new_alias.is_none() {
81+
log::warn!(
82+
"Unable to use alias requested by client (possibly the\
83+
client version < 8.0)"
84+
);
85+
}
86+
new_alias
87+
} else {
88+
None
89+
};
90+
91+
// Insert new node entry
92+
let node = db::node::insert(tx, msg.node_id, new_alias, msg.node_type, msg.port)?;
93+
94+
// if this is a meta node, auto-add a corresponding meta target after the node.
95+
if msg.node_type == NodeType::Meta {
96+
// Convert the NodeID to a TargetID. Due to the difference in bitsize, meta
97+
// node IDs are not allowed to be bigger than u16
98+
let Ok(target_id) = TargetId::try_from(node.num_id()) else {
99+
bail!(
100+
"{} is not a valid numeric meta node id\
101+
(must be between 1 and 65535)",
102+
node.num_id()
103+
);
104+
};
105+
106+
db::target::insert_meta(tx, target_id, None)?;
107+
}
108+
109+
(node, true)
110+
};
111+
112+
// Update the corresponding nic lists
113+
db::node_nic::replace(
114+
tx,
115+
node.uid,
116+
msg.nics.iter().map(|e| ReplaceNic {
117+
nic_type: e.nic_type,
118+
addr: &e.addr,
119+
name: String::from_utf8_lossy(&e.name),
120+
}),
121+
)?;
122+
123+
let meta_root = match node.node_type() {
124+
// In case this is a meta node, the requester expects info about the meta
125+
// root
126+
NodeType::Meta => db::misc::get_meta_root(tx)?,
127+
_ => MetaRoot::Unknown,
128+
};
129+
130+
Ok((node, meta_root, is_new))
131+
})
132+
.await?;
133+
134+
ctx.conn.replace_node_addrs(
135+
node.uid,
136+
nics.clone()
137+
.into_iter()
138+
.map(|e| SocketAddr::new(e.addr, msg.port))
139+
.collect::<Arc<_>>(),
140+
);
141+
142+
if is_new {
143+
log::info!("Registered new node {node} (Requested Numeric Id: {requested_node_id})",);
144+
} else {
145+
log::debug!("Updated node {node} node",);
146+
}
147+
148+
let node_num_id = node.num_id();
149+
150+
// notify all nodes
151+
notify_nodes(
152+
ctx,
153+
match node.node_type() {
154+
NodeType::Meta => &[NodeType::Meta, NodeType::Client],
155+
NodeType::Storage => &[NodeType::Meta, NodeType::Storage, NodeType::Client],
156+
NodeType::Client => &[NodeType::Meta],
157+
_ => &[],
158+
},
159+
&Heartbeat {
160+
instance_version: 0,
161+
nic_list_version: 0,
162+
node_type: node.node_type(),
163+
node_alias: String::from(node.alias).into_bytes(),
164+
ack_id: "".into(),
165+
node_num_id,
166+
root_num_id: match meta_root {
167+
MetaRoot::Unknown => 0,
168+
MetaRoot::Normal(node_id, _) => node_id,
169+
MetaRoot::Mirrored(group_id) => group_id.into(),
170+
},
171+
is_root_mirrored: match meta_root {
172+
MetaRoot::Unknown | MetaRoot::Normal(_, _) => 0,
173+
MetaRoot::Mirrored(_) => 1,
174+
},
175+
port: msg.port,
176+
port_tcp_unused: msg.port,
177+
nic_list: nics,
178+
machine_uuid: vec![], // No need for the other nodes to know machine UUIDs
179+
},
180+
)
181+
.await;
182+
183+
Ok(node_num_id)
184+
}
185+
186+
pub(super) fn get_targets_with_states(
187+
tx: &Transaction,
188+
pre_shutdown: bool,
189+
node_type: NodeTypeServer,
190+
node_offline_timeout: Duration,
191+
) -> Result<Vec<(TargetId, TargetConsistencyState, TargetReachabilityState)>> {
192+
let targets = tx.query_map_collect(
193+
sql!(
194+
"SELECT t.target_id, t.consistency,
195+
(UNIXEPOCH('now') - UNIXEPOCH(t.last_update)), gp.p_target_id, gs.s_target_id
196+
FROM targets AS t
197+
INNER JOIN nodes AS n USING(node_type, node_id)
198+
LEFT JOIN buddy_groups AS gp ON gp.p_target_id = t.target_id AND gp.node_type = t.node_type
199+
LEFT JOIN buddy_groups AS gs ON gs.s_target_id = t.target_id AND gs.node_type = t.node_type
200+
WHERE t.node_type = ?1"
201+
),
202+
[node_type.sql_variant()],
203+
|row| {
204+
let is_primary = row.get::<_, Option<TargetId>>(3)?.is_some();
205+
let is_secondary = row.get::<_, Option<TargetId>>(4)?.is_some();
206+
207+
Ok((
208+
row.get(0)?,
209+
TargetConsistencyState::from_row(row, 1)?,
210+
if !pre_shutdown || is_secondary {
211+
let age = Duration::from_secs(row.get(2)?);
212+
213+
// We never want to report a primary node of a buddy group as offline since this
214+
// is considered invalid. Instead we just report ProbablyOffline and wait for the switchover.
215+
if !is_primary && age > node_offline_timeout {
216+
TargetReachabilityState::Offline
217+
} else if age > node_offline_timeout / 2 {
218+
TargetReachabilityState::ProbablyOffline
219+
} else {
220+
TargetReachabilityState::Online
221+
}
222+
} else {
223+
TargetReachabilityState::ProbablyOffline
224+
},
225+
))
226+
},
227+
)?;
228+
229+
Ok(targets)
230+
}
231+
232+
/// Updates the `last_contact` time for all the nodes belonging to the passed targets and the
233+
/// targets `last_update` times themselves.
234+
pub(super) fn update_last_contact_times(
235+
tx: &Transaction,
236+
target_ids: &[TargetId],
237+
node_type: NodeTypeServer,
238+
) -> Result<()> {
239+
let target_ids_param = sqlite::rarray_param(target_ids.iter().copied());
240+
241+
tx.execute_cached(
242+
sql!(
243+
"UPDATE nodes AS n SET last_contact = DATETIME('now')
244+
WHERE n.node_uid IN (
245+
SELECT DISTINCT node_uid FROM targets_ext
246+
WHERE target_id IN rarray(?1) AND node_type = ?2)"
247+
),
248+
rusqlite::params![&target_ids_param, node_type.sql_variant()],
249+
)?;
250+
251+
tx.execute_cached(
252+
sql!(
253+
"UPDATE targets SET last_update = DATETIME('now')
254+
WHERE target_id IN rarray(?1) AND node_type = ?2"
255+
),
256+
rusqlite::params![&target_ids_param, node_type.sql_variant()],
257+
)?;
258+
259+
Ok(())
260+
}

0 commit comments

Comments
 (0)