Skip to content

Commit 7207102

Browse files
authored
Merge pull request #114 from lunatic-solutions/distributed_quic
Replace tcp with quic for distributed nodes
2 parents 07236b5 + 09e1ca7 commit 7207102

File tree

20 files changed

+2034
-327
lines changed

20 files changed

+2034
-327
lines changed

Cargo.lock

Lines changed: 1154 additions & 139 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ path = "src/lib.rs"
1818
name = "lunatic"
1919
path = "src/main.rs"
2020

21+
[features]
22+
default = ["distributed-quinn"]
23+
distributed-tcp = ["lunatic-distributed/tcp"]
24+
distributed-s2n = ["lunatic-distributed/quic-s2n"]
25+
distributed-quinn = ["lunatic-distributed/quic-quinn"]
26+
2127
[dependencies]
2228
anyhow = "^1.0"
2329
clap = { version = "^3.0", features = ["cargo"] }
@@ -33,6 +39,7 @@ serde = "^1.0"
3339
bincode = "^1.3"
3440
dashmap = "^5.3.4"
3541
regex = "^1.5"
42+
uuid = { version = "^1.1", features = ["v4"] }
3643
hash-map-id = { version = "^0.10", path = "crates/hash-map-id" }
3744
lunatic-stdout-capture = { version = "^0.10", path = "crates/lunatic-stdout-capture" }
3845
lunatic-process = { version = "^0.10", path = "crates/lunatic-process" }

