Skip to content

Commit db91ccb

Browse files
committed
fix(testing): identify non-deterministic events and add special handling for them
re: #188. With this commit, non-determinism is fixed everywhere except gossipsub.
1 parent ae9057e commit db91ccb

File tree

16 files changed

+430
-55
lines changed

16 files changed

+430
-55
lines changed

node/src/block_producer/block_producer_actions.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::cmp::Ordering;
2-
31
use mina_p2p_messages::v2::{
42
ConsensusBodyReferenceStableV1, LedgerProofProdStableV2, MinaBaseStagedLedgerHashStableV1,
53
StagedLedgerDiffDiffStableV2,

node/src/block_producer/block_producer_effects.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use super::{
1414
BlockProducerStagedLedgerDiffCreatePendingAction,
1515
BlockProducerStagedLedgerDiffCreateSuccessAction, BlockProducerWonSlot,
1616
BlockProducerWonSlotAction, BlockProducerWonSlotDiscardAction,
17-
BlockProducerWonSlotDiscardReason, BlockProducerWonSlotProduceInitAction,
18-
BlockProducerWonSlotSearchAction, BlockProducerWonSlotWaitAction,
17+
BlockProducerWonSlotProduceInitAction, BlockProducerWonSlotSearchAction,
18+
BlockProducerWonSlotWaitAction,
1919
};
2020

2121
pub fn block_producer_effects<S: crate::Service>(

node/src/block_producer/block_producer_reducer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ impl BlockProducerEnabled {
437437
time: meta.time(),
438438
won_slot: won_slot.clone(),
439439
chain: std::mem::take(chain),
440-
block: dbg!(block).clone(),
440+
block: block.clone(),
441441
};
442442
}
443443
}

node/src/rpc/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ pub use rpc_service::*;
2020

2121
mod rpc_impls;
2222

23+
pub use openmina_core::requests::{RpcId, RpcIdType};
24+
2325
use ledger::scan_state::scan_state::transaction_snark::OneOrTwo;
2426
use ledger::scan_state::scan_state::AvailableJobMessage;
2527
use mina_p2p_messages::v2::{CurrencyFeeStableV1, NonZeroCurvePoint};
26-
pub use openmina_core::requests::{RpcId, RpcIdType};
2728
use openmina_core::snark::SnarkJobId;
2829
use redux::Timestamp;
2930
use serde::{Deserialize, Serialize};

