Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions backend/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::feed::connector::{FeedConnector, Connected, FeedId};
use crate::util::DenseMap;
use crate::feed::{self, FeedMessageSerializer};
use crate::chain::{self, Chain, ChainId, Label, GetNodeNetworkState};
use crate::types::{NodeDetails, NodeId};
use crate::types::{ConnId, NodeDetails, NodeId};

pub struct Aggregator {
labels: HashMap<Label, ChainId>,
Expand Down Expand Up @@ -106,8 +106,11 @@ impl Actor for Aggregator {
#[derive(Message)]
#[rtype(result = "()")]
pub struct AddNode {
/// Details of the node being added to the aggregator
pub node: NodeDetails,
pub network_id: Option<Label>,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>,
}

Expand Down Expand Up @@ -173,21 +176,14 @@ impl Handler<AddNode> for Aggregator {
type Result = ();

fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode { node, network_id, rec } = msg;
let AddNode { node, conn_id, rec } = msg;

let cid = self.lazy_chain(&node.chain, &network_id, ctx);
let cid = self.lazy_chain(&node.chain, &None, ctx);
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed");

if let Some(network_id) = network_id {
// Attach network id to the chain if it was not done yet
if chain.network_id.is_none() {
chain.network_id = Some(network_id.clone());
self.networks.insert(network_id, cid);
}
}

chain.addr.do_send(chain::AddNode {
node,
conn_id,
rec,
});
}
Expand Down
14 changes: 10 additions & 4 deletions backend/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}};
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer};
use crate::util::{DenseMap, NumStats, now};
use crate::types::{NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber};
use crate::types::{ConnId, NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber};

const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes

