Skip to content

Commit 36a665b

Browse files
authored
Merge pull request #1054 from openmina/fix/ledger/mask_leak
Fix: ledger stores more staged ledgers than it needs
2 parents 7e4bb21 + 6efb1bb commit 36a665b

File tree

13 files changed

+163
-51
lines changed

13 files changed

+163
-51
lines changed

cli/src/commands/replay/replay_state_with_input_actions.rs

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,9 @@ pub struct ReplayStateWithInputActions {
1010
#[arg(long, default_value = "./target/release/libreplay_dynamic_effects.so")]
1111
pub dynamic_effects_lib: String,
1212

13+
#[arg(long)]
14+
pub ignore_mismatch: bool,
15+
1316
/// Verbosity level
1417
#[arg(long, short, default_value = "info")]
1518
pub verbosity: tracing::Level,
@@ -30,13 +33,22 @@ impl ReplayStateWithInputActions {
3033
}
3134
};
3235

33-
replay_state_with_input_actions(&dir, dynamic_effects_lib, check_build_env)?;
36+
replay_state_with_input_actions(
37+
&dir,
38+
dynamic_effects_lib,
39+
self.ignore_mismatch,
40+
check_build_env,
41+
)?;
3442

3543
Ok(())
3644
}
3745
}
3846

