Skip to content

Commit f2a00c1

Browse files
A0-4817: Improve memory consumption in clique network (#1953) (#1955)
# Description Attempts at fixing the memory consumption in clique network. Current approach: Limit number of messages to send in clique network ## Type of change - Bug fix (non-breaking change which fixes an issue) # Checklist: --------- Co-authored-by: Marcin <[email protected]>
1 parent 22ea4bd commit f2a00c1

File tree

9 files changed

+43
-45
lines changed

9 files changed

+43
-45
lines changed

.github/workflows/_build-aleph-e2e-client.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ jobs:
3838
uses: Cardinal-Cryptography/github-actions/get-ref-properties@v7
3939

4040
- name: Install Rust toolchain
41-
uses: Cardinal-Cryptography/github-actions/install-rust-toolchain@v7
41+
uses: Cardinal-Cryptography/github-actions/install-rust-toolchain@v7.3.0
4242
with:
43-
targets: wasm32-unknown-unknown
43+
directory: './e2e-tests/'
4444

4545
- name: Build aleph-e2e-client
4646
shell: bash

.github/workflows/_build-and-push-cliain.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ jobs:
2626
uses: actions/checkout@v4
2727

2828
- name: Install Rust toolchain
29-
uses: Cardinal-Cryptography/github-actions/install-rust-toolchain@v7
29+
uses: Cardinal-Cryptography/github-actions/install-rust-toolchain@v7.3.0
30+
with:
31+
directory: './bin/cliain/'
3032

3133
- name: ${{ env.CARGO_COMMAND }} cliain binary
3234
run: |

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/node/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "aleph-node"
3-
version = "15.0.0"
3+
version = "15.1.0"
44
description = "Aleph node binary"
55
build = "build.rs"
66
license = "GPL-3.0-or-later"

clique/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ pub use crypto::{PublicKey, SecretKey};
2626
pub use rate_limiting::{RateLimitingDialer, RateLimitingListener};
2727
pub use service::{Service, SpawnHandleExt, SpawnHandleT};
2828

29+
const SEND_DATA_BUFFER: usize = 10;
30+
2931
const LOG_TARGET: &str = "network-clique";
3032
/// A basic alias for properties we expect basic data to satisfy.
3133
pub trait Data: Clone + Codec + Send + Sync + 'static {}

clique/src/manager/mod.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use direction::DirectedPeers;
1515
pub enum SendError {
1616
/// Outgoing network connection closed
1717
ConnectionClosed,
18+
/// The underlying network is not keeping up with sending.
19+
FullChannel,
1820
/// Peer not added to the manager
1921
PeerNotFound,
2022
}
@@ -24,6 +26,7 @@ impl Display for SendError {
2426
use SendError::*;
2527
match self {
2628
ConnectionClosed => write!(f, "worker dead"),
29+
FullChannel => write!(f, "too many messages"),
2730
PeerNotFound => write!(f, "peer not found"),
2831
}
2932
}
@@ -158,7 +161,7 @@ pub struct Manager<PK: PublicKey + PeerId, A: Data, D: Data> {
158161
// Which peers we want to be connected with, and which way.
159162
wanted: DirectedPeers<PK, A>,
160163
// The peers we are connected with. We ensure that this is always a subset of what we want.
161-
have: HashMap<PK, mpsc::UnboundedSender<D>>,
164+
have: HashMap<PK, mpsc::Sender<D>>,
162165
}
163166

164167
impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
@@ -192,11 +195,7 @@ impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
192195
}
193196

