Skip to content

Commit dabb272

Browse files
committed
feat: extract chain logic to separate helper
1 parent 4647b72 commit dabb272

File tree

4 files changed

+239
-149
lines changed

4 files changed

+239
-149
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
use std::collections::{BTreeMap, VecDeque};
2+
3+
use acropolis_common::{BlockHash, params::SECURITY_PARAMETER_K};
4+
use pallas::network::miniprotocols::Point;
5+
6+
use crate::{connection::Header, network::PeerId};
7+
8+
struct BlockData {
9+
header: Header,
10+
announced_by: Vec<PeerId>,
11+
body: Option<Vec<u8>>,
12+
}
13+
14+
#[derive(Default)]
15+
struct SlotBlockData {
16+
blocks: Vec<BlockData>,
17+
}
18+
impl SlotBlockData {
19+
fn track_announcement(&mut self, id: PeerId, header: Header) {
20+
if let Some(block) = self.blocks.iter_mut().find(|b| b.header.hash == header.hash) {
21+
block.announced_by.push(id);
22+
} else {
23+
self.blocks.push(BlockData { header, announced_by: vec![id], body: None });
24+
}
25+
}
26+
27+
fn announced(&self, id: PeerId, hash: BlockHash) -> bool {
28+
self.blocks.iter().any(|b| b.header.hash == hash && b.announced_by.contains(&id))
29+
}
30+
31+
fn announcers(&self, hash: BlockHash) -> Vec<PeerId> {
32+
match self.blocks.iter().find(|b| b.header.hash == hash) {
33+
Some(b) => b.announced_by.clone(),
34+
None => vec![]
35+
}
36+
}
37+
38+
fn track_body(&mut self, hash: BlockHash, body: Vec<u8>) {
39+
let Some(block) = self.blocks.iter_mut().find(|b| b.header.hash == hash) else {
40+
return;
41+
};
42+
if block.body.is_none() {
43+
block.body = Some(body);
44+
}
45+
}
46+
47+
fn body(&self, hash: BlockHash) -> Option<(&Header, &[u8])> {
48+
for block in &self.blocks {
49+
if block.header.hash != hash {
50+
continue;
51+
}
52+
return Some((&block.header, block.body.as_ref()?));
53+
}
54+
None
55+
}
56+
}
57+
58+
#[derive(Default)]
59+
pub struct ChainState {
60+
pub preferred_upstream: Option<PeerId>,
61+
blocks: BTreeMap<u64, SlotBlockData>,
62+
published_blocks: VecDeque<(u64, BlockHash)>,
63+
unpublished_blocks: VecDeque<(u64, BlockHash)>,
64+
rolled_back: bool,
65+
}
66+
67+
impl ChainState {
68+
pub fn new() -> Self {
69+
Self::default()
70+
}
71+
72+
pub fn handle_roll_forward(&mut self, id: PeerId, header: Header) -> Vec<PeerId> {
73+
let is_preferred = self.preferred_upstream == Some(id);
74+
let slot = header.slot;
75+
let hash = header.hash;
76+
let slot_blocks = self.blocks.entry(header.slot).or_default();
77+
slot_blocks.track_announcement(id, header);
78+
if is_preferred {
79+
self.unpublished_blocks.push_back((slot, hash));
80+
self.block_announcers(slot, hash)
81+
} else {
82+
vec![]
83+
}
84+
}
85+
86+
pub fn handle_roll_backward(&mut self, id: PeerId, point: Point) -> bool {
87+
let is_preferred = self.preferred_upstream == Some(id);
88+
if !is_preferred {
89+
return false;
90+
}
91+
match point {
92+
Point::Origin => {
93+
self.rolled_back = !self.published_blocks.is_empty();
94+
self.published_blocks.clear();
95+
self.unpublished_blocks.clear();
96+
self.rolled_back
97+
}
98+
Point::Specific(slot, _) => {
99+
while let Some((s, _)) = self.unpublished_blocks.back() {
100+
if *s > slot {
101+
self.unpublished_blocks.pop_back();
102+
} else {
103+
break;
104+
}
105+
}
106+
self.rolled_back = false;
107+
while let Some((s, _)) = self.published_blocks.back() {
108+
if *s > slot {
109+
self.rolled_back = true;
110+
self.published_blocks.pop_back();
111+
} else {
112+
break;
113+
}
114+
}
115+
self.rolled_back
116+
}
117+
}
118+
}
119+
120+
pub fn handle_body_fetched(&mut self, slot: u64, hash: BlockHash, body: Vec<u8>) {
121+
let Some(slot_blocks) = self.blocks.get_mut(&slot) else {
122+
return;
123+
};
124+
slot_blocks.track_body(hash, body);
125+
}
126+
127+
pub fn handle_new_preferred_upstream(&mut self, id: PeerId) {
128+
if self.preferred_upstream == Some(id) {
129+
return;
130+
}
131+
self.preferred_upstream = Some(id);
132+
while let Some((slot, hash)) = self.unpublished_blocks.back() {
133+
let Some(slot_blocks) = self.blocks.get(slot) else {
134+
break;
135+
};
136+
if !slot_blocks.announced(id, *hash) {
137+
self.unpublished_blocks.pop_back();
138+
}
139+
}
140+
}
141+
142+
pub fn next_unpublished_block(&self) -> Option<(&Header, &[u8], bool)> {
143+
let (slot, hash) = self.unpublished_blocks.front()?;
144+
let slot_blocks = self.blocks.get(slot)?;
145+
let (header, body) = slot_blocks.body(*hash)?;
146+
Some((header, body, self.rolled_back))
147+
}
148+
149+
pub fn handle_block_published(&mut self) {
150+
if let Some(published) = self.unpublished_blocks.pop_front() {
151+
self.published_blocks.push_back(published);
152+
self.rolled_back = false;
153+
while self.published_blocks.len() > SECURITY_PARAMETER_K as usize {
154+
let Some((slot, _)) = self.published_blocks.pop_back() else {
155+
break;
156+
};
157+
self.blocks.remove(&slot);
158+
}
159+
}
160+
}
161+
162+
pub fn choose_points_for_find_intersect(&self) -> Vec<Point> {
163+
let mut iterator = self.published_blocks.iter().rev();
164+
let mut result = vec![];
165+
166+
// send the 5 most recent points
167+
for _ in 0..5 {
168+
if let Some((slot, hash)) = iterator.next() {
169+
result.push(Point::Specific(*slot, hash.to_vec()));
170+
}
171+
}
172+
173+
// then 5 more points, spaced out by 10 block heights each
174+
let mut iterator = iterator.step_by(10);
175+
for _ in 0..5 {
176+
if let Some((slot, hash)) = iterator.next() {
177+
result.push(Point::Specific(*slot, hash.to_vec()));
178+
}
179+
}
180+
181+
// then 5 more points, spaced out by a total of 100 block heights each
182+
// (in case of an implausibly long rollback)
183+
let mut iterator = iterator.step_by(10);
184+
for _ in 0..5 {
185+
if let Some((slot, hash)) = iterator.next() {
186+
result.push(Point::Specific(*slot, hash.to_vec()));
187+
}
188+
}
189+
190+
result
191+
}
192+
193+
fn block_announcers(&self, slot: u64, hash: BlockHash) -> Vec<PeerId> {
194+
match self.blocks.get(&slot) {
195+
Some(slot_blocks) => slot_blocks.announcers(hash),
196+
None => vec![]
197+
}
198+
}
199+
}

