
Commit 1bc53dd

Use externally provided block numbers.
The current way of keeping separate block number counters in each node cannot provide consistent block numbers: if a counter misses just a single increment, the node is permanently out of step with the others. We could do away with block numbers altogether and just deliver certified blocks in the order they are given to us, but this complicates evidence handling, catch-up and GC in certifiers. The approach taken here is to require block numbers as input, i.e. as part of the block protobuf file. The block provider needs to ensure that block numbers are consistent, consecutive and strictly monotonically increasing.
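As an illustration of what the block provider has to guarantee, here is a minimal provider-side numbering sketch; all names in it are hypothetical and it is not part of this commit (the Round2Block helper added to the test below applies the same idea to gappy round numbers):

use std::collections::BTreeMap;

/// Sketch only: hands out block numbers that are consistent (asking again
/// for an already-numbered block returns the same number), consecutive and
/// strictly monotonically increasing.
struct BlockNumbers {
    next: u64,
    assigned: BTreeMap<[u8; 32], u64>, // block hash -> assigned number (illustrative key)
}

impl BlockNumbers {
    fn new() -> Self {
        Self { next: 0, assigned: BTreeMap::new() }
    }

    fn number(&mut self, block_hash: [u8; 32]) -> u64 {
        if let Some(n) = self.assigned.get(&block_hash) {
            return *n; // consistent: same block, same number
        }
        let n = self.next;
        self.next += 1; // consecutive and strictly increasing
        self.assigned.insert(block_hash, n);
        n
    }
}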
1 parent dbcdfbd commit 1bc53dd

File tree

11 files changed: +243, -140 lines


Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

sailfish-consensus/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -157,7 +157,7 @@ where
     keypair,
     state: State::Startup,
     clock: ConsensusTime(Default::default()),
-    nodes: NodeInfo::new(&committee),
+    nodes: NodeInfo::new(&committee, committee.quorum_size()),
     dag: Dag::new(committee.size()),
     round: RoundNumber::genesis(),
     committed_round: RoundNumber::genesis(),

sailfish-types/src/nodeinfo.rs

Lines changed: 4 additions & 2 deletions
@@ -1,3 +1,5 @@
+use std::num::NonZeroUsize;
+
 use multisig::{Committee, PublicKey};
 
 #[derive(Debug)]
@@ -7,10 +9,10 @@ pub struct NodeInfo<T> {
 }
 
 impl<T: Default + PartialOrd> NodeInfo<T> {
-    pub fn new(c: &Committee) -> Self {
+    pub fn new(c: &Committee, q: NonZeroUsize) -> Self {
         Self {
             nodes: c.parties().map(|k| (*k, T::default())).collect(),
-            quorum: c.quorum_size().get(),
+            quorum: q.get(),
         }
     }
 
tests/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -8,13 +8,15 @@ rust-version.workspace = true
 [dependencies]
 alloy = { workspace = true }
 async-trait = { workspace = true }
+blake3 = { workspace = true }
 bytes = { workspace = true }
 cliquenet = { path = "../cliquenet", features = ["turmoil"] }
 committable = { workspace = true }
 crossbeam-queue = { workspace = true }
 futures = { workspace = true }
 metrics = { path = "../metrics" }
 multisig = { path = "../multisig" }
+parking_lot = { workspace = true }
 portpicker = { workspace = true }
 rand = { workspace = true }
 sailfish = { path = "../sailfish", features = ["test"] }

tests/src/tests/timeboost.rs

Lines changed: 1 addition & 0 deletions
@@ -118,6 +118,7 @@ where
             .sign_keypair(kpair)
             .dh_keypair(xpair)
             .address(pa)
+            .recover(recover_index.map(|r| r == i).unwrap_or(false))
             .committee(produce_committee.clone())
             .build();
         enc_keys.push(enc_key);
Lines changed: 82 additions & 28 deletions
@@ -1,21 +1,27 @@
+use std::collections::HashMap;
 use std::iter::once;
 use std::num::NonZeroUsize;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::Duration;
 
+use alloy::eips::Encodable2718;
 use bytes::Bytes;
 use metrics::NoMetrics;
 use multisig::Certificate;
-use timeboost_builder::Certifier;
-use timeboost_sequencer::{Output, Sequencer};
-use timeboost_types::{Block, BlockInfo};
+use parking_lot::Mutex;
+use timeboost::builder::Certifier;
+use timeboost::sequencer::{Output, Sequencer};
+use timeboost::types::sailfish::RoundNumber;
+use timeboost::types::{Block, BlockInfo, BlockNumber, Transaction};
 use timeboost_utils::types::logging::init_logging;
 use tokio::select;
 use tokio::sync::broadcast::error::RecvError;
 use tokio::sync::{broadcast, mpsc};
-use tokio::task::JoinSet;
 use tokio::time::sleep;
 use tokio_util::sync::CancellationToken;
-use tracing::info;
+use tokio_util::task::TaskTracker;
+use tracing::{debug, error, info};
 
 use super::{gen_bundles, make_configs};
 
@@ -27,40 +33,57 @@ async fn block_order() {
     init_logging();
 
     let num = NonZeroUsize::new(5).unwrap();
+    let quorum = 4;
     let (enc_keys, cfg) = make_configs(num, RECOVER_INDEX);
 
     let mut rxs = Vec::new();
-    let mut tasks = JoinSet::new();
+    let tasks = TaskTracker::new();
     let (bcast, _) = broadcast::channel(3);
     let finish = CancellationToken::new();
+    let round2block = Arc::new(Round2Block::new());
 
     for (c, b) in cfg {
         let (tx, rx) = mpsc::unbounded_channel();
         let mut brx = bcast.subscribe();
         let finish = finish.clone();
+        let label = c.sign_keypair().public_key();
+        let r2b = round2block.clone();
         tasks.spawn(async move {
            if c.is_recover() {
                 // delay start of a recovering node:
                 sleep(Duration::from_secs(5)).await
             }
             let mut s = Sequencer::new(c, &NoMetrics).await.unwrap();
             let mut p = Certifier::new(b, &NoMetrics).await.unwrap();
+            let mut r = None;
+            let handle = p.handle();
             loop {
                 select! {
                     t = brx.recv() => match t {
                         Ok(trx) => s.add_bundles(once(trx)),
-                        Err(RecvError::Lagged(_)) => continue,
+                        Err(RecvError::Lagged(_)) => {
+                            error!(node = %s.public_key(), "lagging behind");
+                            continue
+                        }
                         Err(err) => panic!("{err}")
                     },
                     o = s.next() => {
-                        let Output::Transactions { round, .. } = o.unwrap() else {
+                        let Output::Transactions { round, transactions, .. } = o.unwrap() else {
+                            error!(node = %s.public_key(), "no sequencer output");
                            continue
                         };
-                        let b = Block::new(*round, Bytes::new());
-                        p.handle().enqueue(b).await.unwrap()
+                        // We require unique round numbers.
+                        if Some(round) == r {
+                            continue
+                        }
+                        r = Some(round);
+                        let i = r2b.get(round);
+                        let b = Block::new(i, *round, hash(&transactions));
+                        handle.enqueue(b).await.unwrap()
                     }
                     b = p.next_block() => {
                         let b = b.expect("block");
+                        debug!(node = %s.public_key(), hash = %b.data().hash(), "block received");
                        let c: Certificate<BlockInfo> = b.into();
                         tx.send(c.into_data()).unwrap()
                     }
@@ -71,7 +94,7 @@ async fn block_order() {
                 }
             }
         });
-        rxs.push(rx)
+        rxs.push((label, rx))
     }
 
     for enc_key in &enc_keys {
@@ -80,28 +103,59 @@ async fn block_order() {
 
     tasks.spawn(gen_bundles(enc_keys[0].clone(), bcast.clone()));
 
-    // Collect all outputs:
-    let mut outputs: Vec<Vec<BlockInfo>> = vec![Vec::new(); num.get()];
-    for _ in 0..NUM_OF_BLOCKS {
-        for (i, o) in outputs.iter_mut().enumerate() {
-            let x = rxs[i].recv().await.unwrap();
-            o.push(x);
+    let mut map: HashMap<BlockInfo, usize> = HashMap::new();
+
+    for b in 0..NUM_OF_BLOCKS {
+        map.clear();
+        info!(block = %b);
+        for (node, r) in &mut rxs {
+            debug!(%node, block = %b, "awaiting ...");
+            let info = r.recv().await.unwrap();
+            *map.entry(info).or_default() += 1
+        }
+        if map.values().any(|n| *n >= quorum && *n <= num.get()) {
+            continue;
         }
+        for (info, n) in map {
+            eprintln!("{}: {} = {n}", info.hash(), info.round().num())
+        }
+        panic!("outputs do not match")
     }
 
     finish.cancel();
+}
 
-    // Compare outputs:
-    for (a, b) in outputs.iter().zip(outputs.iter().skip(1)) {
-        if a != b {
-            for infos in &outputs {
-                let xy = infos
-                    .iter()
-                    .map(|i| (*i.num(), *i.round()))
-                    .collect::<Vec<_>>();
-                eprintln!("{xy:?}")
-            }
-            panic!("outputs do not match")
+fn hash(tx: &[Transaction]) -> Bytes {
+    let mut h = blake3::Hasher::new();
+    for t in tx {
+        h.update(&t.encoded_2718());
+    }
+    Bytes::copy_from_slice(h.finalize().as_bytes())
+}
+
+/// Map round numbers to block numbers.
+///
+/// Block numbers need to be consistent, consecutive and strictly monotonic.
+/// The round numbers of our sequencer output may contain gaps. To provide
+/// block numbers with the required properties we have here one monotonic
+/// counter and record which block number is used for a round number.
+/// Subsequent lookups will then get a consistent result.
+struct Round2Block {
+    counter: AtomicU64,
+    block_numbers: Mutex<HashMap<RoundNumber, BlockNumber>>,
+}
+
+impl Round2Block {
+    fn new() -> Self {
+        Self {
+            counter: AtomicU64::new(0),
+            block_numbers: Mutex::new(HashMap::new()),
         }
     }
+
+    fn get(&self, r: RoundNumber) -> BlockNumber {
+        let mut map = self.block_numbers.lock();
+        *map.entry(r)
+            .or_insert_with(|| self.counter.fetch_add(1, Ordering::Relaxed).into())
+    }
 }
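For reference, the property the Round2Block helper above is meant to provide, as a usage sketch (assuming RoundNumber can be constructed from a u64 and BlockNumber supports equality and ordering comparisons; both are assumptions, not shown in this diff):

let r2b = Round2Block::new();
let a = r2b.get(RoundNumber::from(7));        // first distinct round gets block number 0
let b = r2b.get(RoundNumber::from(9));        // next distinct round gets 1, despite the gap in rounds
assert_eq!(a, r2b.get(RoundNumber::from(7))); // repeated lookups stay consistent
assert!(b > a);                               // strictly increasing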
