Skip to content

Commit 56a466e

Browse files
apollo_network: add CommitteeStore with read/write API
1 parent 9a0ff77 commit 56a466e

File tree

5 files changed

+389
-0
lines changed

5 files changed

+389
-0
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_network/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ asynchronous-codec.workspace = true
2121
bytes.workspace = true
2222
derive_more = { workspace = true, features = ["display"] }
2323
futures.workspace = true
24+
indexmap.workspace = true
2425
lazy_static.workspace = true
2526
libp2p = { workspace = true, features = [
2627
"dns",
@@ -39,6 +40,7 @@ libp2p = { workspace = true, features = [
3940
prost.workspace = true
4041
replace_with.workspace = true
4142
serde = { workspace = true, features = ["derive"] }
43+
sha2.workspace = true
4244
starknet_api.workspace = true
4345
strum = { workspace = true, features = ["derive"] }
4446
thiserror.workspace = true
@@ -53,6 +55,7 @@ apollo_network_types = { workspace = true, features = ["testing"] }
5355
assert_matches.workspace = true
5456
deadqueue = { workspace = true, features = ["unlimited"] }
5557
defaultmap.workspace = true
58+
expect-test.workspace = true
5659
libp2p-swarm-test.workspace = true
5760
pretty_assertions.workspace = true
5861
rstest.workspace = true
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
pub mod store;
12
pub mod types;
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
#[cfg(test)]
2+
#[path = "store_test.rs"]
3+
mod store_test;
4+
5+
use std::collections::{BTreeSet, HashMap};
6+
7+
use indexmap::IndexMap;
8+
use libp2p::PeerId;
9+
use sha2::{Digest, Sha256};
10+
use starknet_api::staking::StakingWeight;
11+
12+
use super::types::{CommitteeId, CommitteeMember, EpochId};
13+
14+
fn sort_and_dedup_members(members: &mut [CommitteeMember]) -> Result<(), AddEpochError> {
15+
members.sort_by_key(|m| m.peer_id);
16+
17+
for window in members.windows(2) {
18+
if window[0].peer_id == window[1].peer_id {
19+
return Err(AddEpochError::DuplicatePeerId(window[0].peer_id));
20+
}
21+
}
22+
Ok(())
23+
}
24+
25+
fn compute_committee_id(sorted_members: &[CommitteeMember]) -> CommitteeId {
26+
let mut hasher = Sha256::new();
27+
for member in sorted_members {
28+
hasher.update(member.peer_id.to_bytes());
29+
hasher.update(member.weight.0.to_be_bytes());
30+
}
31+
let digest = hasher.finalize();
32+
33+
// TODO(AndrewL): change this once propeller uses actual 256-bit hashes.
34+
let truncated = u32::from_be_bytes([digest[0], digest[1], digest[2], digest[3]]);
35+
CommitteeId(truncated)
36+
}
37+
38+
#[derive(Debug, thiserror::Error)]
39+
pub enum AddEpochError {
40+
#[error("Duplicate epoch ID {0}.")]
41+
DuplicateEpochId(EpochId),
42+
#[error("Duplicate peer ID {0} in committee members.")]
43+
DuplicatePeerId(PeerId),
44+
}
45+
46+
/// Output of adding a new epoch to the store.
47+
#[derive(Debug)]
48+
pub struct AddEpochOutput {
49+
/// All peer IDs that are part of any active committee.
50+
pub allowed_peers: BTreeSet<PeerId>,
51+
/// If this epoch introduced a committee not previously tracked, contains the committee ID
52+
/// and its members as (peer_id, weight) pairs.
53+
pub new_committee: Option<(CommitteeId, Vec<(PeerId, StakingWeight)>)>,
54+
/// If adding this epoch caused an old committee to become inactive, contains its ID.
55+
pub removed_committee: Option<CommitteeId>,
56+
}
57+
58+
/// Stores active epochs and derives committee and peer data from them.
59+
///
60+
/// Epochs are the single source of truth. A committee exists as long as at least one active epoch
61+
/// references it. Peer allow-lists are derived from the union of all active committees.
62+
#[derive(Debug)]
63+
pub struct ActiveCommittees {
64+
capacity: usize,
65+
/// Epoch ID -> committee ID, in insertion order for FIFO eviction.
66+
epochs: IndexMap<EpochId, CommitteeId>,
67+
/// Number of active epochs referencing each committee ID.
68+
committee_ref_counts: HashMap<CommitteeId, u64>,
69+
/// The members for each tracked committee. Exists iff `committee_ref_counts` has the key.
70+
committee_data: HashMap<CommitteeId, Vec<CommitteeMember>>,
71+
}
72+
73+
impl ActiveCommittees {
74+
pub fn new(capacity: usize) -> Self {
75+
Self {
76+
capacity,
77+
epochs: IndexMap::new(),
78+
committee_ref_counts: HashMap::new(),
79+
committee_data: HashMap::new(),
80+
}
81+
}
82+
83+
pub fn add_epoch(
84+
&mut self,
85+
epoch_id: EpochId,
86+
mut members: Vec<CommitteeMember>,
87+
) -> Result<AddEpochOutput, AddEpochError> {
88+
if self.epochs.contains_key(&epoch_id) {
89+
return Err(AddEpochError::DuplicateEpochId(epoch_id));
90+
}
91+
92+
sort_and_dedup_members(&mut members)?;
93+
let committee_id = compute_committee_id(&members);
94+
95+
// Track the epoch.
96+
self.epochs.insert(epoch_id, committee_id);
97+
98+
// Track the committee. A ref count of 0 (or absent) means this is a new committee.
99+
let count = self.committee_ref_counts.entry(committee_id).or_insert(0);
100+
let new_committee = if *count == 0 {
101+
let committee_peers = members.iter().map(|m| (m.peer_id, m.weight)).collect();
102+
self.committee_data.insert(committee_id, members);
103+
Some((committee_id, committee_peers))
104+
} else {
105+
None
106+
};
107+
*count += 1;
108+
109+
// Evict the oldest epoch if over capacity.
110+
let removed_committee =
111+
if self.epochs.len() > self.capacity { self.evict_oldest_epoch() } else { None };
112+
113+
let allowed_peers = self.compute_allowed_peers();
114+
115+
Ok(AddEpochOutput { allowed_peers, new_committee, removed_committee })
116+
}
117+
118+
/// Evicts the oldest epoch. If its committee has no remaining epochs, removes the committee
119+
/// and returns its ID.
120+
fn evict_oldest_epoch(&mut self) -> Option<CommitteeId> {
121+
let (_evicted_epoch_id, committee_id) = self.epochs.shift_remove_index(0)?;
122+
123+
let count = self
124+
.committee_ref_counts
125+
.get_mut(&committee_id)
126+
.expect("committee id in epochs references a committee not in committee_ref_counts");
127+
*count -= 1;
128+
129+
if *count == 0 {
130+
self.committee_ref_counts.remove(&committee_id);
131+
self.committee_data.remove(&committee_id);
132+
Some(committee_id)
133+
} else {
134+
None
135+
}
136+
}
137+
138+
fn compute_allowed_peers(&self) -> BTreeSet<PeerId> {
139+
self.committee_data.values().flat_map(|members| members.iter().map(|m| m.peer_id)).collect()
140+
}
141+
}

0 commit comments

Comments
 (0)