Skip to content

Commit 09e1ca7

Browse files
committed
Deregister node on exit and conflicting addr
1 parent d0479d8 commit 09e1ca7

File tree

4 files changed

+86
-51
lines changed

4 files changed

+86
-51
lines changed

crates/lunatic-distributed/src/control/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl Client {
6161
node_id,
6262
signed_cert,
6363
} = client.send_registration(signing_request).await?;
64+
6465
Ok((node_id, client, signed_cert))
6566
}
6667

@@ -135,6 +136,10 @@ impl Client {
135136
Ok(())
136137
}
137138

139+
pub async fn deregister(&self, node_id: u64) {
140+
self.send(Request::Deregister(node_id)).await.ok();
141+
}
142+
138143
pub fn node_info(&self, node_id: u64) -> Option<NodeInfo> {
139144
self.inner.nodes.get(&node_id).map(|e| e.clone())
140145
}

crates/lunatic-distributed/src/control/message.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ use serde::{Deserialize, Serialize};
55
#[derive(Clone, Debug, Serialize, Deserialize)]
66
pub enum Request {
77
Register(Registration),
8+
// Currently a node will send its own id. We need to refactor this part: the control server
9+
// should always handle registration first and later know which node is sending requests.
10+
Deregister(u64),
811
ListNodes,
912
AddModule(Vec<u8>),
1013
GetModule(u64),
@@ -14,6 +17,7 @@ impl Request {
1417
pub fn kind(&self) -> &'static str {
1518
match self {
1619
Request::Register(_) => "Register",
20+
Request::Deregister(_) => "Deregister",
1721
Request::ListNodes => "ListNodes",
1822
Request::AddModule(_) => "AddModule",
1923
Request::GetModule(_) => "GetModule",
@@ -28,6 +32,7 @@ pub enum Response {
2832
Module(Option<Vec<u8>>),
2933
ModuleId(u64),
3034
Error(String),
35+
None,
3136
}
3237

3338
#[derive(Clone, Debug, Serialize, Deserialize)]

crates/lunatic-distributed/src/control/server.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub struct Server {
2222
struct InnerServer {
2323
next_node_id: AtomicU64,
2424
nodes: DashMap<u64, Registration>,
25+
addr_to_node: DashMap<SocketAddr, u64>,
2526
next_module_id: AtomicU64,
2627
modules: DashMap<u64, Vec<u8>>,
2728
ca_cert: Certificate,
@@ -34,6 +35,7 @@ impl Server {
3435
next_node_id: AtomicU64::new(1),
3536
next_module_id: AtomicU64::new(1),
3637
nodes: DashMap::new(),
38+
addr_to_node: DashMap::new(),
3739
modules: DashMap::new(),
3840
ca_cert,
3941
}),
@@ -58,7 +60,15 @@ impl Server {
5860
.and_then(|sign_request| sign_request.serialize_pem_with_signer(&self.inner.ca_cert));
5961
match signed_cert {
6062
Ok(signed_cert) => {
63+
// Remove another node using the same address. This is temporary until we define
64+
// details of connection status & reconnecting/registering.
65+
if let Some(proc_id) = self.inner.addr_to_node.get(&reg.node_address) {
66+
self.inner.nodes.remove(&proc_id);
67+
}
68+
69+
self.inner.addr_to_node.insert(reg.node_address, node_id);
6170
self.inner.nodes.insert(node_id, reg);
71+
6272
Response::Register(Registered {
6373
node_id,
6474
signed_cert,
@@ -68,6 +78,11 @@ impl Server {
6878
}
6979
}
7080

81+
pub fn deregister(&self, node_id: u64) -> Response {
82+
self.inner.nodes.remove(&node_id);
83+
Response::None
84+
}
85+
7186
pub fn list_nodes(&self) -> Response {
7287
Response::Nodes(
7388
self.inner
@@ -170,6 +185,7 @@ pub async fn handle_request(
170185
use crate::control::message::Request::*;
171186
let response = match request {
172187
Register(reg) => server.register(reg),
188+
Deregister(node_id) => server.deregister(node_id),
173189
ListNodes => server.list_nodes(),
174190
AddModule(bytes) => server.add_module(bytes),
175191
GetModule(id) => server.get_module(id),

src/mode/execution.rs

Lines changed: 60 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -128,60 +128,62 @@ pub(crate) async fn execute() -> Result<()> {
128128

129129
let env = envs.get_or_create(1);
130130

131-
let distributed_state = if let (Some(node_address), Some(control_address)) =
132-
(args.value_of("node"), args.value_of("control"))
133-
{
134-
// TODO unwrap, better message
135-
let node_address = node_address.parse().unwrap();
136-
let node_name = Uuid::new_v4().to_string();
137-
let control_address = control_address.parse().unwrap();
138-
let ca_cert = lunatic_distributed::distributed::server::root_cert(
139-
args.is_present("test_ca"),
140-
args.value_of("ca_cert"),
141-
)
142-
.unwrap();
143-
let node_cert =
144-
lunatic_distributed::distributed::server::gen_node_cert(&node_name).unwrap();
131+
let (distributed_state, control_client, node_id) =
132+
if let (Some(node_address), Some(control_address)) =
133+
(args.value_of("node"), args.value_of("control"))
134+
{
135+
// TODO unwrap, better message
136+
let node_address = node_address.parse().unwrap();
137+
let node_name = Uuid::new_v4().to_string();
138+
let control_address = control_address.parse().unwrap();
139+
let ca_cert = lunatic_distributed::distributed::server::root_cert(
140+
args.is_present("test_ca"),
141+
args.value_of("ca_cert"),
142+
)
143+
.unwrap();
144+
let node_cert =
145+
lunatic_distributed::distributed::server::gen_node_cert(&node_name).unwrap();
145146

146-
let quic_client = quic::new_quic_client(&ca_cert).unwrap();
147+
let quic_client = quic::new_quic_client(&ca_cert).unwrap();
147148

148-
let (node_id, control_client, signed_cert_pem) = control::Client::register(
149-
node_address,
150-
node_name.to_string(),
151-
control_address,
152-
quic_client.clone(),
153-
node_cert.serialize_request_pem().unwrap(),
154-
)
155-
.await?;
149+
let (node_id, control_client, signed_cert_pem) = control::Client::register(
150+
node_address,
151+
node_name.to_string(),
152+
control_address,
153+
quic_client.clone(),
154+
node_cert.serialize_request_pem().unwrap(),
155+
)
156+
.await?;
156157

157-
let distributed_client =
158-
distributed::Client::new(node_id, control_client.clone(), quic_client.clone()).await?;
158+
let distributed_client =
159+
distributed::Client::new(node_id, control_client.clone(), quic_client.clone())
160+
.await?;
159161

160-
let dist = lunatic_distributed::DistributedProcessState::new(
161-
node_id,
162-
control_client.clone(),
163-
distributed_client,
164-
)
165-
.await?;
166-
167-
tokio::task::spawn(lunatic_distributed::distributed::server::node_server(
168-
ServerCtx {
169-
envs,
170-
modules: Modules::<DefaultProcessState>::default(),
171-
distributed: dist.clone(),
172-
runtime: runtime.clone(),
173-
},
174-
node_address,
175-
signed_cert_pem,
176-
node_cert.serialize_private_key_pem(),
177-
));
178-
179-
log::info!("Registration successful, node id {}", node_id);
180-
181-
Some(dist)
182-
} else {
183-
None
184-
};
162+
let dist = lunatic_distributed::DistributedProcessState::new(
163+
node_id,
164+
control_client.clone(),
165+
distributed_client,
166+
)
167+
.await?;
168+
169+
tokio::task::spawn(lunatic_distributed::distributed::server::node_server(
170+
ServerCtx {
171+
envs,
172+
modules: Modules::<DefaultProcessState>::default(),
173+
distributed: dist.clone(),
174+
runtime: runtime.clone(),
175+
},
176+
node_address,
177+
signed_cert_pem,
178+
node_cert.serialize_private_key_pem(),
179+
));
180+
181+
log::info!("Registration successful, node id {}", node_id);
182+
183+
(Some(dist), Some(control_client), Some(node_id))
184+
} else {
185+
(None, None, None)
186+
};
185187

186188
let mut config = DefaultProcessConfig::default();
187189
// Allow initial process to compile modules, create configurations and spawn sub-processes
@@ -244,7 +246,14 @@ pub(crate) async fn execute() -> Result<()> {
244246
path.to_string_lossy()
245247
))?;
246248
// Wait on the main process to finish
247-
task.await.map(|_| ()).map_err(|e| anyhow!(e.to_string()))
249+
let result = task.await.map(|_| ()).map_err(|e| anyhow!(e.to_string()));
250+
251+
// Until we refactor registration and reconnect authentication, send node id explicitly
252+
if let (Some(ctrl), Some(node_id)) = (control_client, node_id) {
253+
ctrl.deregister(node_id).await;
254+
}
255+
256+
result
248257
} else {
249258
// Block forever
250259
let (_sender, mut receiver) = channel::<()>(1);

0 commit comments

Comments
 (0)