39-
pub fn check_build_env(record_env: &BuildEnv, replay_env: &BuildEnv) -> anyhow::Result<()> {
47+
pub fn check_build_env(
48+
record_env: &BuildEnv,
49+
replay_env: &BuildEnv,
50+
ignore_mismatch: bool,
51+
) -> anyhow::Result<()> {
4052
let is_git_same = record_env.git.commit_hash == replay_env.git.commit_hash;
4153
let is_cargo_same = record_env.cargo == replay_env.cargo;
4254
let is_rustc_same = record_env.rustc == replay_env.rustc;
@@ -47,7 +59,8 @@ pub fn check_build_env(record_env: &BuildEnv, replay_env: &BuildEnv) -> anyhow::
4759
record_env.git, replay_env.git
4860
);
4961
let msg = format!("git build env mismatch!\n{diff}");
50-
if console::user_attended() {
62+
if ignore_mismatch {
63+
} else if console::user_attended() {
5164
use dialoguer::Confirm;
5265

5366
let prompt = format!("{msg}\nDo you want to continue?");

ledger/src/mask/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,9 @@ pub fn alive_len() -> usize {
4141
exec(|list| list.len())
4242
}
4343

44-
pub fn alive_collect() -> Vec<Uuid> {
44+
pub fn alive_collect<B>() -> B
45+
where
46+
B: FromIterator<Uuid>,
47+
{
4548
exec(|list| list.iter().cloned().collect())
4649
}

ledger/src/staged_ledger/staged_ledger.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -116,7 +116,6 @@ pub struct StagedLedger {
116116
}
117117

118118
impl StagedLedger {
119-
#[cfg(feature = "fuzzing")]
120119
pub fn ledger_ref(&self) -> &Mask {
121120
&self.ledger
122121
}

mina-p2p-messages/src/v2/hashing.rs

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -115,7 +115,7 @@ impl Serialize for TransactionHash {
115115
if serializer.is_human_readable() {
116116
serializer.serialize_str(&self.to_string())
117117
} else {
118-
self.0.serialize(serializer)
118+
serde_bytes::serialize(&*self.0, serializer)
119119
}
120120
}
121121
}
@@ -129,9 +129,7 @@ impl<'de> serde::Deserialize<'de> for TransactionHash {
129129
let b58: String = Deserialize::deserialize(deserializer)?;
130130
Ok(b58.parse().map_err(|err| serde::de::Error::custom(err))?)
131131
} else {
132-
let v = Vec::deserialize(deserializer)?;
133-
v.try_into()
134-
.map_err(|_| serde::de::Error::custom("transaction hash wrong size"))
132+
serde_bytes::deserialize(deserializer)
135133
.map(Arc::new)
136134
.map(Self)
137135
}

node/native/src/replay.rs

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,19 @@
11
use std::cell::RefCell;
22

33
use node::{
4-
core::thread, recorder::StateWithInputActionsReader, snark::BlockVerifier, ActionWithMeta,
5-
BuildEnv, Store,
4+
core::thread,
5+
recorder::StateWithInputActionsReader,
6+
snark::{BlockVerifier, TransactionVerifier},
7+
ActionWithMeta, BuildEnv, Store,
68
};
79

810
use crate::NodeService;
911

1012
pub fn replay_state_with_input_actions(
1113
dir: &str,
1214
dynamic_effects_lib: Option<String>,
13-
mut check_build_env: impl FnMut(&BuildEnv, &BuildEnv) -> anyhow::Result<()>,
15+
ignore_mismatch: bool,
16+
mut check_build_env: impl FnMut(&BuildEnv, &BuildEnv, bool) -> anyhow::Result<()>,
1417
) -> anyhow::Result<crate::Node> {
1518
eprintln!("replaying node based on initial state and actions from the dir: {dir}");
1619
let reader = StateWithInputActionsReader::new(dir);
@@ -31,6 +34,8 @@ pub fn replay_state_with_input_actions(
3134
// index/srs doesn't match deserialized one.
3235
state.snark.block_verify.verifier_index = BlockVerifier::make();
3336
state.snark.block_verify.verifier_srs = node::snark::get_srs();
37+
state.snark.user_command_verify.verifier_index = TransactionVerifier::make();
38+
state.snark.user_command_verify.verifier_srs = node::snark::get_srs();
3439
state
3540
};
3641

@@ -46,7 +51,7 @@ pub fn replay_state_with_input_actions(
4651
let store = node.store_mut();
4752

4853
let replay_env = BuildEnv::get();
49-
check_build_env(&store.state().config.build, &replay_env)?;
54+
check_build_env(&store.state().config.build, &replay_env, ignore_mismatch)?;
5055

5156
eprintln!("reading actions from dir: {dir}");
5257

node/src/ledger/ledger_reducer.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,16 @@ impl LedgerState {
1717
} = action
1818
{
1919
if let Ok(state) = state_context.get_substate_mut() {
20+
if result.alive_masks > 294 {
21+
// TODO(binier): should be a bug condition, but can't be
22+
// because we get false positive during testing, since
23+
// multiple nodes/ledger run in the same process.
24+
openmina_core::log::warn!(
25+
meta.time();
26+
"ledger mask leak: more than 294 ledger masks ({}) detected!",
27+
result.alive_masks
28+
);
29+
}
2030
state.alive_masks = result.alive_masks;
2131
}
2232
}

node/src/ledger/ledger_service.rs

Lines changed: 48 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,7 @@
11
use super::{
22
ledger_empty_hash_at_depth,
3-
read::LedgerReadResponse,
4-
read::{LedgerReadId, LedgerReadRequest},
5-
write::CommitResult,
6-
write::LedgerWriteRequest,
7-
write::LedgerWriteResponse,
3+
read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse},
4+
write::{CommitResult, LedgerWriteRequest, LedgerWriteResponse, LedgersToKeep},
85
LedgerAddress, LedgerEvent, LEDGER_DEPTH,
96
};
107
use crate::{
@@ -65,8 +62,8 @@ use mina_p2p_messages::{
6562
};
6663
use mina_signer::CompressedPubKey;
6764
use openmina_core::{
68-
block::AppliedBlock,
69-
block::ArcBlockWithHash,
65+
block::{AppliedBlock, ArcBlockWithHash},
66+
bug_condition,
7067
constants::constraint_constants,
7168
snark::{Snark, SnarkJobId},
7269
thread,
@@ -137,18 +134,18 @@ impl StagedLedgersStorage {
137134

138135
fn retain<F>(&mut self, fun: F)
139136
where
140-
F: Fn(&LedgerHash, &[Arc<MinaBaseStagedLedgerHashStableV1>]) -> bool,
137+
F: Fn(&MinaBaseStagedLedgerHashStableV1) -> bool,
141138
{
142-
self.by_merkle_root_hash
143-
.retain(|merkle_root_hash, staged_ledger_hashes| {
144-
let retain = fun(merkle_root_hash, staged_ledger_hashes.as_slice());
145-
if !retain {
146-
for staged_ledger_hash in staged_ledger_hashes {
147-
self.staged_ledgers.remove(staged_ledger_hash);
148-
}
139+
self.by_merkle_root_hash.retain(|_, staged_ledger_hashes| {
140+
staged_ledger_hashes.retain(|hash| {
141+
if fun(hash) {
142+
return true;
149143
}
150-
retain
144+
self.staged_ledgers.remove(hash);
145+
false
151146
});
147+
!staged_ledger_hashes.is_empty()
148+
});
152149
}
153150

154151
fn extend<I>(&mut self, iterator: I)
@@ -857,7 +854,7 @@ impl LedgerCtx {
857854

858855
pub fn commit(
859856
&mut self,
860-
ledgers_to_keep: BTreeSet<LedgerHash>,
857+
ledgers_to_keep: LedgersToKeep,
861858
root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
862859
needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
863860
new_root: &ArcBlockWithHash,
@@ -902,13 +899,13 @@ impl LedgerCtx {
902899
);
903900

904901
self.staged_ledgers
905-
.retain(|hash, _| ledgers_to_keep.contains(hash));
902+
.retain(|hash| ledgers_to_keep.contains(hash));
906903
self.staged_ledgers.extend(
907904
self.sync
908905
.staged_ledgers
909906
.take()
910907
.into_iter()
911-
.filter(|(hash, _)| ledgers_to_keep.contains(&hash.non_snark.ledger_hash)),
908+
.filter(|(hash, _)| ledgers_to_keep.contains(&**hash)),
912909
);
913910

914911
for ledger_hash in [
@@ -973,13 +970,45 @@ impl LedgerCtx {
973970
.unwrap_or_default(),
974971
);
975972

973+
// self.check_alive_masks();
974+
976975
CommitResult {
977976
alive_masks: ::ledger::mask::alive_len(),
978977
available_jobs,
979978
needed_protocol_states,
980979
}
981980
}
982981

982+
#[allow(dead_code)]
983+
fn check_alive_masks(&mut self) {
984+
let mut alive: BTreeSet<_> = ::ledger::mask::alive_collect();
985+
let staged_ledgers = self
986+
.staged_ledgers
987+
.staged_ledgers
988+
.iter()
989+
.map(|(hash, ledger)| (&hash.non_snark.ledger_hash, ledger.ledger_ref()));
990+
991+
let alive_ledgers = self
992+
.snarked_ledgers
993+
.iter()
994+
.chain(staged_ledgers)
995+
.map(|(hash, mask)| {
996+
let uuid = mask.get_uuid();
997+
if !alive.remove(&uuid) {
998+
bug_condition!("mask not found among alive masks! uuid: {uuid}, hash: {hash}");
999+
}
1000+
(uuid, hash)
1001+
})
1002+
.collect::<Vec<_>>();
1003+
openmina_core::debug!(redux::Timestamp::global_now(); "alive_ledgers_after_commit: {alive_ledgers:#?}");
1004+
1005+
if !alive.is_empty() {
1006+
bug_condition!(
1007+
"masks alive which are no longer part of the ledger service: {alive:#?}"
1008+
);
1009+
}
1010+
}
1011+
9831012
pub fn get_num_accounts(
9841013
&mut self,
9851014
ledger_hash: v2::LedgerHash,

node/src/ledger/write/mod.rs

Lines changed: 59 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ pub enum LedgerWriteRequest {
5454
skip_verification: bool,
5555
},
5656
Commit {
57-
ledgers_to_keep: BTreeSet<v2::LedgerHash>,
57+
ledgers_to_keep: LedgersToKeep,
5858
root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
5959
needed_protocol_states: BTreeMap<v2::StateHash, v2::MinaStateProtocolStateValueStableV2>,
6060
new_root: AppliedBlock,
@@ -162,6 +162,64 @@ impl TryFrom<&BlockApplyResult> for v2::ArchiveTransitionFronntierDiff {
162162
}
163163
}
164164

165+
#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Default, Clone)]
166+
pub struct LedgersToKeep {
167+
snarked: BTreeSet<v2::LedgerHash>,
168+
staged: BTreeSet<Arc<v2::MinaBaseStagedLedgerHashStableV1>>,
169+
}
170+
171+
impl LedgersToKeep {
172+
pub fn new() -> Self {
173+
Self::default()
174+
}
175+
176+
pub fn contains<'a, T>(&self, key: T) -> bool
177+
where
178+
T: 'a + Into<LedgerToKeep<'a>>,
179+
{
180+
match key.into() {
181+
LedgerToKeep::Snarked(hash) => self.snarked.contains(hash),
182+
LedgerToKeep::Staged(hash) => self.staged.contains(hash),
183+
}
184+
}
185+
186+
pub fn add_snarked(&mut self, hash: v2::LedgerHash) -> bool {
187+
self.snarked.insert(hash)
188+
}
189+
190+
pub fn add_staged(&mut self, hash: Arc<v2::MinaBaseStagedLedgerHashStableV1>) -> bool {
191+
self.staged.insert(hash)
192+
}
193+
}
194+
195+
impl<'a> FromIterator<&'a ArcBlockWithHash> for LedgersToKeep {
196+
fn from_iter<T: IntoIterator<Item = &'a ArcBlockWithHash>>(iter: T) -> Self {
197+
let mut res = Self::new();
198+
let best_tip = iter.into_iter().fold(None, |best_tip, block| {
199+
res.add_snarked(block.snarked_ledger_hash().clone());
200+
res.add_staged(Arc::new(block.staged_ledger_hashes().clone()));
201+
match best_tip {
202+
None => Some(block),
203+
Some(tip) if tip.height() < block.height() => Some(block),
204+
old => old,
205+
}
206+
});
207+
208+
if let Some(best_tip) = best_tip {
209+
res.add_snarked(best_tip.staking_epoch_ledger_hash().clone());
210+
res.add_snarked(best_tip.next_epoch_ledger_hash().clone());
211+
}
212+
213+
res
214+
}
215+
}
216+
217+
#[derive(derive_more::From)]
218+
pub enum LedgerToKeep<'a> {
219+
Snarked(&'a v2::LedgerHash),
220+
Staged(&'a v2::MinaBaseStagedLedgerHashStableV1),
221+
}
222+
165223
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
166224
pub struct CommitResult {
167225
pub alive_masks: usize,

node/src/recorder/recorder.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -109,6 +109,8 @@ impl Recorder {
109109
let mut writer = BufWriter::new(file);
110110

111111
let encoded = data.encode().unwrap();
112+
// RecordedActionWithMeta::decode(&encoded)
113+
// .expect(&format!("failed to decode encoded message: {:?}", data));
112114
writer
113115
.write_all(&(encoded.len() as u64).to_be_bytes())
114116
.unwrap();

node/src/transition_frontier/sync/transition_frontier_sync_effects.rs

Lines changed: 3 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@ use p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcId};
44
use p2p::{P2pNetworkPubsubAction, PeerId};
55
use redux::ActionMeta;
66

7-
use crate::ledger::write::{LedgerWriteAction, LedgerWriteRequest};
7+
use crate::ledger::write::{LedgerWriteAction, LedgerWriteRequest, LedgersToKeep};
88
use crate::p2p::channels::rpc::P2pRpcRequest;
99
use crate::service::TransitionFrontierSyncLedgerSnarkedService;
1010
use crate::{p2p_ready, Service, Store, TransitionFrontierAction};
@@ -354,16 +354,8 @@ impl TransitionFrontierSyncAction {
354354
};
355355
let ledgers_to_keep = chain
356356
.iter()
357-
.flat_map(|b| {
358-
[
359-
b.snarked_ledger_hash(),
360-
b.merkle_root_hash(),
361-
b.staking_epoch_ledger_hash(),
362-
b.next_epoch_ledger_hash(),
363-
]
364-
})
365-
.cloned()
366-
.collect();
357+
.map(|block| &block.block)
358+
.collect::<LedgersToKeep>();
367359
let mut root_snarked_ledger_updates = root_snarked_ledger_updates.clone();
368360
if transition_frontier
369361
.best_chain

0 commit comments

Comments
 (0)