-
Notifications
You must be signed in to change notification settings - Fork 221
Parachain multiplexing #295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f9376c9
e373221
51fa48a
1baff28
92eae0f
54d92e9
4d8918d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,60 @@ | ||
use std::collections::BTreeMap; | ||
use std::time::{Duration, Instant}; | ||
use std::net::Ipv4Addr; | ||
use std::mem; | ||
|
||
use bytes::Bytes; | ||
use bytes::{Bytes, BytesMut}; | ||
use actix::prelude::*; | ||
use actix_web_actors::ws; | ||
use actix_http::ws::Item; | ||
use crate::aggregator::{Aggregator, AddNode}; | ||
use crate::chain::{Chain, UpdateNode, RemoveNode}; | ||
use crate::node::NodeId; | ||
use crate::node::message::{NodeMessage, Details, SystemConnected}; | ||
use crate::util::LocateRequest; | ||
use crate::types::ConnId; | ||
|
||
/// How often heartbeat pings are sent | ||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); | ||
/// How long before lack of client response causes a timeout | ||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); | ||
/// Continuation buffer limit, 10mb | ||
const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024; | ||
|
||
pub struct NodeConnector { | ||
/// Id of the node this connector is responsible for handling | ||
nid: NodeId, | ||
/// Multiplexing connections by id | ||
multiplex: BTreeMap<ConnId, ConnMultiplex>, | ||
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), | ||
hb: Instant, | ||
/// Aggregator actor address | ||
aggregator: Addr<Aggregator>, | ||
/// Chain actor address | ||
chain: Option<Addr<Chain>>, | ||
/// Backlog of messages to be sent once we get a recipient handle to the chain | ||
backlog: Vec<NodeMessage>, | ||
/// IP address of the node this connector is responsible for | ||
ip: Option<Ipv4Addr>, | ||
/// Actix address of location services | ||
locator: Recipient<LocateRequest>, | ||
/// Buffer for constructing continuation messages | ||
contbuf: BytesMut, | ||
} | ||
|
||
enum ConnMultiplex { | ||
Connected { | ||
/// Id of the node this multiplex connector is responsible for handling | ||
nid: NodeId, | ||
/// Chain address to which this multiplex connector is delegating messages | ||
chain: Addr<Chain>, | ||
}, | ||
Waiting { | ||
/// Backlog of messages to be sent once we get a recipient handle to the chain | ||
backlog: Vec<NodeMessage>, | ||
} | ||
} | ||
|
||
impl Default for ConnMultiplex { | ||
fn default() -> Self { | ||
ConnMultiplex::Waiting { | ||
backlog: Vec::new(), | ||
} | ||
} | ||
} | ||
|
||
impl Actor for NodeConnector { | ||
|
@@ -40,23 +65,23 @@ impl Actor for NodeConnector { | |
} | ||
|
||
fn stopped(&mut self, _: &mut Self::Context) { | ||
if let Some(chain) = self.chain.as_ref() { | ||
chain.do_send(RemoveNode(self.nid)); | ||
for mx in self.multiplex.values() { | ||
if let ConnMultiplex::Connected { chain, nid } = mx { | ||
chain.do_send(RemoveNode(*nid)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl NodeConnector { | ||
pub fn new(aggregator: Addr<Aggregator>, locator: Recipient<LocateRequest>, ip: Option<Ipv4Addr>) -> Self { | ||
Self { | ||
// Garbage id, will be replaced by the Initialize message | ||
nid: !0, | ||
multiplex: BTreeMap::new(), | ||
hb: Instant::now(), | ||
aggregator, | ||
chain: None, | ||
backlog: Vec::new(), | ||
ip, | ||
locator, | ||
contbuf: BytesMut::new(), | ||
} | ||
} | ||
|
||
|
@@ -71,58 +96,88 @@ impl NodeConnector { | |
} | ||
|
||
fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut <Self as Actor>::Context) { | ||
if let Some(chain) = self.chain.as_ref() { | ||
chain.do_send(UpdateNode { | ||
nid: self.nid, | ||
msg, | ||
raw: Some(data) | ||
}); | ||
|
||
return; | ||
let conn_id = msg.id.unwrap_or(0); | ||
|
||
match self.multiplex.entry(conn_id).or_default() { | ||
ConnMultiplex::Connected { nid, chain } => { | ||
chain.do_send(UpdateNode { | ||
nid: *nid, | ||
msg, | ||
raw: Some(data), | ||
}); | ||
} | ||
ConnMultiplex::Waiting { backlog } => { | ||
if let Details::SystemConnected(connected) = msg.details { | ||
let SystemConnected { network_id: _, mut node } = connected; | ||
let rec = ctx.address().recipient(); | ||
|
||
// FIXME: Use genesis hash instead of names to avoid this mess | ||
match &*node.chain { | ||
"Kusama CC3" => node.chain = "Kusama".into(), | ||
"Polkadot CC1" => node.chain = "Polkadot".into(), | ||
_ => (), | ||
} | ||
|
||
self.aggregator.do_send(AddNode { rec, conn_id, node }); | ||
} else { | ||
if backlog.len() >= 10 { | ||
backlog.remove(0); | ||
} | ||
|
||
backlog.push(msg); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this is the fix here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue was that I had a separate backlog and |
||
} | ||
} | ||
} | ||
} | ||
|
||
if let Details::SystemConnected(connected) = msg.details { | ||
let SystemConnected { network_id: _, mut node } = connected; | ||
let rec = ctx.address().recipient(); | ||
|
||
// FIXME: mergin chains by network_id is not the way to do it. | ||
// This will at least force all CC3 nodes to be aggregated with | ||
// the rest. | ||
let network_id = None; // network_id.map(Into::into); | ||
match &*node.chain { | ||
"Kusama CC3" => node.chain = "Kusama".into(), | ||
"Polkadot CC1" => node.chain = "Polkadot".into(), | ||
_ => (), | ||
} | ||
fn start_frame(&mut self, bytes: &[u8]) { | ||
if !self.contbuf.is_empty() { | ||
log::error!("Unused continuation buffer"); | ||
self.contbuf.clear(); | ||
} | ||
self.continue_frame(bytes); | ||
} | ||
|
||
self.aggregator.do_send(AddNode { rec, network_id, node }); | ||
fn continue_frame(&mut self, bytes: &[u8]) { | ||
if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT { | ||
self.contbuf.extend_from_slice(&bytes); | ||
} else { | ||
if self.backlog.len() >= 10 { | ||
self.backlog.remove(0); | ||
} | ||
|
||
self.backlog.push(msg); | ||
log::error!("Continuation buffer overflow"); | ||
self.contbuf = BytesMut::new(); | ||
} | ||
} | ||
|
||
fn finish_frame(&mut self) -> Bytes { | ||
mem::replace(&mut self.contbuf, BytesMut::new()).freeze() | ||
} | ||
} | ||
|
||
#[derive(Message)] | ||
#[rtype(result = "()")] | ||
pub struct Initialize(pub NodeId, pub Addr<Chain>); | ||
pub struct Initialize { | ||
pub nid: NodeId, | ||
pub conn_id: ConnId, | ||
pub chain: Addr<Chain>, | ||
} | ||
|
||
impl Handler<Initialize> for NodeConnector { | ||
type Result = (); | ||
|
||
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) { | ||
let Initialize(nid, chain) = msg; | ||
let backlog = std::mem::replace(&mut self.backlog, Vec::new()); | ||
let Initialize { nid, conn_id, chain } = msg; | ||
|
||
for msg in backlog { | ||
chain.do_send(UpdateNode { nid, msg, raw: None }); | ||
} | ||
let mx = self.multiplex.entry(conn_id).or_default(); | ||
|
||
if let ConnMultiplex::Waiting { backlog } = mx { | ||
for msg in backlog.drain(..) { | ||
chain.do_send(UpdateNode { nid, msg, raw: None }); | ||
} | ||
|
||
self.nid = nid; | ||
self.chain = Some(chain.clone()); | ||
*mx = ConnMultiplex::Connected { | ||
nid, | ||
chain: chain.clone(), | ||
}; | ||
}; | ||
|
||
// Acquire the node's physical location | ||
if let Some(ip) = self.ip { | ||
|
@@ -148,9 +203,19 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector { | |
return; | ||
} | ||
Ok(ws::Message::Nop) => return, | ||
Ok(ws::Message::Continuation(_)) => { | ||
log::error!("Continuation not supported"); | ||
return; | ||
Ok(ws::Message::Continuation(cont)) => match cont { | ||
Item::FirstText(bytes) | Item::FirstBinary(bytes) => { | ||
self.start_frame(&bytes); | ||
return; | ||
} | ||
Item::Continue(bytes) => { | ||
self.continue_frame(&bytes); | ||
return; | ||
} | ||
Item::Last(bytes) => { | ||
self.continue_frame(&bytes); | ||
self.finish_frame() | ||
} | ||
} | ||
Err(error) => { | ||
log::error!("{:?}", error); | ||
|
Uh oh!
There was an error while loading. Please reload this page.