Skip to content

Commit 2281311

Browse files
committed
fix: blockfetch uses slots
1 parent 7390aad commit 2281311

File tree

3 files changed

+20
-11
lines changed

3 files changed

+20
-11
lines changed

modules/peer_network_interface/src/connection.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,20 @@ impl PeerConnection {
6161
}
6262
}
6363

64+
#[derive(Debug)]
6465
pub enum PeerEvent {
6566
ChainSync(PeerChainSyncEvent),
6667
BlockFetched(BlockFetched),
6768
Disconnected,
6869
}
6970

71+
#[derive(Debug)]
7072
pub enum PeerChainSyncEvent {
7173
RollForward(Header),
7274
RollBackward(Point),
7375
}
7476

75-
#[derive(Clone)]
77+
#[derive(Clone, Debug)]
7678
pub struct Header {
7779
pub hash: BlockHash,
7880
pub slot: u64,
@@ -81,6 +83,7 @@ pub struct Header {
8183
pub era: Era,
8284
}
8385

86+
#[derive(Debug)]
8487
pub struct BlockFetched {
8588
pub hash: BlockHash,
8689
pub body: Vec<u8>,
@@ -157,8 +160,8 @@ impl PeerConnectionWorker {
157160
mut client: blockfetch::Client,
158161
mut commands: mpsc::Receiver<BlockfetchCommand>,
159162
) -> Result<()> {
160-
while let Some(BlockfetchCommand::Fetch(hash, height)) = commands.recv().await {
161-
let point = Point::Specific(height, hash.to_vec());
163+
while let Some(BlockfetchCommand::Fetch(hash, slot)) = commands.recv().await {
164+
let point = Point::Specific(slot, hash.to_vec());
162165
let body = client.fetch_single(point).await?;
163166
self.sender.write(PeerEvent::BlockFetched(BlockFetched { hash, body })).await?;
164167
}
@@ -174,7 +177,7 @@ impl PeerConnectionWorker {
174177
let Some(parsed) = self.parse_header(header)? else {
175178
return Ok(None);
176179
};
177-
let point = Point::Specific(parsed.number, parsed.hash.to_vec());
180+
let point = Point::Specific(parsed.slot, parsed.hash.to_vec());
178181
Ok(Some(ParsedChainsyncMessage {
179182
point,
180183
event: PeerChainSyncEvent::RollForward(parsed),

modules/peer_network_interface/src/network.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use acropolis_common::BlockHash;
1111
use anyhow::{Context as _, Result, bail};
1212
use pallas::network::miniprotocols::Point;
1313
use tokio::sync::mpsc;
14-
use tracing::warn;
14+
use tracing::{info, warn};
1515

1616
pub struct NetworkManager {
1717
network_magic: u64,
@@ -25,6 +25,7 @@ pub struct NetworkManager {
2525
events: mpsc::Receiver<NetworkEvent>,
2626
events_sender: mpsc::Sender<NetworkEvent>,
2727
block_sink: BlockSink,
28+
published_blocks: u64,
2829
}
2930

3031
impl NetworkManager {
@@ -46,6 +47,7 @@ impl NetworkManager {
4647
events,
4748
events_sender,
4849
block_sink,
50+
published_blocks: 0,
4951
}
5052
}
5153

@@ -130,7 +132,7 @@ impl NetworkManager {
130132
let Some(peer) = self.peers.get(&announcer) else {
131133
continue;
132134
};
133-
if let Err(e) = peer.request_block(header.hash, header.number).await
135+
if let Err(e) = peer.request_block(header.hash, header.slot).await
134136
{
135137
warn!("could not request block from {}: {e}", peer.address);
136138
self.handle_disconnect(announcer).await?
@@ -155,10 +157,10 @@ impl NetworkManager {
155157
self.blocks_to_fetch.clear();
156158
self.rolled_back = true;
157159
}
158-
Point::Specific(number, _) => {
160+
Point::Specific(slot, _) => {
159161
let mut already_sent = true;
160162
while let Some(newest) = self.blocks_to_fetch.back() {
161-
if newest.number == number {
163+
if newest.slot == slot {
162164
already_sent = false;
163165
break;
164166
} else {
@@ -214,7 +216,11 @@ impl NetworkManager {
214216
break;
215217
};
216218
self.block_sink.announce(header, body, self.rolled_back).await?;
217-
self.head = Some(Point::Specific(header.number, header.hash.to_vec()));
219+
self.published_blocks += 1;
220+
if self.published_blocks.is_multiple_of(100) {
221+
info!("Published block {}", header.number);
222+
}
223+
self.head = Some(Point::Specific(header.slot, header.hash.to_vec()));
218224
self.rolled_back = false;
219225
self.blocks_to_fetch.pop_front();
220226
}
@@ -227,7 +233,7 @@ pub enum NetworkEvent {
227233
PeerUpdate { peer: PeerId, event: PeerEvent },
228234
}
229235

230-
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
236+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
231237
pub struct PeerId(u64);
232238

233239
pub struct PeerMessageSender {

modules/peer_network_interface/src/peer_network_interface.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl PeerNetworkInterface {
9797
let (_, message) = subscription.read().await?;
9898
match message.as_ref() {
9999
Message::Cardano((block, CardanoMessage::SnapshotComplete)) => {
100-
Ok(Point::Specific(block.number, block.hash.to_vec()))
100+
Ok(Point::Specific(block.slot, block.hash.to_vec()))
101101
}
102102
msg => bail!("Unexpected message in snapshot completion topic: {msg:?}"),
103103
}

0 commit comments

Comments
 (0)