crates/lunatic-distributed-api/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ where
122122
.data(&caller)
123123
.get(params_ptr as usize..(params_ptr + params_len) as usize)
124124
.or_trap("lunatic::distributed::spawn::params")?;
125-
126125
let params = params
127126
.chunks_exact(17)
128127
.map(|chunk| {

crates/lunatic-distributed/Cargo.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,24 @@ description = "Node to node communication"
66

77
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
88

9+
[features]
10+
tcp = []
11+
quic-s2n = ["dep:s2n-quic"]
12+
quic-quinn = ["dep:quinn", "dep:rustls", "dep:rustls-pemfile", "dep:futures-util"]
13+
914
[dependencies]
1015
anyhow = "^1.0"
1116
tokio = { version = "^1.20", features = ["macros", "rt-multi-thread", "net", "time", "io-util"] }
1217
serde = { version = "^1.0", features = ["derive"] }
18+
s2n-quic = { version = "1", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"], optional = true}
1319
wasmtime = "^0.39"
1420
dashmap = "5.3.4"
1521
bincode = "^1.3"
1622
log = "^0.4"
1723
async_cell = "0.2.1"
18-
lunatic-process = { version = "^0.10", path = "../lunatic-process" }
24+
rcgen = { version="^0.9", features=["pem", "x509-parser"] }
25+
lunatic-process = { version = "^0.10", path = "../lunatic-process" }
26+
quinn = { version = "^0.8", optional = true}
27+
rustls = { version = "^0.20", optional = true }
28+
rustls-pemfile = { version = "^1.0", optional = true }
29+
futures-util = { version = "^0.3", optional = true}

crates/lunatic-distributed/src/connection.rs

Lines changed: 0 additions & 57 deletions
This file was deleted.

crates/lunatic-distributed/src/control/client.rs

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ use std::{
77
sync::{atomic, atomic::AtomicU64, Arc, RwLock},
88
time::Duration,
99
};
10-
use tokio::net::TcpStream;
1110

1211
use crate::{
13-
connection::Connection,
14-
control::message::{Registration, Request, Response},
12+
control::message::{Registered, Registration, Request, Response},
13+
quic::{self, Connection},
1514
NodeInfo,
1615
};
1716

17+
use super::server::CTRL_SERVER_NAME;
18+
1819
#[derive(Clone)]
1920
pub struct Client {
2021
inner: Arc<InnerClient>,
@@ -23,6 +24,7 @@ pub struct Client {
2324
pub struct InnerClient {
2425
next_message_id: AtomicU64,
2526
node_addr: SocketAddr,
27+
node_name: String,
2628
control_addr: SocketAddr,
2729
connection: Connection,
2830
pending_requests: DashMap<u64, Arc<AsyncCell<Response>>>,
@@ -31,13 +33,22 @@ pub struct InnerClient {
3133
}
3234

3335
impl Client {
34-
pub async fn register(node_addr: SocketAddr, control_addr: SocketAddr) -> Result<(u64, Self)> {
36+
pub async fn register(
37+
node_addr: SocketAddr,
38+
node_name: String,
39+
control_addr: SocketAddr,
40+
quic_client: quic::Client,
41+
signing_request: String,
42+
) -> Result<(u64, Self, String)> {
3543
let client = Client {
3644
inner: Arc::new(InnerClient {
3745
next_message_id: AtomicU64::new(1),
3846
control_addr,
3947
node_addr,
40-
connection: connect(control_addr, 5).await?,
48+
node_name,
49+
connection: quic_client
50+
.connect(control_addr, CTRL_SERVER_NAME, 5)
51+
.await?,
4152
pending_requests: DashMap::new(),
4253
nodes: Default::default(),
4354
node_ids: Default::default(),
@@ -46,8 +57,12 @@ impl Client {
4657
// Spawn reader task before register
4758
tokio::task::spawn(reader_task(client.clone()));
4859
tokio::task::spawn(refresh_nodes_task(client.clone()));
49-
let node_id: u64 = client.send_registration().await?;
50-
Ok((node_id, client))
60+
let Registered {
61+
node_id,
62+
signed_cert,
63+
} = client.send_registration(signing_request).await?;
64+
65+
Ok((node_id, client, signed_cert))
5166
}
5267

5368
pub fn next_message_id(&self) -> u64 {
@@ -78,15 +93,18 @@ impl Client {
7893
self.inner.connection.receive().await
7994
}
8095

81-
async fn send_registration(&self) -> Result<u64> {
96+
async fn send_registration(&self, signing_request: String) -> Result<Registered> {
8297
let reg = Registration {
8398
node_address: self.inner.node_addr,
99+
node_name: self.inner.node_name.clone(),
100+
signing_request,
84101
};
85102
let resp = self.send(Request::Register(reg)).await?;
86-
if let Response::Register(node_id) = resp {
87-
return Ok(node_id);
103+
match resp {
104+
Response::Register(data) => Ok(data),
105+
Response::Error(e) => Err(anyhow!("Registration failed. {e}")),
106+
_ => Err(anyhow!("Registration failed.")),
88107
}
89-
Err(anyhow!("Registration failed."))
90108
}
91109

92110
fn process_response(&self, id: u64, resp: Response) {
@@ -106,6 +124,7 @@ impl Client {
106124
NodeInfo {
107125
id,
108126
address: reg.node_address,
127+
name: reg.node_name,
109128
},
110129
);
111130
}
@@ -117,6 +136,10 @@ impl Client {
117136
Ok(())
118137
}
119138

139+
pub async fn deregister(&self, node_id: u64) {
140+
self.send(Request::Deregister(node_id)).await.ok();
141+
}
142+
120143
pub fn node_info(&self, node_id: u64) -> Option<NodeInfo> {
121144
self.inner.nodes.get(&node_id).map(|e| e.clone())
122145
}
@@ -146,17 +169,6 @@ impl Client {
146169
}
147170
}
148171

149-
async fn connect(addr: SocketAddr, retry: u32) -> Result<Connection> {
150-
for _ in 0..retry {
151-
log::info!("Connecting to control {addr}");
152-
if let Ok(stream) = TcpStream::connect(addr).await {
153-
return Ok(Connection::new(stream));
154-
}
155-
tokio::time::sleep(Duration::from_secs(2)).await;
156-
}
157-
Err(anyhow!("Failed to connect to {addr}"))
158-
}
159-
160172
async fn reader_task(client: Client) -> Result<()> {
161173
loop {
162174
if let Ok((id, resp)) = client.recv().await {

crates/lunatic-distributed/src/control/message.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,45 @@ use serde::{Deserialize, Serialize};
55
#[derive(Clone, Debug, Serialize, Deserialize)]
66
pub enum Request {
77
Register(Registration),
8+
// Currently a node will send it's own id. We need to refactor this part: the control server
9+
// should always handle registration first and later know which node is sending requests.
10+
Deregister(u64),
811
ListNodes,
912
AddModule(Vec<u8>),
1013
GetModule(u64),
1114
}
1215

16+
impl Request {
17+
pub fn kind(&self) -> &'static str {
18+
match self {
19+
Request::Register(_) => "Register",
20+
Request::Deregister(_) => "Deregister",
21+
Request::ListNodes => "ListNodes",
22+
Request::AddModule(_) => "AddModule",
23+
Request::GetModule(_) => "GetModule",
24+
}
25+
}
26+
}
27+
1328
#[derive(Clone, Debug, Serialize, Deserialize)]
1429
pub enum Response {
15-
Register(u64),
30+
Register(Registered),
1631
Nodes(Vec<(u64, Registration)>),
1732
Module(Option<Vec<u8>>),
1833
ModuleId(u64),
34+
Error(String),
35+
None,
1936
}
2037

2138
#[derive(Clone, Debug, Serialize, Deserialize)]
2239
pub struct Registration {
2340
pub node_address: SocketAddr,
41+
pub node_name: String,
42+
pub signing_request: String,
43+
}
44+
45+
#[derive(Clone, Debug, Serialize, Deserialize)]
46+
pub struct Registered {
47+
pub node_id: u64,
48+
pub signed_cert: String,
2449
}

0 commit comments

Comments
 (0)