Skip to content

Commit ece87d3

Browse files
committed
feat: integrate upstream cache
1 parent e085283 commit ece87d3

File tree

7 files changed

+77
-47
lines changed

7 files changed

+77
-47
lines changed

common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub mod snapshot;
2121
pub mod stake_addresses;
2222
pub mod state_history;
2323
pub mod types;
24+
pub mod upstream_cache;
2425
pub mod validation;
2526

2627
// Flattened re-exports

modules/upstream_chain_fetcher/src/upstream_cache.rs renamed to common/src/upstream_cache.rs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use acropolis_common::{messages::RawBlockMessage, BlockInfo};
2-
use anyhow::{anyhow, bail, Result};
1+
use crate::{messages::RawBlockMessage, BlockInfo};
2+
use anyhow::{anyhow, bail, Context, Result};
33
use std::{
44
fs::File,
55
io::{BufReader, Write},
6-
path::Path,
6+
path::{Path, PathBuf},
77
sync::Arc,
88
};
99

@@ -19,25 +19,25 @@ pub trait Storage {
1919
}
2020

2121
pub struct FileStorage {
22-
path: String,
22+
path: PathBuf,
2323
}
2424

2525
impl FileStorage {
26-
pub fn new(path: &str) -> Self {
26+
pub fn new<P: AsRef<Path>>(path: P) -> Self {
2727
Self {
28-
path: path.to_string(),
28+
path: path.as_ref().to_path_buf(),
2929
}
3030
}
3131

32-
fn get_file_name(&self, chunk_no: usize) -> String {
33-
format!("{}/chunk-{chunk_no}.json", self.path)
32+
fn get_file_name(&self, chunk_no: usize) -> PathBuf {
33+
self.path.join(format!("chunk-{chunk_no}.json"))
3434
}
3535
}
3636

3737
pub type UpstreamCache = UpstreamCacheImpl<FileStorage>;
3838

3939
impl UpstreamCache {
40-
pub fn new(path: &str) -> Self {
40+
pub fn new<P: AsRef<Path>>(path: P) -> Self {
4141
UpstreamCache::new_impl(FileStorage::new(path))
4242
}
4343
}
@@ -139,34 +139,32 @@ impl<S: Storage> UpstreamCacheImpl<S> {
139139

140140
impl Storage for FileStorage {
141141
fn read_chunk(&mut self, chunk_no: usize) -> Result<Vec<UpstreamCacheRecord>> {
142-
let name = self.get_file_name(chunk_no);
143-
let path = Path::new(&name);
142+
let path = self.get_file_name(chunk_no);
144143
if !path.try_exists()? {
145144
return Ok(vec![]);
146145
}
147146

148-
let file = File::open(&name)?;
147+
let file = File::open(&path)?;
149148
let reader = BufReader::new(file);
150-
match serde_json::from_reader::<BufReader<std::fs::File>, Vec<UpstreamCacheRecord>>(reader)
151-
{
152-
Ok(res) => Ok(res.clone()),
153-
Err(err) => Err(anyhow!(
154-
"Error reading upstream cache chunk JSON from {name}: '{err}'"
155-
)),
156-
}
149+
serde_json::from_reader(reader).with_context(|| {
150+
format!(
151+
"Error reading upstream cache chunk JSON from {}",
152+
path.display()
153+
)
154+
})
157155
}
158156

159157
fn write_chunk(&mut self, chunk_no: usize, data: &[UpstreamCacheRecord]) -> Result<()> {
160158
let mut file = File::create(self.get_file_name(chunk_no))?;
161-
file.write_all(serde_json::to_string(data)?.as_bytes())?;
159+
file.write_all(&serde_json::to_vec(data)?)?;
162160
Ok(())
163161
}
164162
}
165163

166164
#[cfg(test)]
167165
mod test {
168166
use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord};
169-
use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era};
167+
use crate::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era};
170168
use anyhow::Result;
171169
use std::{collections::HashMap, sync::Arc};
172170

modules/peer_network_interface/src/configuration.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ pub struct InterfaceConfig {
2222
pub genesis_completion_topic: String,
2323
pub node_addresses: Vec<String>,
2424
pub magic_number: u64,
25-
#[expect(unused)]
2625
pub cache_dir: PathBuf,
2726
#[serde(flatten)]
2827
pub genesis_values: Option<GenesisValues>,

modules/peer_network_interface/src/connection.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -201,22 +201,11 @@ impl PeerConnectionWorker {
201201
let hdr_tag = header.byron_prefix.map(|p| p.0);
202202
let hdr_variant = header.variant;
203203
let hdr = MultiEraHeader::decode(hdr_variant, hdr_tag, &header.cbor)?;
204-
let era = match hdr {
205-
MultiEraHeader::EpochBoundary(_) => return Ok(None),
206-
MultiEraHeader::Byron(_) => Era::Byron,
207-
MultiEraHeader::ShelleyCompatible(_) => match hdr_variant {
208-
1 => Era::Shelley,
209-
2 => Era::Allegra,
210-
3 => Era::Mary,
211-
4 => Era::Alonzo,
212-
x => bail!("Impossible header variant {x} for ShelleyCompatible (TPraos)"),
213-
},
214-
MultiEraHeader::BabbageCompatible(_) => match hdr_variant {
215-
5 => Era::Babbage,
216-
6 => Era::Conway,
217-
x => bail!("Impossible header variant {x} for BabbageCompatible (Praos)"),
218-
},
219-
};
204+
if hdr.as_eb().is_some() {
205+
// skip EpochBoundary blocks
206+
return Ok(None);
207+
}
208+
let era = Era::try_from(hdr_variant)?;
220209
Ok(Some(Header {
221210
hash: BlockHash::new(*hdr.hash()),
222211
slot: hdr.slot(),

modules/peer_network_interface/src/peer_network_interface.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use acropolis_common::{
66
BlockInfo, BlockStatus,
77
genesis_values::GenesisValues,
88
messages::{CardanoMessage, Message, RawBlockMessage},
9+
upstream_cache::{UpstreamCache, UpstreamCacheRecord},
910
};
1011
use anyhow::{Result, bail};
1112
use caryatid_sdk::{Context, Module, Subscription, module};
@@ -50,10 +51,28 @@ impl PeerNetworkInterface {
5051
} else {
5152
cfg.genesis_values.expect("genesis values not found")
5253
};
54+
55+
let mut upstream_cache = None;
56+
let mut cache_sync_point = None;
57+
if cfg.sync_point == SyncPoint::Cache {
58+
let mut cache = UpstreamCache::new(&cfg.cache_dir);
59+
cache.start_reading()?;
60+
while let Some(record) = cache.read_record()? {
61+
cache_sync_point = Some((record.id.slot, record.id.hash));
62+
let message = Arc::new(Message::Cardano((
63+
record.id,
64+
CardanoMessage::BlockAvailable(Arc::unwrap_or_clone(record.message)),
65+
)));
66+
context.message_bus.publish(&cfg.block_topic, message).await?;
67+
}
68+
upstream_cache = Some(cache);
69+
}
70+
5371
let sink = BlockSink {
5472
context,
5573
topic: cfg.block_topic,
5674
genesis_values,
75+
upstream_cache,
5776
};
5877

5978
let mut manager =
@@ -65,7 +84,13 @@ impl PeerNetworkInterface {
6584
match cfg.sync_point {
6685
SyncPoint::Origin => manager.sync_to_point(Point::Origin),
6786
SyncPoint::Tip => manager.sync_to_tip().await?,
68-
SyncPoint::Cache => unimplemented!(),
87+
SyncPoint::Cache => {
88+
let point = match cache_sync_point {
89+
Some((slot, hash)) => Point::Specific(slot, hash.to_vec()),
90+
None => Point::Origin,
91+
};
92+
manager.sync_to_point(point);
93+
}
6994
SyncPoint::Snapshot => {
7095
let mut subscription =
7196
snapshot_complete.expect("Snapshot topic subscription missing");
@@ -109,15 +134,31 @@ struct BlockSink {
109134
context: Arc<Context<Message>>,
110135
topic: String,
111136
genesis_values: GenesisValues,
137+
upstream_cache: Option<UpstreamCache>,
112138
}
113139
impl BlockSink {
114-
pub async fn announce(&self, header: &Header, body: &[u8], rolled_back: bool) -> Result<()> {
140+
pub async fn announce(
141+
&mut self,
142+
header: &Header,
143+
body: &[u8],
144+
rolled_back: bool,
145+
) -> Result<()> {
115146
let info = self.make_block_info(header, rolled_back);
116-
let available = CardanoMessage::BlockAvailable(RawBlockMessage {
147+
let raw_block = RawBlockMessage {
117148
header: header.bytes.clone(),
118149
body: body.to_vec(),
119-
});
120-
let message = Arc::new(Message::Cardano((info, available)));
150+
};
151+
if let Some(cache) = self.upstream_cache.as_mut() {
152+
let record = UpstreamCacheRecord {
153+
id: info.clone(),
154+
message: Arc::new(raw_block.clone()),
155+
};
156+
cache.write_record(&record)?;
157+
}
158+
let message = Arc::new(Message::Cardano((
159+
info,
160+
CardanoMessage::BlockAvailable(raw_block),
161+
)));
121162
self.context.publish(&self.topic, message).await
122163
}
123164

modules/upstream_chain_fetcher/src/body_fetcher.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
//! Acropolis Miniprotocols module for Caryatid
22
//! Multi-connection, block body fetching part of the client (in separate thread).
33
4-
use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era};
4+
use acropolis_common::{
5+
messages::RawBlockMessage,
6+
upstream_cache::{UpstreamCache, UpstreamCacheRecord},
7+
BlockHash, BlockInfo, BlockStatus, Era,
8+
};
59
use anyhow::{bail, Result};
610
use crossbeam::channel::{Receiver, TryRecvError};
711
use pallas::{
@@ -15,7 +19,6 @@ use std::{sync::Arc, time::Duration};
1519
use tokio::{sync::Mutex, time::sleep};
1620
use tracing::{debug, error, info};
1721

18-
use crate::upstream_cache::{UpstreamCache, UpstreamCacheRecord};
1922
use crate::{
2023
utils,
2124
utils::{

modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use acropolis_common::{
55
genesis_values::GenesisValues,
66
messages::{CardanoMessage, Message},
7+
upstream_cache::{UpstreamCache, UpstreamCacheRecord},
78
BlockInfo,
89
};
910
use anyhow::{anyhow, bail, Result};
@@ -24,12 +25,10 @@ use tokio::{sync::Mutex, time::sleep};
2425
use tracing::{debug, error, info};
2526

2627
mod body_fetcher;
27-
mod upstream_cache;
2828
mod utils;
2929

3030
use crate::utils::FetchResult;
3131
use body_fetcher::BodyFetcher;
32-
use upstream_cache::{UpstreamCache, UpstreamCacheRecord};
3332
use utils::{FetcherConfig, SyncPoint};
3433

3534
const MAX_BODY_FETCHER_CHANNEL_LENGTH: usize = 100;

0 commit comments

Comments
 (0)