194197
/// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to.
195-
pub fn add_connection(
196-
&mut self,
197-
peer_id: PK,
198-
data_for_network: mpsc::UnboundedSender<D>,
199-
) -> AddResult {
198+
pub fn add_connection(&mut self, peer_id: PK, data_for_network: mpsc::Sender<D>) -> AddResult {
200199
use AddResult::*;
201200
if !self.wanted.interested(&peer_id) {
202201
return Uninterested;
@@ -219,10 +218,13 @@ impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
219218
/// or if the connection is dead.
220219
pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> {
221220
self.have
222-
.get(peer_id)
221+
.get_mut(peer_id)
223222
.ok_or(SendError::PeerNotFound)?
224-
.unbounded_send(data)
225-
.map_err(|_| SendError::ConnectionClosed)
223+
.try_send(data)
224+
.map_err(|e| match e.is_full() {
225+
true => SendError::FullChannel,
226+
false => SendError::ConnectionClosed,
227+
})
226228
}
227229

228230
/// A status of the manager, to be displayed somewhere.
@@ -243,6 +245,7 @@ mod tests {
243245
use crate::{
244246
metrics::Metrics,
245247
mock::{key, MockPublicKey},
248+
SEND_DATA_BUFFER,
246249
};
247250

248251
type Data = String;
@@ -286,7 +289,7 @@ mod tests {
286289
Manager::<MockPublicKey, Address, Data>::new(listening_id.clone(), Metrics::noop());
287290
let data = String::from("DATA");
288291
let address = String::from("43.43.43.43:43000");
289-
let (tx, _rx) = mpsc::unbounded();
292+
let (tx, _rx) = mpsc::channel(SEND_DATA_BUFFER);
290293
// try add unknown peer
291294
assert_eq!(
292295
connecting_manager.add_connection(listening_id.clone(), tx),
@@ -308,7 +311,7 @@ mod tests {
308311
assert!(connecting_manager.add_peer(listening_id.clone(), address.clone()));
309312
}
310313
// add outgoing to connecting
311-
let (tx, mut rx) = mpsc::unbounded();
314+
let (tx, mut rx) = mpsc::channel(SEND_DATA_BUFFER);
312315
assert_eq!(
313316
connecting_manager.add_connection(listening_id.clone(), tx),
314317
Added
@@ -319,7 +322,7 @@ mod tests {
319322
.is_ok());
320323
assert_eq!(data, rx.next().await.expect("should receive"));
321324
// add incoming to listening
322-
let (tx, mut rx) = mpsc::unbounded();
325+
let (tx, mut rx) = mpsc::channel(SEND_DATA_BUFFER);
323326
assert_eq!(
324327
listening_manager.add_connection(connecting_id.clone(), tx),
325328
Added

clique/src/protocols/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub type Version = u32;
2020
/// What connections send back to the service after they become established. Starts with a public
2121
/// key of the remote node, followed by a channel for sending data to that node, with None if the
2222
/// connection was unsuccessful and should be reestablished.
23-
pub type ResultForService<PK, D> = (PK, Option<mpsc::UnboundedSender<D>>);
23+
pub type ResultForService<PK, D> = (PK, Option<mpsc::Sender<D>>);
2424

2525
/// Defines the protocol for communication. Currently single variant, but left in case of protocol change.
2626
#[derive(Debug, PartialEq, Eq)]

clique/src/protocols/v1/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616
handshake::{v0_handshake_incoming, v0_handshake_outgoing},
1717
ProtocolError, ResultForService,
1818
},
19-
Data, PublicKey, SecretKey, Splittable, LOG_TARGET,
19+
Data, PublicKey, SecretKey, Splittable, LOG_TARGET, SEND_DATA_BUFFER,
2020
};
2121

2222
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
@@ -43,7 +43,7 @@ async fn check_authorization<SK: SecretKey>(
4343

4444
async fn sending<PK: PublicKey, D: Data, S: AsyncWrite + Unpin + Send>(
4545
mut sender: S,
46-
mut data_from_user: mpsc::UnboundedReceiver<D>,
46+
mut data_from_user: mpsc::Receiver<D>,
4747
) -> Result<(), ProtocolError<PK>> {
4848
use Message::*;
4949
loop {
@@ -94,7 +94,7 @@ async fn manage_connection<
9494
>(
9595
sender: S,
9696
receiver: R,
97-
data_from_user: mpsc::UnboundedReceiver<D>,
97+
data_from_user: mpsc::Receiver<D>,
9898
data_for_user: mpsc::UnboundedSender<D>,
9999
) -> Result<(), ProtocolError<PK>> {
100100
let sending = sending(sender, data_from_user);
@@ -122,7 +122,7 @@ pub async fn outgoing<SK: SecretKey, D: Data, S: Splittable>(
122122
target: LOG_TARGET,
123123
"Outgoing handshake with {} finished successfully.", public_key
124124
);
125-
let (data_for_network, data_from_user) = mpsc::unbounded();
125+
let (data_for_network, data_from_user) = mpsc::channel(SEND_DATA_BUFFER);
126126
result_for_parent
127127
.unbounded_send((public_key.clone(), Some(data_for_network)))
128128
.map_err(|_| ProtocolError::NoParentConnection)?;
@@ -160,7 +160,7 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
160160
return Err(ProtocolError::NotAuthorized);
161161
}
162162

163-
let (data_for_network, data_from_user) = mpsc::unbounded();
163+
let (data_for_network, data_from_user) = mpsc::channel(SEND_DATA_BUFFER);
164164
result_for_parent
165165
.unbounded_send((public_key.clone(), Some(data_for_network)))
166166
.map_err(|_| ProtocolError::NoParentConnection)?;
@@ -288,12 +288,12 @@ mod tests {
288288
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
289289
result = result_from_outgoing.next() => {
290290
let (_, maybe_data_for_outgoing) = result.expect("the channel shouldn't be dropped");
291-
let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
291+
let mut data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
292292
data_for_outgoing
293-
.unbounded_send(vec![4, 3, 43])
293+
.try_send(vec![4, 3, 43])
294294
.expect("should send");
295295
data_for_outgoing
296-
.unbounded_send(vec![2, 1, 3, 7])
296+
.try_send(vec![2, 1, 3, 7])
297297
.expect("should send");
298298
data_for_outgoing
299299
},
@@ -303,12 +303,12 @@ mod tests {
303303
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
304304
result = result_from_incoming.next() => {
305305
let (_, maybe_data_for_incoming) = result.expect("the channel shouldn't be dropped");
306-
let data_for_incoming = maybe_data_for_incoming.expect("successfully connected");
306+
let mut data_for_incoming = maybe_data_for_incoming.expect("successfully connected");
307307
data_for_incoming
308-
.unbounded_send(vec![5, 4, 44])
308+
.try_send(vec![5, 4, 44])
309309
.expect("should send");
310310
data_for_incoming
311-
.unbounded_send(vec![3, 2, 4, 8])
311+
.try_send(vec![3, 2, 4, 8])
312312
.expect("should send");
313313
data_for_incoming
314314
},
@@ -426,9 +426,9 @@ mod tests {
426426
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
427427
result = result_from_outgoing.next() => {
428428
let (_, maybe_data_for_outgoing) = result.expect("the channel shouldn't be dropped");
429-
let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
429+
let mut data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
430430
data_for_outgoing
431-
.unbounded_send(vec![2, 1, 3, 7])
431+
.try_send(vec![2, 1, 3, 7])
432432
.expect("should send");
433433
data_for_outgoing
434434
},

clique/src/service.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ use std::{
55
};
66

77
use futures::{
8-
channel::{
9-
mpsc::{self, UnboundedSender},
10-
oneshot,
11-
},
8+
channel::{mpsc, oneshot},
129
Future, StreamExt, TryFutureExt,
1310
};
1411
use log::{info, trace, warn};
@@ -250,18 +247,15 @@ where
250247
fn add_connection(
251248
&mut self,
252249
public_key: SK::PublicKey,
253-
data_for_network: mpsc::UnboundedSender<D>,
250+
data_for_network: mpsc::Sender<D>,
254251
) -> AddResult {
255252
self.manager.add_connection(public_key, data_for_network)
256253
}
257254

258255
fn handle_command(
259256
&mut self,
260257
command: ServiceCommand<<SK as SecretKey>::PublicKey, D, A>,
261-
result_for_parent: &UnboundedSender<(
262-
<SK as SecretKey>::PublicKey,
263-
Option<UnboundedSender<D>>,
264-
)>,
258+
result_for_parent: &mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
265259
) {
266260
use ServiceCommand::*;
267261
match command {
@@ -306,11 +300,8 @@ where
306300
fn handle_data_for_network(
307301
&mut self,
308302
public_key: <SK as SecretKey>::PublicKey,
309-
maybe_data_for_network: Option<UnboundedSender<D>>,
310-
result_for_parent: &UnboundedSender<(
311-
<SK as SecretKey>::PublicKey,
312-
Option<UnboundedSender<D>>,
313-
)>,
303+
maybe_data_for_network: Option<mpsc::Sender<D>>,
304+
result_for_parent: &mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
314305
) {
315306
use AddResult::*;
316307
match maybe_data_for_network {

0 commit comments

Comments
 (0)