From f9376c9fc510ee0e26e97a5195bf8591fd560c18 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Fri, 30 Oct 2020 19:03:17 +0100 Subject: [PATCH 1/5] Handle continuation frames --- backend/src/node/connector.rs | 46 ++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/backend/src/node/connector.rs b/backend/src/node/connector.rs index d6f09bb93..cc09c3c3f 100644 --- a/backend/src/node/connector.rs +++ b/backend/src/node/connector.rs @@ -1,9 +1,11 @@ use std::time::{Duration, Instant}; use std::net::Ipv4Addr; +use std::mem; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use actix::prelude::*; use actix_web_actors::ws; +use actix_http::ws::Item; use crate::aggregator::{Aggregator, AddNode}; use crate::chain::{Chain, UpdateNode, RemoveNode}; use crate::node::NodeId; @@ -14,6 +16,8 @@ use crate::util::LocateRequest; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); +/// Continuation buffer limit, 1mb +const CONT_BUF_LIMIT: usize = 1024 * 1024; pub struct NodeConnector { /// Id of the node this connector is responsible for handling @@ -30,6 +34,8 @@ pub struct NodeConnector { ip: Option, /// Actix address of location services locator: Recipient, + /// Buffer for constructing continuation messages + contbuf: BytesMut, } impl Actor for NodeConnector { @@ -57,6 +63,7 @@ impl NodeConnector { backlog: Vec::new(), ip, locator, + contbuf: BytesMut::new(), } } @@ -104,6 +111,27 @@ impl NodeConnector { self.backlog.push(msg); } } + + fn start_frame(&mut self, bytes: &[u8]) { + if !self.contbuf.is_empty() { + log::error!("Unused continuation buffer"); + self.contbuf.clear(); + } + self.continue_frame(bytes); + } + + fn continue_frame(&mut self, bytes: &[u8]) { + if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT { + self.contbuf.extend_from_slice(&bytes); + } else { + log::error!("Continuation buffer overflow"); + self.contbuf = BytesMut::new(); + } + } + + fn finish_frame(&mut self) -> Bytes { + mem::replace(&mut self.contbuf, BytesMut::new()).freeze() + } } #[derive(Message)] @@ -148,9 +176,19 @@ impl StreamHandler> for NodeConnector { return; } Ok(ws::Message::Nop) => return, - Ok(ws::Message::Continuation(_)) => { - log::error!("Continuation not supported"); - return; + Ok(ws::Message::Continuation(cont)) => match cont { + Item::FirstText(bytes) | Item::FirstBinary(bytes) => { + self.start_frame(&bytes); + return; + } + Item::Continue(bytes) => { + self.continue_frame(&bytes); + return; + } + Item::Last(bytes) => { + self.continue_frame(&bytes); + self.finish_frame() + } } Err(error) => { log::error!("{:?}", error); From e373221147fef93c702905754591a1951571f385 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Fri, 30 Oct 2020 19:46:25 +0100 Subject: [PATCH 2/5] Parachain multiplexing MVP --- backend/src/aggregator.rs | 20 +++++-------- backend/src/chain.rs | 14 ++++++--- backend/src/node/connector.rs | 55 ++++++++++++++++++++--------------- backend/src/node/message.rs | 3 +- backend/src/types.rs | 1 + 5 files changed, 53 insertions(+), 40 deletions(-) diff --git a/backend/src/aggregator.rs b/backend/src/aggregator.rs index ff8ab4765..f13466019 100644 --- a/backend/src/aggregator.rs +++ b/backend/src/aggregator.rs @@ -6,7 +6,7 @@ use crate::feed::connector::{FeedConnector, Connected, FeedId}; use crate::util::DenseMap; use crate::feed::{self, FeedMessageSerializer}; use crate::chain::{self, Chain, ChainId, Label, GetNodeNetworkState}; -use crate::types::{NodeDetails, NodeId}; +use crate::types::{ConnId, NodeDetails, NodeId}; pub struct Aggregator { labels: HashMap, @@ -106,8 +106,11 @@ impl Actor for Aggregator { #[derive(Message)] #[rtype(result = "()")] pub struct AddNode { + /// Details of the node being added to the aggregator pub node: NodeDetails, - pub network_id: Option