Skip to content
Merged

tw/tcp #3896

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
8fac768
Import modified version from timeboost.
twittner Jan 15, 2026
ca620d8
WIP
twittner Jan 16, 2026
723645a
add cliquenet to test matrices, fmt
rob-maron Jan 16, 2026
bc6ddd9
Bring back Noise protocol.
twittner Jan 16, 2026
0ca7868
Run rustfmt and use `snow` with `ring-accelerated`.
twittner Jan 19, 2026
f4915fb
Use current rust edition.
twittner Jan 19, 2026
1fbbb8f
Address clippy warnings.
twittner Jan 19, 2026
45e455e
Remove unused import.
twittner Jan 19, 2026
05af88a
Remove useless conversion.
twittner Jan 19, 2026
b28dfb6
Merge branch 'main' into tw/tcp
twittner Jan 19, 2026
1311b37
Add more `Data` conversions.
twittner Jan 20, 2026
d3c6278
Without noise and higher frame size.
twittner Jan 20, 2026
7dc95b2
Noise with AES-GCM.
twittner Jan 20, 2026
756dbf4
Run rustfmt.
twittner Jan 20, 2026
2d9d87b
Adjust benchmark's port numbers.
twittner Jan 20, 2026
a749243
Use portpicker in benchmark.
twittner Jan 20, 2026
bd233ad
Make max. message size configurable.
twittner Jan 21, 2026
72da3f2
Fix conversion.
twittner Jan 21, 2026
39444d2
change default `MAX_MESSAGE_SIZE`
rob-maron Jan 21, 2026
59e76f7
Restore original cliquenet and use external locking.
twittner Jan 22, 2026
89302bb
Add `ViewNumber` parameter to `ConnectedNetwork`.
twittner Jan 22, 2026
0a30bdc
Reduce default channel capacities.
twittner Jan 22, 2026
34c50cb
Import Arc.
twittner Jan 22, 2026
74b7cf6
Merge branch 'main' into tw/tcp
twittner Jan 22, 2026
9a008a6
Back to internal locking.
twittner Jan 22, 2026
9f98cc3
Move `NetConf` to lib and use its retry_delays.
twittner Jan 22, 2026
d84c01c
Fix clippy warnings.
twittner Jan 22, 2026
eb07e03
Simplify delay access.
twittner Jan 23, 2026
f5347e8
Add metrics again behind a feature flag.
twittner Jan 23, 2026
bf53e9d
Merge branch 'main' into tw/tcp
twittner Jan 23, 2026
db89427
Encode `Header` and `Trailer` more efficiently.
twittner Jan 26, 2026
dabb889
Add property test for trailer encoding/decoding.
twittner Jan 26, 2026
b824423
Simplify.
twittner Jan 26, 2026
42fd7da
Small refactoring.
twittner Jan 26, 2026
328726c
Merge branch 'main' into tw/tcp
twittner Jan 27, 2026
0a61fd3
Merge branch 'main' into tw/tcp
twittner Feb 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 313 additions & 15 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"contracts/rust/diff-test",
"contracts/rust/gen-vk-contract",
"crates/builder",
"crates/cliquenet",
"crates/hotshot-builder/legacy",
"crates/hotshot-builder/refactored",
"crates/hotshot/builder-api",
Expand Down Expand Up @@ -56,6 +57,7 @@ default-members = [
"contracts/rust/diff-test",
"contracts/rust/gen-vk-contract",
"crates/builder",
"crates/cliquenet",
"crates/hotshot/builder-api",
"crates/hotshot/example-types",
"crates/hotshot/examples",
Expand Down Expand Up @@ -118,14 +120,18 @@ async-once-cell = "0.5"
async-trait = "0.1"
base64 = "0.22"
base64-bytes = "0.1"
bimap = "0.6.3"
bincode = "1.3.3"
bitvec = { version = "1", features = ["serde"] }
blake3 = "1.5"
bytes = { version = "1.11.0", features = ["serde"] }
bon = "3.8.2"
cbor4ii = { version = "1.0", features = ["serde1"] }
chrono = { version = "0.4", features = ["serde"] }
circular-buffer = "0.1.9"
clap = { version = "4.4", features = ["derive", "env", "string"] }
cld = "0.5"
cliquenet = { path = "crates/cliquenet" }
dashmap = "6"
derive_builder = "0.20"
derive_more = { version = "2.0", features = ["full"] }
Expand All @@ -144,8 +150,10 @@ libp2p-swarm-derive = { version = "0.35" }
moka = { version = "0.12.12", features = ["future"] }
memoize = { version = "0.4", features = ["full"] }
multiaddr = { version = "0.18" }
nohash-hasher = "0.2.0"
num_cpus = "1"
parking_lot = { version = "0.12", features = ["send_guard"] }
quickcheck = "1.0"
semver = "1"
serde-inline-default = "0.2"
serde_bytes = { version = "0.11" }
Expand Down
36 changes: 36 additions & 0 deletions crates/cliquenet/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Manifest of the `cliquenet` crate.
[package]
name = "cliquenet"
version = "0.1.0"
edition = "2024"
description = "A fully connected mesh network"

[features]
# Enables the optional `hotshot-types` dependency.
metrics = ["dep:hotshot-types"]

[dependencies]
bon = { workspace = true }
bytes = { workspace = true }
# Inherit the version from the workspace manifest, like the other shared dependencies.
bimap = { workspace = true }
bs58 = "0.5.1"
ed25519-compact = "2.2.0"
nohash-hasher = { workspace = true }
parking_lot = { workspace = true }
rand = "0.9.2"
serde = { workspace = true }
serde_bytes = { workspace = true }
snow = { version = "0.10.0", features = ["ring-accelerated"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
# optional:
hotshot-types = { workspace = true, optional = true }

[dev-dependencies]
criterion = "0.8.1"
portpicker = { workspace = true }
quickcheck = { workspace = true }
tracing-subscriber = { workspace = true }

# `harness = false` disables the default test harness for this bench target.
[[bench]]
name = "bench1"
harness = false
82 changes: 82 additions & 0 deletions crates/cliquenet/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Networking

The networking layer operates over TCP, using the [Noise protocol framework][noise]
to create secure, authenticated links between parties.

Creating a network requires

- a `Keypair` whose public key identifies a party,
- an address to accept inbound connections on, and
- a set of `(PublicKey, Address)` pairs, comprising all parties that want to communicate.

Once created, binary data can be sent to individual parties, addressed by their
`PublicKey`, or to all parties. Applications can also await receiving data from a party.
For details regarding the API, see [`Network`].

## Architecture

When a [`Network`] is created, it spawns a server task that binds a TCP listener to the
provided address and starts accepting connections. It also immediately creates tasks
to connect to each party (except to itself), using the provided address. It then
enters the main event loop which handles task creation and termination. Connections
undergo a series of stages.

### Accepting an inbound connection

If the TCP listener accepts a new inbound connection it creates a handshake task which
attempts to perform a Noise handshake which involves a Diffie-Hellman key exchange and
-- if successful -- results in an authenticated and secure link with an identified
peer.

### Connect task

A connect task will indefinitely try to establish a TCP connection to a single peer.
Between connection attempts it waits for an increasing amount of time, but no more
than 30s. If the connection has been established, the task will also perform a Noise
handshake with the remote party.

If either the handshake task or the connect task finishes successfully, the connection
is ready to be used for the actual exchange of application data.

### IP address check

If a party's address is an IP address, we also check that the remote peer address is
actually the one given. For domain names, no such check takes place.

### Simultaneous connects

Given that all parties try to connect to each other, a network node may accept a
connection it has already established through its own connect task, or vice versa.
A node uses the order of public keys to decide which connection to keep, should
two connections exist at the same time, i.e. given two connections to the same
peer a node drops the one whose associated public key is smaller than its own.

### I/O tasks

After successful connection establishment, two tasks are created, one to continuously
read incoming data and one to send application data. The data is split and encrypted
into frames of 64 KiB (the maximum size of a Noise message) or less. Failure of either
task results in the termination of both and a new connect task is created to
re-establish the connection.

### Heartbeats and latency measurements

In addition to application data, a network node periodically sends a PING frame and
expects a PONG frame. When a PONG is received the embedded timestamp is used to
measure the network RTT. In addition, whenever a PING frame has been sent, a countdown
timer is started (if not already running) which will cause the connection to be dropped
when it expires. Any data that is subsequently received will stop the countdown. This
mechanism is used like a heartbeat to ensure the remote peer is alive and responding.

### Channels

Communication between the various tasks proceeds over MPSC (multi producer, single
consumer) channels. When application code wishes to send data, it sends it over
the channel to the main event loop, which will forward the data over another MPSC
channel to the respective write task. The capacity of every channel is bounded.
If the one the application uses is full, backpressure is exercised, i.e. the
application has to wait. This can happen, for example, if no connection is available
for some time. The channel to an I/O write task is also bounded, but if full, the
oldest item will be dropped.

[noise]: https://noiseprotocol.org/
196 changes: 196 additions & 0 deletions crates/cliquenet/benches/bench1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use std::{collections::HashMap, io, net::Ipv4Addr, sync::LazyLock, time::Duration};

use cliquenet::{Address, Keypair, MAX_MESSAGE_SIZE, NetConf, NetworkError, PublicKey, Retry};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
#[cfg(feature = "metrics")]
use hotshot_types::traits::metrics::NoMetrics;
use rand::RngCore;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
runtime::Runtime,
time::sleep,
};

/// Label of the first benchmark party (the echo server side in `bench_cliquenet`).
const A: u8 = 0;
/// Label of the second benchmark party (the echo client side in `bench_cliquenet`).
const B: u8 = 1;

/// Payload sizes in bytes that each benchmark group iterates over.
///
/// Every entry is also a key of `DATA`.
const SIZES: &[usize] = &[
128 * 1024,
512 * 1024,
1024 * 1024,
5 * 1024 * 1024,
MAX_MESSAGE_SIZE,
];

/// Random payloads, one per entry of `SIZES`, keyed by their length in bytes.
///
/// The map is built once, on first access, so that generating the random
/// bytes is shared by all benchmark iterations.
static DATA: LazyLock<HashMap<usize, Vec<u8>>> = LazyLock::new(|| {
    let mut rng = rand::rng();
    let mut payloads = HashMap::new();
    for &len in SIZES {
        let mut buf = vec![0; len];
        rng.fill_bytes(&mut buf);
        payloads.insert(len, buf);
    }
    payloads
});

/// Creates a connected pair of loopback TCP streams with `TCP_NODELAY` set.
///
/// A listener is bound to an OS-assigned port on `127.0.0.1`; accepting and
/// connecting are driven concurrently. Returns `(accepted, connected)`, i.e.
/// the server-side stream first and the client-side stream second.
///
/// # Panics
///
/// Panics if binding, accepting, connecting or setting the socket option fails.
async fn setup_tcp() -> (TcpStream, TcpStream) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    let (accepted, connected) = tokio::join!(listener.accept(), TcpStream::connect(addr));
    let (server, _peer) = accepted.unwrap();
    let client = connected.unwrap();

    server.set_nodelay(true).unwrap();
    client.set_nodelay(true).unwrap();

    (server, client)
}

async fn setup_cliquenet() -> (Retry<u8>, Retry<u8>) {
let a = Keypair::generate().unwrap();
let b = Keypair::generate().unwrap();

let all: [(u8, PublicKey, Address); 2] = [
(
A,
a.public_key(),
Address::from((
Ipv4Addr::from([127, 0, 0, 1]),
portpicker::pick_unused_port().unwrap(),
)),
),
(
B,
b.public_key(),
Address::from((
Ipv4Addr::from([127, 0, 0, 1]),
portpicker::pick_unused_port().unwrap(),
)),
),
];

let net_a = Retry::create({
let cfg = NetConf::builder()
.name("bench")
.label(A)
.keypair(a)
.bind(all[0].2.clone())
.parties(all.clone());
#[cfg(not(feature = "metrics"))]
{
cfg.build()
}
#[cfg(feature = "metrics")]
{
cfg.metrics(Box::new(NoMetrics)).build()
}
})
.await
.unwrap();

let net_b = Retry::create({
let cfg = NetConf::builder()
.name("bench")
.label(B)
.keypair(b)
.bind(all[1].2.clone())
.parties(all.clone());
#[cfg(not(feature = "metrics"))]
{
cfg.build()
}
#[cfg(feature = "metrics")]
{
cfg.metrics(Box::new(NoMetrics)).build()
}
})
.await
.unwrap();

(net_a, net_b)
}

/// Performs one length-prefixed echo round trip of the `size`-byte payload over raw TCP.
///
/// `srv` reads a single frame (a `u32` length followed by that many bytes) and
/// writes it back; `clt` sends the payload stored in `DATA` under `size` and
/// asserts that the echoed frame is identical. Both sides run concurrently.
///
/// The payload is borrowed from `DATA` rather than cloned, so the measured
/// iteration does not include an extra allocation and copy of the payload.
///
/// # Panics
///
/// Panics if `size` is not a key of `DATA`, if either side hits an I/O error,
/// or if the echoed data differs from what was sent.
async fn tcp(size: usize, srv: &mut TcpStream, clt: &mut TcpStream) {
    /// Reads one frame from `stream` and writes it back unchanged.
    async fn echo_server(stream: &mut TcpStream) -> io::Result<()> {
        let len = stream.read_u32().await?;
        let mut frame = vec![0; len as usize];
        stream.read_exact(&mut frame).await?;
        stream.write_u32(len).await?;
        stream.write_all(&frame).await
    }

    /// Sends `payload` as one frame and checks that the same frame comes back.
    async fn echo_client(stream: &mut TcpStream, payload: &[u8]) -> io::Result<()> {
        // The length prefix is a `u32`; refuse payloads that do not fit
        // instead of silently truncating the length.
        let len = u32::try_from(payload.len())
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
        stream.write_u32(len).await?;
        stream.write_all(payload).await?;

        let echoed_len = stream.read_u32().await?;
        assert_eq!(echoed_len as usize, payload.len());
        let mut echoed = vec![0; echoed_len as usize];
        stream.read_exact(&mut echoed).await?;
        assert_eq!(&*echoed, payload);
        Ok(())
    }

    let (served, echoed) = tokio::join!(echo_server(srv), echo_client(clt, &DATA[&size]));
    served.unwrap();
    echoed.unwrap();
}

/// Performs one echo round trip of the `size`-byte payload between two cliquenet nodes.
///
/// `clt` unicasts the payload stored in `DATA` under `size` to the party
/// labelled `to`; `srv` receives one message and unicasts it back to its
/// sender; `clt` then asserts that the reply came from `to` and carries the
/// same bytes. Both sides run concurrently.
///
/// The payload is borrowed from `DATA` and copied exactly once (the owned
/// buffer handed to `unicast`), so the measured iteration does not include
/// redundant payload clones.
///
/// # Panics
///
/// Panics if `size` is not a key of `DATA`, if either side returns a
/// `NetworkError`, or if the echoed message does not match.
async fn cliquenet(to: u8, size: usize, srv: &mut Retry<u8>, clt: &mut Retry<u8>) {
    /// Receives one message and sends it back to its source.
    async fn echo_server(net: &mut Retry<u8>) -> Result<(), NetworkError> {
        let (src, data) = net.receive().await?;
        let _ = net.unicast(src, 0, data.into()).await?;
        Ok(())
    }

    /// Sends `payload` to `to` and checks that the same bytes come back from `to`.
    async fn echo_client(
        to: u8,
        net: &mut Retry<u8>,
        payload: &[u8],
    ) -> Result<(), NetworkError> {
        // `unicast` takes an owned buffer, so this is the one unavoidable copy.
        let _ = net.unicast(to, 0, payload.to_vec()).await?;
        let (src, data) = net.receive().await?;
        assert_eq!(src, to);
        assert_eq!(&*data, payload);
        Ok(())
    }

    let server = echo_server(srv);
    let client = echo_client(to, clt, &DATA[&size]);
    let (served, echoed) = tokio::join!(server, client);
    served.unwrap();
    echoed.unwrap();
}

/// Benchmarks echo round trips over a plain TCP connection for every size in `SIZES`.
///
/// One pair of connected streams is set up before measuring and reused for
/// all sizes; each measured iteration blocks on a single `tcp` round trip.
fn bench_tcp(c: &mut Criterion) {
    let runtime = Runtime::new().unwrap();
    let (mut server, mut client) = runtime.block_on(setup_tcp());

    let mut group = c.benchmark_group("tcp");
    for &size in SIZES {
        let id = BenchmarkId::from_parameter(format!("{}k", size / 1024));
        group.throughput(Throughput::Bytes(size as u64));
        group.bench_with_input(id, &size, |bencher, &size| {
            bencher.iter(|| runtime.block_on(tcp(size, &mut server, &mut client)))
        });
    }
    group.finish();
}

/// Benchmarks echo round trips between two cliquenet nodes for every size in `SIZES`.
///
/// The two nodes are created once and reused for all sizes; each measured
/// iteration blocks on a single `cliquenet` round trip addressed to party `A`.
fn bench_cliquenet(c: &mut Criterion) {
    let runtime = Runtime::new().unwrap();
    let (mut server, mut client) = runtime.block_on(async {
        let nets = setup_cliquenet().await;
        // NOTE(review): presumably gives the nodes time to connect to each
        // other before measurements start -- confirm.
        sleep(Duration::from_secs(3)).await;
        nets
    });

    let mut group = c.benchmark_group("cliquenet");
    for &size in SIZES {
        let id = BenchmarkId::from_parameter(format!("{}k", size / 1024));
        group.throughput(Throughput::Bytes(size as u64));
        group.bench_with_input(id, &size, |bencher, &size| {
            bencher.iter(|| runtime.block_on(cliquenet(A, size, &mut server, &mut client)))
        });
    }
    group.finish();
}

// Register both benchmark functions in the `benches` group and generate the
// benchmark entry point (the bench target is declared with `harness = false`).
criterion_group!(benches, bench_tcp, bench_cliquenet);
criterion_main!(benches);
Loading
Loading