modules/peer_network_interface/src/connection.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ impl PeerConnection {
5555
Ok(())
5656
}
5757

58-
pub fn request_block(&self, hash: BlockHash, height: u64) -> Result<()> {
59-
self.blockfetch.send(BlockfetchCommand::Fetch(hash, height))?;
58+
pub fn request_block(&self, hash: BlockHash, slot: u64) -> Result<()> {
59+
self.blockfetch.send(BlockfetchCommand::Fetch(hash, slot))?;
6060
Ok(())
6161
}
6262
}
@@ -85,6 +85,7 @@ pub struct Header {
8585

8686
#[derive(Debug)]
8787
pub struct BlockFetched {
88+
pub slot: u64,
8889
pub hash: BlockHash,
8990
pub body: Vec<u8>,
9091
}
@@ -169,7 +170,7 @@ impl PeerConnectionWorker {
169170
while let Some(BlockfetchCommand::Fetch(hash, slot)) = commands.recv().await {
170171
let point = Point::Specific(slot, hash.to_vec());
171172
let body = client.fetch_single(point).await?;
172-
self.sender.write(PeerEvent::BlockFetched(BlockFetched { hash, body })).await?;
173+
self.sender.write(PeerEvent::BlockFetched(BlockFetched { slot, hash, body })).await?;
173174
}
174175
bail!("parent process has disconnected");
175176
}

0 commit comments

Comments
 (0)