Skip to content

Commit a35fa64

Browse files
committed
refactor: avoid overwriting blocks when clear-on-start is false
Signed-off-by: William Hankins <[email protected]>
1 parent 2a2ab2d commit a35fa64

File tree

3 files changed

+61
-1
lines changed

3 files changed

+61
-1
lines changed

modules/chain_store/src/chain_store.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ impl ChainStore {
9494
// Get promise of params message so the params queue is cleared and
9595
// the message is ready as soon as possible when we need it
9696
let mut params_message = params_subscription.read();
97+
98+
// Validate the first stored block matches what is already persisted when clear-on-start is false
99+
let Ok((_, first_block_message)) = new_blocks_subscription.read().await else {
100+
return;
101+
};
102+
if let Err(err) = Self::handle_first_block(&store, &first_block_message) {
103+
panic!("Corrupted DB: {err}")
104+
};
105+
97106
loop {
98107
let Ok((_, message)) = new_blocks_subscription.read().await else {
99108
return;
@@ -129,7 +138,38 @@ impl ChainStore {
129138
bail!("Unexpected message type: {message:?}");
130139
};
131140

132-
store.insert_block(info, &raw_block.body)
141+
if store.should_persist(info.number) {
142+
store.insert_block(info, &raw_block.body)?;
143+
}
144+
145+
Ok(())
146+
}
147+
148+
fn handle_first_block(store: &Arc<dyn Store>, message: &Message) -> Result<()> {
149+
let Message::Cardano((block_info, CardanoMessage::BlockAvailable(raw_block))) = message
150+
else {
151+
bail!("Unexpected message type: {message:?}");
152+
};
153+
154+
if !store.should_persist(block_info.number) {
155+
if let Some(existing) = store.get_block_by_number(block_info.number)? {
156+
if existing.bytes != raw_block.body {
157+
return Err(anyhow::anyhow!(
158+
"Stored block {} does not match. Set clear-store to true",
159+
block_info.number
160+
));
161+
}
162+
} else {
163+
return Err(anyhow::anyhow!(
164+
"Unable to retrieve block {}. Set clear-store to true",
165+
block_info.number
166+
));
167+
}
168+
}
169+
170+
Self::handle_new_block(store, message)?;
171+
172+
Ok(())
133173
}
134174

135175
fn handle_blocks_query(

modules/chain_store/src/stores/fjall.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub struct FjallStore {
1111
keyspace: Keyspace,
1212
blocks: FjallBlockStore,
1313
txs: FjallTXStore,
14+
last_persisted_block: Option<u64>,
1415
}
1516

1617
const DEFAULT_DATABASE_PATH: &str = "fjall-blocks";
@@ -33,10 +34,20 @@ impl FjallStore {
3334
let keyspace = fjall_config.open()?;
3435
let blocks = FjallBlockStore::new(&keyspace)?;
3536
let txs = FjallTXStore::new(&keyspace)?;
37+
38+
let last_persisted_block = if !clear {
39+
blocks.block_hashes_by_number.iter().next_back().and_then(|res| {
40+
res.ok().and_then(|(key, _)| key.as_ref().try_into().ok().map(u64::from_be_bytes))
41+
})
42+
} else {
43+
None
44+
};
45+
3646
Ok(Self {
3747
keyspace,
3848
blocks,
3949
txs,
50+
last_persisted_block,
4051
})
4152
}
4253
}
@@ -69,6 +80,13 @@ impl super::Store for FjallStore {
6980
Ok(())
7081
}
7182

83+
fn should_persist(&self, block_number: u64) -> bool {
84+
match self.last_persisted_block {
85+
Some(last) => block_number > last,
86+
None => false,
87+
}
88+
}
89+
7290
fn get_block_by_hash(&self, hash: &[u8]) -> Result<Option<Block>> {
7391
self.blocks.get_by_hash(hash)
7492
}
@@ -117,6 +135,7 @@ impl FjallBlockStore {
117135
BLOCK_HASHES_BY_EPOCH_SLOT_PARTITION,
118136
fjall::PartitionCreateOptions::default(),
119137
)?;
138+
120139
Ok(Self {
121140
blocks,
122141
block_hashes_by_slot,

modules/chain_store/src/stores/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod fjall;
55

66
pub trait Store: Send + Sync {
77
fn insert_block(&self, info: &BlockInfo, block: &[u8]) -> Result<()>;
8+
fn should_persist(&self, block_number: u64) -> bool;
89

910
fn get_block_by_hash(&self, hash: &[u8]) -> Result<Option<Block>>;
1011
fn get_block_by_slot(&self, slot: u64) -> Result<Option<Block>>;

0 commit comments

Comments
 (0)