Skip to content

Commit 7390aad

Browse files
committed
feat: implementation of PeerNetworkInterface
1 parent a819884 commit 7390aad

File tree

9 files changed

+733
-0
lines changed

9 files changed

+733
-0
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ members = [
1111
"modules/mithril_snapshot_fetcher", # Mithril snapshot fetcher
1212
"modules/snapshot_bootstrapper", # Bootstrap state from a ledger snapshot
1313
"modules/upstream_chain_fetcher", # Upstream chain fetcher
14+
"modules/peer_network_interface", # Multi-peer network interface
1415
"modules/block_unpacker", # Block to transaction unpacker
1516
"modules/tx_unpacker", # Tx to UTXO unpacker
1617
"modules/utxo_state", # UTXO state

common/src/genesis_values.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const MAINNET_SHELLEY_GENESIS_HASH: &str =
66
"1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81";
77

88
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
9+
#[serde(rename_all = "kebab-case")]
910
pub struct GenesisValues {
1011
pub byron_timestamp: u64,
1112
pub shelley_epoch: u64,
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Acropolis upstream chain fetcher module
2+
3+
[package]
4+
name = "acropolis_module_peer_network_interface"
5+
version = "0.2.0"
6+
edition = "2024"
7+
authors = ["Paul Clark <[email protected]>"]
8+
description = "Multiplexed chain fetcher Caryatid module for Acropolis"
9+
license = "Apache-2.0"
10+
11+
[dependencies]
12+
acropolis_common = { path = "../../common" }
13+
14+
caryatid_sdk = { workspace = true }
15+
16+
anyhow = { workspace = true }
17+
config = { workspace = true }
18+
crossbeam = "0.8.4"
19+
pallas = { workspace = true }
20+
serde = { workspace = true, features = ["rc"] }
21+
serde_json = { workspace = true }
22+
tokio = { workspace = true }
23+
tracing = { workspace = true }
24+
25+
[lib]
26+
path = "src/peer_network_interface.rs"
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
block-topic = "cardano.block.available"
2+
snapshot-completion-topic = "cardano.snapshot.complete"
3+
genesis-completion-topic = "cardano.sequence.bootstrapped"
4+
5+
node-addresses = [
6+
"backbone.cardano.iog.io:3001",
7+
]
8+
magic-number = 764824073
9+
10+
sync-point = "origin"
11+
cache-dir = "upstream-cache"
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::path::PathBuf;
2+
3+
use acropolis_common::genesis_values::GenesisValues;
4+
use anyhow::Result;
5+
use config::Config;
6+
7+
#[derive(Clone, Debug, serde::Deserialize, PartialEq)]
8+
#[serde(rename_all = "camelCase")]
9+
pub enum SyncPoint {
10+
Origin,
11+
Tip,
12+
Cache,
13+
Snapshot,
14+
}
15+
16+
#[derive(serde::Deserialize)]
17+
#[serde(rename_all = "kebab-case")]
18+
pub struct InterfaceConfig {
19+
pub block_topic: String,
20+
pub sync_point: SyncPoint,
21+
pub snapshot_completion_topic: String,
22+
pub genesis_completion_topic: String,
23+
pub node_addresses: Vec<String>,
24+
pub magic_number: u64,
25+
#[expect(unused)]
26+
pub cache_dir: PathBuf,
27+
#[serde(flatten)]
28+
pub genesis_values: Option<GenesisValues>,
29+
}
30+
31+
impl InterfaceConfig {
32+
pub fn try_load(config: &Config) -> Result<Self> {
33+
let full_config = Config::builder()
34+
.add_source(config::File::from_str(
35+
include_str!("../config.default.toml"),
36+
config::FileFormat::Toml,
37+
))
38+
.add_source(config.clone())
39+
.build()?;
40+
Ok(full_config.try_deserialize()?)
41+
}
42+
}
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
use std::time::Duration;
2+
3+
use acropolis_common::{BlockHash, Era};
4+
use anyhow::{Result, bail};
5+
pub use pallas::network::miniprotocols::Point;
6+
use pallas::{
7+
ledger::traverse::MultiEraHeader,
8+
network::{
9+
facades::PeerClient,
10+
miniprotocols::{blockfetch, chainsync},
11+
},
12+
};
13+
use tokio::{
14+
select,
15+
sync::{mpsc, oneshot},
16+
};
17+
use tracing::error;
18+
19+
use crate::network::PeerMessageSender;
20+
21+
pub struct PeerConnection {
22+
pub address: String,
23+
chainsync: mpsc::Sender<ChainsyncCommand>,
24+
blockfetch: mpsc::Sender<BlockfetchCommand>,
25+
}
26+
27+
impl PeerConnection {
28+
pub fn new(address: String, magic: u64, sender: PeerMessageSender, delay: Duration) -> Self {
29+
let worker = PeerConnectionWorker {
30+
address: address.clone(),
31+
magic,
32+
sender,
33+
};
34+
let (chainsync_tx, chainsync_rx) = mpsc::channel(16);
35+
let (blockfetch_tx, blockfetch_rx) = mpsc::channel(16);
36+
tokio::spawn(async move {
37+
tokio::time::sleep(delay).await;
38+
worker.run(chainsync_rx, blockfetch_rx).await;
39+
});
40+
Self {
41+
address,
42+
chainsync: chainsync_tx,
43+
blockfetch: blockfetch_tx,
44+
}
45+
}
46+
47+
pub async fn find_tip(&self) -> Result<Point> {
48+
let (tx, rx) = oneshot::channel();
49+
self.chainsync.send(ChainsyncCommand::FindTip(tx)).await?;
50+
Ok(rx.await?)
51+
}
52+
53+
pub async fn find_intersect(&self, points: Vec<Point>) -> Result<()> {
54+
self.chainsync.send(ChainsyncCommand::FindIntersect(points)).await?;
55+
Ok(())
56+
}
57+
58+
pub async fn request_block(&self, hash: BlockHash, height: u64) -> Result<()> {
59+
self.blockfetch.send(BlockfetchCommand::Fetch(hash, height)).await?;
60+
Ok(())
61+
}
62+
}
63+
64+
pub enum PeerEvent {
65+
ChainSync(PeerChainSyncEvent),
66+
BlockFetched(BlockFetched),
67+
Disconnected,
68+
}
69+
70+
pub enum PeerChainSyncEvent {
71+
RollForward(Header),
72+
RollBackward(Point),
73+
}
74+
75+
#[derive(Clone)]
76+
pub struct Header {
77+
pub hash: BlockHash,
78+
pub slot: u64,
79+
pub number: u64,
80+
pub bytes: Vec<u8>,
81+
pub era: Era,
82+
}
83+
84+
pub struct BlockFetched {
85+
pub hash: BlockHash,
86+
pub body: Vec<u8>,
87+
}
88+
89+
struct PeerConnectionWorker {
90+
address: String,
91+
magic: u64,
92+
sender: PeerMessageSender,
93+
}
94+
95+
impl PeerConnectionWorker {
96+
async fn run(
97+
mut self,
98+
chainsync: mpsc::Receiver<ChainsyncCommand>,
99+
blockfetch: mpsc::Receiver<BlockfetchCommand>,
100+
) {
101+
if let Err(err) = self.do_run(chainsync, blockfetch).await {
102+
error!(peer = self.address, "{err:#}");
103+
}
104+
let _ = self.sender.write(PeerEvent::Disconnected).await;
105+
}
106+
107+
async fn do_run(
108+
&mut self,
109+
chainsync: mpsc::Receiver<ChainsyncCommand>,
110+
blockfetch: mpsc::Receiver<BlockfetchCommand>,
111+
) -> Result<()> {
112+
let client = PeerClient::connect(self.address.clone(), self.magic).await?;
113+
select! {
114+
res = self.run_chainsync(client.chainsync, chainsync) => res,
115+
res = self.run_blockfetch(client.blockfetch, blockfetch) => res,
116+
}
117+
}
118+
119+
async fn run_chainsync(
120+
&self,
121+
mut client: chainsync::N2NClient,
122+
mut commands: mpsc::Receiver<ChainsyncCommand>,
123+
) -> Result<()> {
124+
let mut reached = None;
125+
loop {
126+
select! {
127+
msg = client.request_or_await_next(), if reached.is_some() => {
128+
if let Some(parsed) = self.parse_chainsync_message(msg?)? {
129+
reached = Some(parsed.point);
130+
self.sender.write(PeerEvent::ChainSync(parsed.event)).await?;
131+
}
132+
}
133+
cmd = commands.recv() => {
134+
let Some(cmd) = cmd else {
135+
bail!("parent process has disconnected");
136+
};
137+
match cmd {
138+
ChainsyncCommand::FindIntersect(points) => {
139+
let (point, _) = client.find_intersect(points).await?;
140+
reached = point;
141+
}
142+
ChainsyncCommand::FindTip(done) => {
143+
let points = reached.as_slice().to_vec();
144+
let (_, tip) = client.find_intersect(points).await?;
145+
if done.send(tip.0).is_err() {
146+
bail!("parent process has disconnected");
147+
}
148+
}
149+
}
150+
}
151+
}
152+
}
153+
}
154+
155+
async fn run_blockfetch(
156+
&self,
157+
mut client: blockfetch::Client,
158+
mut commands: mpsc::Receiver<BlockfetchCommand>,
159+
) -> Result<()> {
160+
while let Some(BlockfetchCommand::Fetch(hash, height)) = commands.recv().await {
161+
let point = Point::Specific(height, hash.to_vec());
162+
let body = client.fetch_single(point).await?;
163+
self.sender.write(PeerEvent::BlockFetched(BlockFetched { hash, body })).await?;
164+
}
165+
bail!("parent process has disconnected");
166+
}
167+
168+
fn parse_chainsync_message(
169+
&self,
170+
msg: chainsync::NextResponse<chainsync::HeaderContent>,
171+
) -> Result<Option<ParsedChainsyncMessage>> {
172+
match msg {
173+
chainsync::NextResponse::RollForward(header, _) => {
174+
let Some(parsed) = self.parse_header(header)? else {
175+
return Ok(None);
176+
};
177+
let point = Point::Specific(parsed.number, parsed.hash.to_vec());
178+
Ok(Some(ParsedChainsyncMessage {
179+
point,
180+
event: PeerChainSyncEvent::RollForward(parsed),
181+
}))
182+
}
183+
chainsync::NextResponse::RollBackward(point, _) => Ok(Some(ParsedChainsyncMessage {
184+
point: point.clone(),
185+
event: PeerChainSyncEvent::RollBackward(point),
186+
})),
187+
chainsync::NextResponse::Await => Ok(None),
188+
}
189+
}
190+
191+
fn parse_header(&self, header: chainsync::HeaderContent) -> Result<Option<Header>> {
192+
let hdr_tag = header.byron_prefix.map(|p| p.0);
193+
let hdr_variant = header.variant;
194+
let hdr = MultiEraHeader::decode(hdr_variant, hdr_tag, &header.cbor)?;
195+
let era = match hdr {
196+
MultiEraHeader::EpochBoundary(_) => return Ok(None),
197+
MultiEraHeader::Byron(_) => Era::Byron,
198+
MultiEraHeader::ShelleyCompatible(_) => match hdr_variant {
199+
1 => Era::Shelley,
200+
2 => Era::Allegra,
201+
3 => Era::Mary,
202+
4 => Era::Alonzo,
203+
x => bail!("Impossible header variant {x} for ShelleyCompatible (TPraos)"),
204+
},
205+
MultiEraHeader::BabbageCompatible(_) => match hdr_variant {
206+
5 => Era::Babbage,
207+
6 => Era::Conway,
208+
x => bail!("Impossible header variant {x} for BabbageCompatible (Praos)"),
209+
},
210+
};
211+
Ok(Some(Header {
212+
hash: BlockHash::new(*hdr.hash()),
213+
slot: hdr.slot(),
214+
number: hdr.number(),
215+
bytes: header.cbor,
216+
era,
217+
}))
218+
}
219+
}
220+
221+
enum ChainsyncCommand {
222+
FindIntersect(Vec<Point>),
223+
FindTip(oneshot::Sender<Point>),
224+
}
225+
226+
struct ParsedChainsyncMessage {
227+
point: Point,
228+
event: PeerChainSyncEvent,
229+
}
230+
231+
enum BlockfetchCommand {
232+
Fetch(BlockHash, u64),
233+
}

0 commit comments

Comments
 (0)