
Commit 4647b72

fix: handle cache errors more gracefully

1 parent ece87d3

7 files changed: +72 -44 lines

common/src/upstream_cache.rs

Lines changed: 8 additions & 8 deletions

@@ -23,10 +23,10 @@ pub struct FileStorage {
 }
 
 impl FileStorage {
-    pub fn new<P: AsRef<Path>>(path: P) -> Self {
-        Self {
-            path: path.as_ref().to_path_buf(),
-        }
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref().to_path_buf();
+        std::fs::create_dir_all(&path)?;
+        Ok(Self { path })
     }
 
     fn get_file_name(&self, chunk_no: usize) -> PathBuf {
@@ -37,8 +37,8 @@ impl FileStorage {
 pub type UpstreamCache = UpstreamCacheImpl<FileStorage>;
 
 impl UpstreamCache {
-    pub fn new<P: AsRef<Path>>(path: P) -> Self {
-        UpstreamCache::new_impl(FileStorage::new(path))
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        Ok(UpstreamCache::new_impl(FileStorage::new(path)?))
     }
 }
 
@@ -124,7 +124,7 @@ impl<S: Storage> UpstreamCacheImpl<S> {
 
     pub fn write_record(&mut self, record: &UpstreamCacheRecord) -> Result<()> {
         self.chunk_cached.push(record.clone());
-        self.storage.write_chunk(self.current_chunk, &self.chunk_cached)?;
+        self.storage.write_chunk(self.current_chunk, &self.chunk_cached).context("could not write cache record")?;
 
         self.current_record += 1;
         if self.current_record >= self.density {
@@ -155,7 +155,7 @@ impl Storage for FileStorage {
     }
 
     fn write_chunk(&mut self, chunk_no: usize, data: &[UpstreamCacheRecord]) -> Result<()> {
-        let mut file = File::create(self.get_file_name(chunk_no))?;
+        let mut file = File::create(self.get_file_name(chunk_no)).context("could not write chunk")?;
         file.write_all(&serde_json::to_vec(data)?)?;
         Ok(())
     }
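
The two `.context(...)` calls above rely on anyhow's `Context` extension trait, which wraps an error with a human-readable note that `{:#}` formatting prints as a chain; moving `create_dir_all` into the constructor surfaces a bad cache directory at startup instead of on the first write. Below is a minimal, self-contained sketch of the same pattern, assuming only the anyhow crate; `DemoStorage` and the `demo-cache` path are hypothetical stand-ins, not code from this commit.

use anyhow::{Context, Result};
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};

struct DemoStorage {
    path: PathBuf,
}

impl DemoStorage {
    // Fallible constructor: create the directory up front so a bad path
    // fails here, with context, instead of at the first chunk write.
    fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path).context("could not create cache directory")?;
        Ok(Self { path })
    }

    // Each `.context(...)` layer shows up in the `{:#}` rendering, e.g.
    // "could not write chunk: Permission denied (os error 13)".
    fn write_chunk(&self, chunk_no: usize, data: &[u8]) -> Result<()> {
        let mut file = File::create(self.path.join(format!("{chunk_no}.json")))
            .context("could not write chunk")?;
        file.write_all(data)?;
        Ok(())
    }
}

fn main() -> Result<()> {
    let storage = DemoStorage::new("demo-cache")?;
    storage.write_chunk(0, b"[]")
}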

modules/peer_network_interface/config.default.toml

Lines changed: 2 additions & 0 deletions

@@ -4,6 +4,8 @@ genesis-completion-topic = "cardano.sequence.bootstrapped"
 
 node-addresses = [
     "backbone.cardano.iog.io:3001",
+    "backbone.mainnet.cardanofoundation.org:3001",
+    "backbone.mainnet.emurgornd.com:3001",
 ]
 magic-number = 764824073

modules/peer_network_interface/src/network.rs

Lines changed: 8 additions & 8 deletions

@@ -58,7 +58,7 @@ impl NetworkManager {
         while let Some(event) = self.events.recv().await {
             match event {
                 NetworkEvent::PeerUpdate { peer, event } => {
-                    let maybe_publish_blocks = self.handle_peer_update(peer, event)?;
+                    let maybe_publish_blocks = self.handle_peer_update(peer, event);
                     if maybe_publish_blocks {
                         self.publish_blocks().await?;
                     }
@@ -124,7 +124,7 @@ impl NetworkManager {
     // or when publishing messages to other modules. This avoids deadlock; if our event queue
     // is full and this method is blocked on writing to it, the queue can never drain.
     // Returns true if we might have new events to publish downstream.
-    fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> Result<bool> {
+    fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) -> bool {
         let is_preferred = self.preferred_upstream.is_some_and(|id| id == peer);
         match event {
             PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => {
@@ -148,12 +148,12 @@
                             break; // only fetch from one
                         }
                     }
-                    Ok(false)
+                    false
                 }
                 BlockStatus::Fetched(_) => {
                     // If chainsync has requested a block which we've already fetched,
                     // we might be able to publish one or more.
-                    Ok(is_preferred)
+                    is_preferred
                 }
             }
         }
@@ -184,18 +184,18 @@
                         }
                     }
                 }
-                Ok(false)
+                false
             }
             PeerEvent::BlockFetched(fetched) => {
                 let Some(block) = self.blocks.get_mut(&fetched.hash) else {
-                    return Ok(false);
+                    return false;
                 };
                 block.set_body(&fetched.body);
-                Ok(true)
+                true
             }
             PeerEvent::Disconnected => {
                 self.handle_disconnect(peer);
-                Ok(false)
+                false
            }
        }
    }
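
With the `Result<bool>` to `bool` change above, a single peer's bad event can no longer abort the whole receive loop via `?`; the handler only reports whether there may be blocks worth publishing. A minimal sketch of that shape, using hypothetical `Event` and `Manager` types rather than the module's real `PeerEvent`/`NetworkManager`:

// Hypothetical stand-ins for PeerEvent / NetworkManager.
enum Event {
    BlockFetched(Vec<u8>),
    Disconnected,
}

struct Manager {
    pending: Vec<Vec<u8>>,
}

impl Manager {
    // Infallible: returns true if the caller should try to publish
    // downstream. No error can escape here and kill the event loop.
    fn handle_event(&mut self, event: Event) -> bool {
        match event {
            Event::BlockFetched(body) => {
                self.pending.push(body);
                true
            }
            Event::Disconnected => false,
        }
    }
}

fn main() {
    let mut manager = Manager { pending: Vec::new() };
    for event in [Event::BlockFetched(vec![1, 2, 3]), Event::Disconnected] {
        if manager.handle_event(event) {
            println!("publish {} pending block(s)", manager.pending.len());
        }
    }
}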

modules/peer_network_interface/src/peer_network_interface.rs

Lines changed: 51 additions & 26 deletions

@@ -3,18 +3,16 @@ mod connection;
 mod network;
 
 use acropolis_common::{
-    BlockInfo, BlockStatus,
-    genesis_values::GenesisValues,
-    messages::{CardanoMessage, Message, RawBlockMessage},
-    upstream_cache::{UpstreamCache, UpstreamCacheRecord},
+    BlockInfo, BlockStatus, genesis_values::GenesisValues, messages::{CardanoMessage, Message, RawBlockMessage}, upstream_cache::{UpstreamCache, UpstreamCacheRecord}
 };
 use anyhow::{Result, bail};
 use caryatid_sdk::{Context, Module, Subscription, module};
 use config::Config;
 use pallas::network::miniprotocols::Point;
 use tokio::sync::mpsc;
+use tracing::{error, warn};
 
-use std::{sync::Arc, time::Duration};
+use std::{path::Path, sync::Arc, time::Duration};
 
 use crate::{
     configuration::{InterfaceConfig, SyncPoint},
@@ -53,19 +51,17 @@ impl PeerNetworkInterface {
         };
 
         let mut upstream_cache = None;
-        let mut cache_sync_point = None;
+        let mut cache_sync_point = Point::Origin;
         if cfg.sync_point == SyncPoint::Cache {
-            let mut cache = UpstreamCache::new(&cfg.cache_dir);
-            cache.start_reading()?;
-            while let Some(record) = cache.read_record()? {
-                cache_sync_point = Some((record.id.slot, record.id.hash));
-                let message = Arc::new(Message::Cardano((
-                    record.id,
-                    CardanoMessage::BlockAvailable(Arc::unwrap_or_clone(record.message)),
-                )));
-                context.message_bus.publish(&cfg.block_topic, message).await?;
+            match Self::init_cache(&cfg.cache_dir, &cfg.block_topic, &context).await {
+                Ok((cache, sync_point)) => {
+                    upstream_cache = Some(cache);
+                    cache_sync_point = sync_point;
+                }
+                Err(e) => {
+                    warn!("could not initialize upstream cache: {e:#}");
+                }
             }
-            upstream_cache = Some(cache);
         }
 
         let sink = BlockSink {
@@ -83,28 +79,57 @@
 
             match cfg.sync_point {
                 SyncPoint::Origin => manager.sync_to_point(Point::Origin),
-                SyncPoint::Tip => manager.sync_to_tip().await?,
-                SyncPoint::Cache => {
-                    let point = match cache_sync_point {
-                        Some((slot, hash)) => Point::Specific(slot, hash.to_vec()),
-                        None => Point::Origin,
-                    };
-                    manager.sync_to_point(point);
+                SyncPoint::Tip => {
+                    if let Err(error) = manager.sync_to_tip().await {
+                        warn!("could not sync to tip: {error:#}");
+                        return;
+                    }
                 }
+                SyncPoint::Cache => manager.sync_to_point(cache_sync_point),
                 SyncPoint::Snapshot => {
                     let mut subscription =
                         snapshot_complete.expect("Snapshot topic subscription missing");
-                    let point = Self::wait_snapshot_completion(&mut subscription).await?;
-                    manager.sync_to_point(point);
+                    match Self::wait_snapshot_completion(&mut subscription).await {
+                        Ok(point) => manager.sync_to_point(point),
+                        Err(error) => {
+                            warn!("snapshot restoration never completed: {error:#}");
+                            return;
+                        }
+                    }
                 }
             }
 
-            manager.run().await
+            if let Err(err) = manager.run().await {
+                error!("chain sync failed: {err:#}");
+            }
         });
 
         Ok(())
     }
 
+    async fn init_cache(
+        cache_dir: &Path,
+        block_topic: &str,
+        context: &Context<Message>,
+    ) -> Result<(UpstreamCache, Point)> {
+        let mut cache = UpstreamCache::new(cache_dir)?;
+        let mut cache_sync_point = None;
+        cache.start_reading()?;
+        while let Some(record) = cache.read_record()? {
+            cache_sync_point = Some((record.id.slot, record.id.hash));
+            let message = Arc::new(Message::Cardano((
+                record.id,
+                CardanoMessage::BlockAvailable(Arc::unwrap_or_clone(record.message)),
+            )));
+            context.message_bus.publish(block_topic, message).await?;
+        }
+        let sync_point = match cache_sync_point {
+            None => Point::Origin,
+            Some((slot, hash)) => Point::Specific(slot, hash.to_vec()),
+        };
+        Ok((cache, sync_point))
+    }
+
     async fn wait_genesis_completion(
         subscription: &mut Box<dyn Subscription<Message>>,
     ) -> Result<GenesisValues> {
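
Extracting `init_cache` puts all of the fallible cache replay behind one `Result`, so the caller can log a warning and fall back to a cold start from `Point::Origin` instead of failing the whole module. A minimal sketch of that warn-and-continue call pattern, assuming the anyhow, tracing, and tracing-subscriber crates; this `init_cache` is a hypothetical stand-in, not the module's real helper:

use anyhow::{Result, ensure};
use tracing::warn;

// Hypothetical stand-in: returns replayed records plus a sync slot.
fn init_cache(dir: &str) -> Result<(Vec<String>, u64)> {
    ensure!(!dir.is_empty(), "no cache directory configured");
    Ok((Vec::new(), 0))
}

fn main() {
    tracing_subscriber::fmt::init();

    // Cache trouble degrades to a cold start instead of killing the module.
    let (records, sync_slot) = match init_cache("upstream-cache") {
        Ok(res) => res,
        Err(e) => {
            warn!("could not initialize upstream cache: {e:#}");
            (Vec::new(), 0)
        }
    };
    println!("syncing from slot {sync_slot} after replaying {} records", records.len());
}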

modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs

Lines changed: 1 addition & 1 deletion

@@ -227,7 +227,7 @@ impl UpstreamChainFetcher {
                 Self::sync_to_point(cfg, None, Point::Origin).await?;
             }
             SyncPoint::Cache => {
-                let mut upstream_cache = UpstreamCache::new(&cfg.cache_dir);
+                let mut upstream_cache = UpstreamCache::new(&cfg.cache_dir)?;
                 let point = match Self::read_cache(cfg.clone(), &mut upstream_cache).await? {
                     None => Point::Origin,
                     Some(blk) => Point::Specific(blk.slot, blk.hash.to_vec()),

processes/omnibus/.gitignore

Lines changed: 1 addition & 0 deletions

@@ -3,6 +3,7 @@ sled-immutable-utxos
 fjall-blocks
 fjall-immutable-utxos
 cache
+upstream-cache
 
 # DB files
 *_db

processes/omnibus/omnibus-local.toml

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 # Top-level configuration for Acropolis omnibus process
 
 [module.genesis-bootstrapper]
-network-name = "preview" # "sanchonet", "mainnet"
+network-name = "preview" # "sanchonet", "preview", "mainnet"
 
 #[module.mithril-snapshot-fetcher]
 #Turned off with SanchoNet