node/testing/src/cluster/config.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::node::OcamlNodeExecutable;
66
pub struct ClusterConfig {
77
port_range: Option<(u16, u16)>,
88
all_rust_to_rust_use_webrtc: bool,
9+
is_replay: bool,
910
use_debugger: bool,
1011
ocaml_node_executable: OcamlNodeExecutable,
1112
}
@@ -15,6 +16,7 @@ impl ClusterConfig {
1516
Ok(Self {
1617
port_range: None,
1718
all_rust_to_rust_use_webrtc: false,
19+
is_replay: false,
1820
use_debugger: false,
1921
ocaml_node_executable: match ocaml_node_executable {
2022
Some(v) => v,
@@ -28,6 +30,19 @@ impl ClusterConfig {
2830
self
2931
}
3032

33+
pub fn is_use_debugger(&self) -> bool {
34+
self.use_debugger
35+
}
36+
37+
pub fn set_replay(mut self) -> Self {
38+
self.is_replay = true;
39+
self
40+
}
41+
42+
pub fn is_replay(&self) -> bool {
43+
self.is_replay
44+
}
45+
3146
pub fn port_range(&self) -> std::ops::RangeInclusive<u16> {
3247
let range = self.port_range.unwrap_or((11_000, 49_151));
3348
(range.0)..=(range.1)
@@ -43,10 +58,6 @@ impl ClusterConfig {
4358
self.all_rust_to_rust_use_webrtc
4459
}
4560

46-
pub fn is_use_debugger(&self) -> bool {
47-
self.use_debugger
48-
}
49-
5061
pub fn set_ocaml_node_executable(mut self, executable: OcamlNodeExecutable) -> Self {
5162
self.ocaml_node_executable = executable;
5263
self

node/testing/src/cluster/mod.rs

Lines changed: 163 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use ledger::proofs::{VerifierIndex, VerifierSRS};
1515
use libp2p::futures::{stream::FuturesUnordered, StreamExt};
1616
use node::core::channels::mpsc;
1717
use node::core::requests::RpcId;
18+
use node::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts;
19+
use node::p2p::{P2pConnectionEvent, P2pDiscoveryEvent, PeerId};
1820
use node::{
1921
account::{AccountPublicKey, AccountSecretKey},
2022
event_source::Event,
@@ -38,7 +40,7 @@ use openmina_node_native::{http_server, rpc::RpcService, NodeService, RpcSender}
3840
use rand::{rngs::StdRng, SeedableRng};
3941
use serde::Serialize;
4042

41-
use crate::node::{DaemonJson, OcamlStep, TestPeerId};
43+
use crate::node::{DaemonJson, NonDeterministicEvent, OcamlStep, TestPeerId};
4244
use crate::{
4345
network_debugger::Debugger,
4446
node::{
@@ -277,6 +279,9 @@ impl Cluster {
277279
if self.config.all_rust_to_rust_use_webrtc() {
278280
service.set_rust_to_rust_use_webrtc();
279281
}
282+
if self.config.is_replay() {
283+
service.set_replay();
284+
}
280285

281286
let state = node::State::new(config);
282287
fn effects(store: &mut node::Store<NodeTestingService>, action: node::ActionWithMeta) {
@@ -409,6 +414,12 @@ impl Cluster {
409414
self.nodes.get(node_id.index())
410415
}
411416

417+
/// Finds a Rust node of this cluster by its p2p peer id.
///
/// Walks `nodes_iter()` and returns the first node whose `peer_id()` equals
/// `peer_id`, or `None` when no node matches.
pub fn node_by_peer_id(&self, peer_id: PeerId) -> Option<&Node> {
    self.nodes_iter()
        .find_map(|(_, candidate)| (candidate.peer_id() == peer_id).then_some(candidate))
}
422+
412423
/// Returns a mutable reference to the node stored at the index of `node_id`,
/// or `None` if there is no node at that index.
pub fn node_mut(&mut self, node_id: ClusterNodeId) -> Option<&mut Node> {
    let index = node_id.index();
    self.nodes.get_mut(index)
}
@@ -419,6 +430,12 @@ impl Cluster {
419430
.map(|opt| opt.as_ref().expect("tried to access removed ocaml node"))
420431
}
421432

433+
/// Finds an OCaml node of this cluster by its p2p peer id.
///
/// Walks `ocaml_nodes_iter()` and returns the first node whose `peer_id()`
/// equals `peer_id`, or `None` when no node matches.
pub fn ocaml_node_by_peer_id(&self, peer_id: PeerId) -> Option<&OcamlNode> {
    self.ocaml_nodes_iter()
        .find_map(|(_, candidate)| (candidate.peer_id() == peer_id).then_some(candidate))
}
438+
422439
pub fn pending_events(
423440
&mut self,
424441
) -> impl Iterator<
@@ -471,6 +488,35 @@ impl Cluster {
471488
}
472489
}
473490

491+
pub async fn wait_for_pending_event(
492+
&mut self,
493+
node_id: ClusterNodeId,
494+
event_pattern: &str,
495+
) -> anyhow::Result<PendingEventId> {
496+
let node = self
497+
.nodes
498+
.get_mut(node_id.index())
499+
.ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
500+
let timeout = tokio::time::sleep(Duration::from_secs(60));
501+
tokio::select! {
502+
opt = node.wait_for_event(&event_pattern) => opt.ok_or_else(|| anyhow::anyhow!("wait_for_event: None")),
503+
_ = timeout => {
504+
let pending_events = node.pending_events().map(|(_, event)| event.to_string()).collect::<Vec<_>>();
505+
return Err(anyhow::anyhow!("waiting for event timed out! node {node_id:?}, event: \"{event_pattern}\"\n{pending_events:?}"));
506+
}
507+
}
508+
}
509+
510+
pub async fn wait_for_event_and_dispatch(
511+
&mut self,
512+
node_id: ClusterNodeId,
513+
event_pattern: &str,
514+
) -> anyhow::Result<bool> {
515+
let event_id = self.wait_for_pending_event(node_id, event_pattern).await?;
516+
let node = self.nodes.get_mut(node_id.index()).unwrap();
517+
Ok(node.take_event_and_dispatch(event_id))
518+
}
519+
474520
pub async fn add_steps_and_save(&mut self, steps: impl IntoIterator<Item = ScenarioStep>) {
475521
let scenario = self.scenario.chain.back_mut().unwrap();
476522
steps
@@ -480,10 +526,14 @@ impl Cluster {
480526
}
481527

482528
pub async fn exec_to_end(&mut self) -> Result<(), anyhow::Error> {
529+
let mut i = 0;
530+
let total = self.scenario.cur_scenario().steps.len();
483531
loop {
532+
eprintln!("[step]: {i}/{total}");
484533
if !self.exec_next().await? {
485534
break Ok(());
486535
}
536+
i += 1;
487537
}
488538
}
489539

@@ -545,29 +595,129 @@ impl Cluster {
545595
Ok(dispatched)
546596
}
547597

548-
pub async fn exec_step(&mut self, step: ScenarioStep) -> Result<bool, anyhow::Error> {
598+
pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
599+
fn node_addr_by_peer_id(
600+
cluster: &Cluster,
601+
peer_id: PeerId,
602+
) -> anyhow::Result<P2pConnectionOutgoingInitOpts> {
603+
cluster
604+
.node_by_peer_id(peer_id)
605+
.map(|node| node.dial_addr())
606+
.or_else(|| {
607+
cluster
608+
.ocaml_node_by_peer_id(peer_id)
609+
.map(|node| node.dial_addr())
610+
})
611+
.ok_or_else(|| anyhow::anyhow!("node with peer_id: '{peer_id}' not found"))
612+
}
613+
549614
Ok(match step {
615+
ScenarioStep::Event { node_id, event } => {
616+
return self.wait_for_event_and_dispatch(node_id, &event).await;
617+
}
550618
ScenarioStep::ManualEvent { node_id, event } => self
551619
.nodes
552620
.get_mut(node_id.index())
553621
.ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?
554622
.dispatch_event(*event),
555-
ScenarioStep::Event { node_id, event } => {
556-
let node = self
557-
.nodes
558-
.get_mut(node_id.index())
559-
.ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
560-
let timeout = tokio::time::sleep(Duration::from_secs(60));
561-
tokio::select! {
562-
res = node.wait_for_event_and_dispatch(&event) => res,
563-
_ = timeout => {
564-
return Err(anyhow::anyhow!("waiting for event timed out! node {node_id:?}, event: \"{event}\""));
623+
ScenarioStep::NonDeterministicEvent { node_id, event } => {
624+
let event = match *event {
625+
NonDeterministicEvent::P2pListen => return Ok(true),
626+
NonDeterministicEvent::P2pConnectionClosed(peer_id) => {
627+
let node = self
628+
.nodes
629+
.get_mut(node_id.index())
630+
.ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
631+
node.p2p_disconnect(peer_id);
632+
let event =
633+
Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Closed(peer_id)));
634+
return self
635+
.wait_for_event_and_dispatch(node_id, &event.to_string())
636+
.await;
565637
}
566-
}
638+
NonDeterministicEvent::P2pConnectionFinalized(peer_id, res) => {
639+
let node = self
640+
.nodes
641+
.get(node_id.index())
642+
.ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
643+
let res_is_ok = res.is_ok();
644+
let event = Event::P2p(P2pEvent::Connection(
645+
P2pConnectionEvent::Finalized(peer_id, res),
646+
));
647+
648+
if res_is_ok {
649+
let is_peer_connected =
650+
node.state().p2p.get_ready_peer(&peer_id).is_some();
651+
// deduce if kad initiated this conn.
652+
if !node.state().p2p.is_peer_connected_or_connecting(&peer_id) {
653+
let my_addr = node.dial_addr();
654+
let peer = self
655+
.nodes
656+
.iter_mut()
657+
.find(|node| node.peer_id() == peer_id)
658+
.ok_or_else(|| {
659+
anyhow::anyhow!("node with peer_id: '{peer_id}' not found")
660+
})?;
661+
662+
if !peer.state().p2p.is_peer_connecting(my_addr.peer_id()) {
663+
// kad initiated this connection so replay that.
664+
eprintln!(
665+
"p2p_kad_outgoing_init({:?}) -> {:?} - {}",
666+
peer.node_id(),
667+
node_id,
668+
my_addr
669+
);
670+
peer.p2p_kad_outgoing_init(my_addr);
671+
}
672+
}
673+
if is_peer_connected {
674+
// we are already connected, so skip the extra event.
675+
return Ok(true);
676+
}
677+
eprintln!("non_deterministic_wait_for_event_and_dispatch({node_id:?}): {event}");
678+
return self
679+
.wait_for_event_and_dispatch(node_id, &event.to_string())
680+
.await;
681+
} else {
682+
event
683+
}
684+
}
685+
NonDeterministicEvent::P2pLibp2pIdentify(peer_id) => {
686+
let addr = match node_addr_by_peer_id(self, peer_id)? {
687+
P2pConnectionOutgoingInitOpts::LibP2P(v) => (&v).into(),
688+
_ => unreachable!(),
689+
};
690+
P2pEvent::Libp2pIdentify(peer_id, addr).into()
691+
}
692+
NonDeterministicEvent::P2pDiscoveryReady => {
693+
P2pEvent::Discovery(P2pDiscoveryEvent::Ready).into()
694+
}
695+
NonDeterministicEvent::P2pDiscoveryDidFindPeers(ids) => {
696+
P2pEvent::Discovery(P2pDiscoveryEvent::DidFindPeers(ids)).into()
697+
}
698+
NonDeterministicEvent::P2pDiscoveryDidFindPeersError(err) => {
699+
P2pEvent::Discovery(P2pDiscoveryEvent::DidFindPeersError(err)).into()
700+
}
701+
NonDeterministicEvent::P2pDiscoveryAddRoute(id, ids) => {
702+
let addrs = ids
703+
.into_iter()
704+
.map(|id| node_addr_by_peer_id(&self, id))
705+
.collect::<Result<Vec<_>, _>>()?;
706+
P2pEvent::Discovery(P2pDiscoveryEvent::AddRoute(id, addrs)).into()
707+
}
708+
NonDeterministicEvent::RpcReadonly(id, req) => Event::Rpc(id, req).into(),
709+
};
710+
eprintln!("non_deterministic_event_dispatch({node_id:?}): {event}");
711+
self.nodes
712+
.get_mut(node_id.index())
713+
.ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?
714+
.dispatch_event(event)
567715
}
568716
ScenarioStep::AddNode { config } => match config {
569717
NodeTestingConfig::Rust(config) => {
570718
self.add_rust_node(config);
719+
// TODO(binier): wait for node ports to be opened instead.
720+
tokio::time::sleep(Duration::from_secs(2)).await;
571721
true
572722
}
573723
NodeTestingConfig::Ocaml(config) => {

node/testing/src/main.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,25 @@ impl Command {
117117
let config = ClusterConfig::new(None).map_err(|err| {
118118
anyhow::anyhow!("failed to create cluster configuration: {err}")
119119
})?;
120+
let config = config.set_replay();
120121

121122
let id = cmd.name.parse()?;
122123
let fut = async move {
123124
let mut cluster = Cluster::new(config);
124125
cluster.start(Scenario::load(&id).await?).await?;
125126
cluster.exec_to_end().await?;
127+
for (node_id, node) in cluster.nodes_iter() {
128+
let Some(best_tip) = node.state().transition_frontier.best_tip() else {
129+
continue;
130+
};
131+
132+
eprintln!(
133+
"[node_status] node_{node_id} {} - {} [{}]",
134+
best_tip.height(),
135+
best_tip.hash(),
136+
best_tip.producer()
137+
);
138+
}
126139
Ok(())
127140
};
128141
rt.block_on(async {

node/testing/src/node/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod config;
22
pub use config::*;
33

44
mod rust;
5-
pub use rust::Node;
5+
pub use rust::{Node, NonDeterministicEvent};
66

77
mod ocaml;
88
pub use ocaml::{OcamlNode, OcamlStep};

0 commit comments

Comments
 (0)