Expand Down Expand Up @@ -194,7 +194,11 @@ impl Actor for Chain {
#[derive(Message)]
#[rtype(result = "()")]
pub struct AddNode {
/// Details of the node being added to the aggregator
pub node: NodeDetails,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>,
}

Expand Down Expand Up @@ -248,11 +252,13 @@ impl Handler<AddNode> for Chain {
type Result = ();

fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
self.increment_label_count(&msg.node.chain);
let AddNode { node, conn_id, rec } = msg;
self.increment_label_count(&node.chain);

let nid = self.nodes.add(Node::new(msg.node));
let nid = self.nodes.add(Node::new(node));
let chain = ctx.address();

if let Err(_) = msg.rec.do_send(Initialize(nid, ctx.address())) {
if let Err(_) = rec.do_send(Initialize { nid, conn_id, chain }) {
self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(nid, node));
Expand Down
97 changes: 72 additions & 25 deletions backend/src/node/connector.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use std::net::Ipv4Addr;
use std::mem;

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use actix::prelude::*;
use actix_web_actors::ws;
use actix_http::ws::Item;
use crate::aggregator::{Aggregator, AddNode};
use crate::chain::{Chain, UpdateNode, RemoveNode};
use crate::node::NodeId;
use crate::node::message::{NodeMessage, Details, SystemConnected};
use crate::util::LocateRequest;
use crate::types::ConnId;

/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
/// Continuation buffer limit, 10mb
const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;

pub struct NodeConnector {
/// Id of the node this connector is responsible for handling
Expand All @@ -22,14 +28,17 @@ pub struct NodeConnector {
hb: Instant,
/// Aggregator actor address
aggregator: Addr<Aggregator>,
/// Chain actor address
chain: Option<Addr<Chain>>,
/// Mapping message connection id to addresses of chains for multiplexing
/// a node running multiple parachains
chains: BTreeMap<ConnId, Addr<Chain>>,
/// Backlog of messages to be sent once we get a recipient handle to the chain
backlog: Vec<NodeMessage>,
backlogs: BTreeMap<ConnId, Vec<NodeMessage>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, but is it ok that the backlog is unbounded? Maybe we could re-work this into a bounded queue?
EDIT: I see it is bounded to 10 below. Let's document that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's bounded. I'm more worried about the number of ConnId being unbounded atm.

/// IP address of the node this connector is responsible for
ip: Option<Ipv4Addr>,
/// Actix address of location services
locator: Recipient<LocateRequest>,
/// Buffer for constructing continuation messages
contbuf: BytesMut,
}

impl Actor for NodeConnector {
Expand All @@ -40,7 +49,7 @@ impl Actor for NodeConnector {
}

fn stopped(&mut self, _: &mut Self::Context) {
if let Some(chain) = self.chain.as_ref() {
for chain in self.chains.values() {
chain.do_send(RemoveNode(self.nid));
}
}
Expand All @@ -53,10 +62,11 @@ impl NodeConnector {
nid: !0,
hb: Instant::now(),
aggregator,
chain: None,
backlog: Vec::new(),
chains: BTreeMap::new(),
backlogs: BTreeMap::new(),
ip,
locator,
contbuf: BytesMut::new(),
}
}

Expand All @@ -71,7 +81,9 @@ impl NodeConnector {
}

fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut <Self as Actor>::Context) {
if let Some(chain) = self.chain.as_ref() {
let conn_id = msg.id.unwrap_or(0);

if let Some(chain) = self.chains.get(&conn_id) {
chain.do_send(UpdateNode {
nid: self.nid,
msg,
Expand All @@ -85,44 +97,69 @@ impl NodeConnector {
let SystemConnected { network_id: _, mut node } = connected;
let rec = ctx.address().recipient();

// FIXME: merging chains by network_id is not the way to do it.
// This will at least force all CC3 nodes to be aggregated with
// the rest.
let network_id = None; // network_id.map(Into::into);
// FIXME: Use genesis hash instead of names to avoid this mess
match &*node.chain {
"Kusama CC3" => node.chain = "Kusama".into(),
"Polkadot CC1" => node.chain = "Polkadot".into(),
_ => (),
}

self.aggregator.do_send(AddNode { rec, network_id, node });
self.aggregator.do_send(AddNode { rec, conn_id, node });
} else {
if self.backlog.len() >= 10 {
self.backlog.remove(0);
let backlog = self.backlogs.entry(conn_id).or_default();

if backlog.len() >= 10 {
backlog.remove(0);
}

self.backlog.push(msg);
backlog.push(msg);
}
}

fn start_frame(&mut self, bytes: &[u8]) {
if !self.contbuf.is_empty() {
log::error!("Unused continuation buffer");
self.contbuf.clear();
}
self.continue_frame(bytes);
}

fn continue_frame(&mut self, bytes: &[u8]) {
if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT {
self.contbuf.extend_from_slice(&bytes);
} else {
log::error!("Continuation buffer overflow");
self.contbuf = BytesMut::new();
}
}

fn finish_frame(&mut self) -> Bytes {
mem::replace(&mut self.contbuf, BytesMut::new()).freeze()
}
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct Initialize(pub NodeId, pub Addr<Chain>);
pub struct Initialize {
pub nid: NodeId,
pub conn_id: ConnId,
pub chain: Addr<Chain>,
}

impl Handler<Initialize> for NodeConnector {
type Result = ();

fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
let Initialize(nid, chain) = msg;
let backlog = std::mem::replace(&mut self.backlog, Vec::new());
let Initialize { nid, conn_id, chain } = msg;

for msg in backlog {
chain.do_send(UpdateNode { nid, msg, raw: None });
if let Some(backlog) = self.backlogs.remove(&conn_id) {
for msg in backlog {
chain.do_send(UpdateNode { nid, msg, raw: None });
}
}

self.nid = nid;
self.chain = Some(chain.clone());
self.chains.insert(conn_id, chain.clone());

// Acquire the node's physical location
if let Some(ip) = self.ip {
Expand All @@ -148,9 +185,19 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
return;
}
Ok(ws::Message::Nop) => return,
Ok(ws::Message::Continuation(_)) => {
log::error!("Continuation not supported");
return;
Ok(ws::Message::Continuation(cont)) => match cont {
Item::FirstText(bytes) | Item::FirstBinary(bytes) => {
self.start_frame(&bytes);
return;
}
Item::Continue(bytes) => {
self.continue_frame(&bytes);
return;
}
Item::Last(bytes) => {
self.continue_frame(&bytes);
self.finish_frame()
}
}
Err(error) => {
log::error!("{:?}", error);
Expand Down
3 changes: 2 additions & 1 deletion backend/src/node/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use chrono::{DateTime, Utc};
use serde::Deserialize;
use serde::de::IgnoredAny;
use crate::node::NodeDetails;
use crate::types::{Block, BlockNumber, BlockHash};
use crate::types::{Block, BlockNumber, BlockHash, ConnId};

#[derive(Deserialize, Debug, Message)]
#[rtype(result = "()")]
pub struct NodeMessage {
pub level: Level,
pub ts: DateTime<Utc>,
pub id: Option<ConnId>,
#[serde(flatten)]
pub details: Details,
}
Expand Down
1 change: 1 addition & 0 deletions backend/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use serde::Deserialize;
use crate::util::{MeanList, now};

pub type NodeId = usize;
pub type ConnId = u64;
pub type BlockNumber = u64;
pub type Timestamp = u64;
pub type Address = Box<str>;
Expand Down