diff --git a/node/common/src/service/block_producer/vrf_evaluator.rs b/node/common/src/service/block_producer/vrf_evaluator.rs index 837a582a30..5f9768e538 100644 --- a/node/common/src/service/block_producer/vrf_evaluator.rs +++ b/node/common/src/service/block_producer/vrf_evaluator.rs @@ -51,7 +51,9 @@ pub fn vrf_evaluator( } } -impl node::block_producer::vrf_evaluator::BlockProducerVrfEvaluatorService for NodeService { +impl node::block_producer_effectful::vrf_evaluator_effectful::BlockProducerVrfEvaluatorService + for NodeService +{ fn evaluate(&mut self, data: VrfEvaluatorInput) { if let Some(bp) = self.block_producer.as_mut() { let _ = bp.vrf_evaluation_sender.send(data); diff --git a/node/src/action.rs b/node/src/action.rs index 867890ea0b..88ac985bf8 100644 --- a/node/src/action.rs +++ b/node/src/action.rs @@ -5,6 +5,7 @@ pub type ActionWithMeta = redux::ActionWithMeta; pub type ActionWithMetaRef<'a> = redux::ActionWithMeta<&'a Action>; pub use crate::block_producer::BlockProducerAction; +pub use crate::block_producer_effectful::BlockProducerEffectfulAction; pub use crate::consensus::ConsensusAction; pub use crate::event_source::EventSourceAction; pub use crate::external_snark_worker::ExternalSnarkWorkerAction; @@ -48,6 +49,7 @@ pub enum Action { TransactionPoolEffect(TransactionPoolEffectfulAction), ExternalSnarkWorker(ExternalSnarkWorkerAction), BlockProducer(BlockProducerAction), + BlockProducerEffectful(BlockProducerEffectfulAction), Rpc(RpcAction), RpcEffectful(RpcEffectfulAction), @@ -92,6 +94,7 @@ impl redux::EnablingCondition for Action { Action::SnarkPoolEffect(a) => a.is_enabled(state, time), Action::ExternalSnarkWorker(a) => a.is_enabled(state, time), Action::BlockProducer(a) => a.is_enabled(state, time), + Action::BlockProducerEffectful(a) => a.is_enabled(state, time), Action::Rpc(a) => a.is_enabled(state, time), Action::WatchedAccounts(a) => a.is_enabled(state, time), Action::TransactionPool(a) => a.is_enabled(state, time), diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 6009bf5971..ca8823d8c8 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -17,6 +17,8 @@ use strum_macros::VariantArray; use crate::block_producer::vrf_evaluator::BlockProducerVrfEvaluatorAction; use crate::block_producer::BlockProducerAction; +use crate::block_producer_effectful::vrf_evaluator_effectful::BlockProducerVrfEvaluatorEffectfulAction; +use crate::block_producer_effectful::BlockProducerEffectfulAction; use crate::consensus::ConsensusAction; use crate::event_source::EventSourceAction; use crate::external_snark_worker::ExternalSnarkWorkerAction; @@ -129,6 +131,13 @@ pub enum ActionKind { BlockProducerWonSlotTransactionsGet, BlockProducerWonSlotTransactionsSuccess, BlockProducerWonSlotWait, + BlockProducerEffectfulBlockProveInit, + BlockProducerEffectfulBlockProveSuccess, + BlockProducerEffectfulBlockUnprovenBuild, + BlockProducerEffectfulStagedLedgerDiffCreateInit, + BlockProducerEffectfulStagedLedgerDiffCreateSuccess, + BlockProducerEffectfulWonSlot, + BlockProducerEffectfulWonSlotDiscard, BlockProducerVrfEvaluatorBeginDelegatorTableConstruction, BlockProducerVrfEvaluatorBeginEpochEvaluation, BlockProducerVrfEvaluatorCheckEpochBounds, @@ -145,6 +154,7 @@ pub enum ActionKind { BlockProducerVrfEvaluatorProcessSlotEvaluationSuccess, BlockProducerVrfEvaluatorSelectInitialSlot, BlockProducerVrfEvaluatorWaitForNextEvaluation, + BlockProducerVrfEvaluatorEffectfulEvaluateSlot, CheckTimeouts, ConsensusBestTipUpdate, ConsensusBlockChainProofUpdate, @@ -694,7 +704,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 581; + pub const COUNT: u16 = 589; } impl std::fmt::Display for ActionKind { @@ -721,6 +731,7 @@ impl ActionKindGet for Action { Self::TransactionPoolEffect(a) => a.kind(), Self::ExternalSnarkWorker(a) => a.kind(), Self::BlockProducer(a) => a.kind(), + Self::BlockProducerEffectful(a) => a.kind(), Self::Rpc(a) => a.kind(), Self::RpcEffectful(a) => a.kind(), Self::WatchedAccounts(a) => a.kind(), @@ -969,6 +980,25 @@ impl ActionKindGet for BlockProducerAction { } } +impl ActionKindGet for BlockProducerEffectfulAction { + fn kind(&self) -> ActionKind { + match self { + Self::VrfEvaluator(a) => a.kind(), + Self::WonSlot { .. } => ActionKind::BlockProducerEffectfulWonSlot, + Self::WonSlotDiscard { .. } => ActionKind::BlockProducerEffectfulWonSlotDiscard, + Self::StagedLedgerDiffCreateInit => { + ActionKind::BlockProducerEffectfulStagedLedgerDiffCreateInit + } + Self::StagedLedgerDiffCreateSuccess => { + ActionKind::BlockProducerEffectfulStagedLedgerDiffCreateSuccess + } + Self::BlockUnprovenBuild => ActionKind::BlockProducerEffectfulBlockUnprovenBuild, + Self::BlockProveInit => ActionKind::BlockProducerEffectfulBlockProveInit, + Self::BlockProveSuccess => ActionKind::BlockProducerEffectfulBlockProveSuccess, + } + } +} + impl ActionKindGet for RpcAction { fn kind(&self) -> ActionKind { match self { @@ -1482,6 +1512,14 @@ impl ActionKindGet for BlockProducerVrfEvaluatorAction { } } +impl ActionKindGet for BlockProducerVrfEvaluatorEffectfulAction { + fn kind(&self) -> ActionKind { + match self { + Self::EvaluateSlot { .. } => ActionKind::BlockProducerVrfEvaluatorEffectfulEvaluateSlot, + } + } +} + impl ActionKindGet for P2pConnectionOutgoingAction { fn kind(&self) -> ActionKind { match self { diff --git a/node/src/block_producer/block_producer_actions.rs b/node/src/block_producer/block_producer_actions.rs index b4baca2dfc..e234f33495 100644 --- a/node/src/block_producer/block_producer_actions.rs +++ b/node/src/block_producer/block_producer_actions.rs @@ -4,11 +4,10 @@ use openmina_core::block::ArcBlockWithHash; use openmina_core::ActionEvent; use serde::{Deserialize, Serialize}; +use crate::block_producer_effectful::StagedLedgerDiffCreateOutput; + use super::vrf_evaluator::BlockProducerVrfEvaluatorAction; -use super::{ - BlockProducerCurrentState, BlockProducerWonSlot, BlockProducerWonSlotDiscardReason, - StagedLedgerDiffCreateOutput, -}; +use super::{BlockProducerCurrentState, BlockProducerWonSlot, BlockProducerWonSlotDiscardReason}; pub type BlockProducerActionWithMeta = redux::ActionWithMeta; pub type BlockProducerActionWithMetaRef<'a> = redux::ActionWithMeta<&'a BlockProducerAction>; diff --git a/node/src/block_producer/block_producer_reducer.rs b/node/src/block_producer/block_producer_reducer.rs index d51596312f..fe178cd5fb 100644 --- a/node/src/block_producer/block_producer_reducer.rs +++ b/node/src/block_producer/block_producer_reducer.rs @@ -12,68 +12,103 @@ use mina_p2p_messages::{ StagedLedgerDiffBodyStableV1, StateBodyHash, StateHash, UnsignedExtendedUInt32StableV1, }, }; -use openmina_core::constants::constraint_constants; +use openmina_core::{block::ArcBlockWithHash, constants::constraint_constants}; use openmina_core::{ - block::AppliedBlock, + bug_condition, consensus::{ global_sub_window, grace_period_end, in_same_checkpoint_window, in_seed_update_range, relative_sub_window, }, }; +use redux::{callback, Dispatcher, Timestamp}; + +use crate::{ + transition_frontier::sync::TransitionFrontierSyncAction, Action, BlockProducerEffectfulAction, + State, Substate, TransactionPoolAction, +}; use super::{ - calc_epoch_seed, to_epoch_and_slot, BlockProducerAction, BlockProducerActionWithMetaRef, - BlockProducerCurrentState, BlockProducerEnabled, BlockProducerState, BlockWithoutProof, + calc_epoch_seed, next_epoch_first_slot, to_epoch_and_slot, + vrf_evaluator::{ + BlockProducerVrfEvaluatorAction, BlockProducerVrfEvaluatorState, InterruptReason, + }, + BlockProducerAction, BlockProducerActionWithMetaRef, BlockProducerCurrentState, + BlockProducerEnabled, BlockProducerState, BlockWithoutProof, }; impl BlockProducerState { - pub fn reducer( - &mut self, - action: BlockProducerActionWithMetaRef<'_>, - best_chain: &[AppliedBlock], - ) { - self.with_mut((), move |state| state.reducer(action, best_chain)) + pub fn reducer(state_context: Substate, action: BlockProducerActionWithMetaRef<'_>) { + BlockProducerEnabled::reducer(state_context, action); } } impl BlockProducerEnabled { - pub fn reducer( - &mut self, - action: BlockProducerActionWithMetaRef<'_>, - best_chain: &[AppliedBlock], - ) { + /// Substate is accesses from global state, because applied blocks from transition frontier are required + pub fn reducer(mut state_context: Substate, action: BlockProducerActionWithMetaRef<'_>) { let (action, meta) = action.split(); + let Ok(global_state) = state_context.get_substate_mut() else { + return; + }; + + let best_chain = &global_state.transition_frontier.best_chain; + let Some(state) = global_state.block_producer.as_mut() else { + return; + }; + match action { BlockProducerAction::VrfEvaluator(action) => { - self.vrf_evaluator.reducer(meta.with_action(action)) + BlockProducerVrfEvaluatorState::reducer( + Substate::from_compatible_substate(state_context), + meta.with_action(action), + ); } BlockProducerAction::BestTipUpdate { best_tip } => { - self.injected_blocks.remove(best_tip.hash()); + state.injected_blocks.remove(best_tip.hash()); // set the genesis timestamp on the first best tip update // TODO: move/remove once we can generate the genesis block - if self.vrf_evaluator.genesis_timestamp == redux::Timestamp::ZERO { - self.vrf_evaluator.genesis_timestamp = best_tip.genesis_timestamp(); + if state.vrf_evaluator.genesis_timestamp == redux::Timestamp::ZERO { + state.vrf_evaluator.genesis_timestamp = best_tip.genesis_timestamp(); + } + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + Self::dispatch_best_tip_update(dispatcher, state, best_tip); + } + BlockProducerAction::WonSlotSearch => { + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + if let Some(won_slot) = state.block_producer.with(None, |bp| { + let best_tip = state.transition_frontier.best_tip()?; + let cur_global_slot = state.cur_global_slot()?; + bp.vrf_evaluator.next_won_slot(cur_global_slot, best_tip) + }) { + dispatcher.push(BlockProducerAction::WonSlot { won_slot }); } } - BlockProducerAction::WonSlotSearch => {} BlockProducerAction::WonSlot { won_slot } => { - self.current = BlockProducerCurrentState::WonSlot { + state.current = BlockProducerCurrentState::WonSlot { time: meta.time(), won_slot: won_slot.clone(), }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerEffectfulAction::WonSlot { + won_slot: won_slot.clone(), + }); } BlockProducerAction::WonSlotDiscard { reason } => { - if let Some(won_slot) = self.current.won_slot() { - self.current = BlockProducerCurrentState::WonSlotDiscarded { + if let Some(won_slot) = state.current.won_slot() { + state.current = BlockProducerCurrentState::WonSlotDiscarded { time: meta.time(), won_slot: won_slot.clone(), - reason: reason.clone(), + reason: *reason, }; } + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerEffectfulAction::WonSlotDiscard { reason: *reason }); } BlockProducerAction::WonSlotWait => { - if let Some(won_slot) = self.current.won_slot() { - self.current = BlockProducerCurrentState::WonSlotWait { + if let Some(won_slot) = state.current.won_slot() { + state.current = BlockProducerCurrentState::WonSlotWait { time: meta.time(), won_slot: won_slot.clone(), }; @@ -82,37 +117,43 @@ impl BlockProducerEnabled { BlockProducerAction::WonSlotTransactionsGet => { let BlockProducerCurrentState::WonSlotProduceInit { won_slot, chain, .. - } = &mut self.current + } = &mut state.current else { return; }; - self.current = BlockProducerCurrentState::WonSlotTransactionsGet { + state.current = BlockProducerCurrentState::WonSlotTransactionsGet { time: meta.time(), won_slot: won_slot.clone(), chain: chain.clone(), - } + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(TransactionPoolAction::CollectTransactionsByFee); } BlockProducerAction::WonSlotTransactionsSuccess { transactions_by_fee, } => { let BlockProducerCurrentState::WonSlotTransactionsGet { won_slot, chain, .. - } = &mut self.current + } = &mut state.current else { return; }; - self.current = BlockProducerCurrentState::WonSlotTransactionsSuccess { + state.current = BlockProducerCurrentState::WonSlotTransactionsSuccess { time: meta.time(), won_slot: won_slot.clone(), chain: chain.clone(), transactions_by_fee: transactions_by_fee.clone(), - } + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerAction::StagedLedgerDiffCreateInit); } BlockProducerAction::WonSlotProduceInit => { - if let Some(won_slot) = self.current.won_slot() { - let Some(chain) = best_chain.last().map(|best_tip| { + if let Some(won_slot) = state.current.won_slot() { + if let Some(chain) = best_chain.last().map(|best_tip| { if best_tip.global_slot() == won_slot.global_slot() { // We are producing block which replaces current best tip // instead of extending it. @@ -120,29 +161,33 @@ impl BlockProducerEnabled { } else { best_chain.to_vec() } - }) else { - return; - }; - - self.current = BlockProducerCurrentState::WonSlotProduceInit { - time: meta.time(), - won_slot: won_slot.clone(), - chain, + }) { + state.current = BlockProducerCurrentState::WonSlotProduceInit { + time: meta.time(), + won_slot: won_slot.clone(), + chain, + }; }; } + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerAction::WonSlotTransactionsGet); + } + BlockProducerAction::StagedLedgerDiffCreateInit => { + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerEffectfulAction::StagedLedgerDiffCreateInit); } - BlockProducerAction::StagedLedgerDiffCreateInit => {} BlockProducerAction::StagedLedgerDiffCreatePending => { let BlockProducerCurrentState::WonSlotTransactionsSuccess { won_slot, chain, transactions_by_fee, .. - } = &mut self.current + } = &mut state.current else { return; }; - self.current = BlockProducerCurrentState::StagedLedgerDiffCreatePending { + state.current = BlockProducerCurrentState::StagedLedgerDiffCreatePending { time: meta.time(), won_slot: won_slot.clone(), chain: std::mem::take(chain), @@ -154,11 +199,11 @@ impl BlockProducerEnabled { won_slot, chain, .. - } = &mut self.current + } = &mut state.current else { return; }; - self.current = BlockProducerCurrentState::StagedLedgerDiffCreateSuccess { + state.current = BlockProducerCurrentState::StagedLedgerDiffCreateSuccess { time: meta.time(), won_slot: won_slot.clone(), chain: std::mem::take(chain), @@ -170,302 +215,20 @@ impl BlockProducerEnabled { pending_coinbase_witness: output.pending_coinbase_witness.clone(), stake_proof_sparse_ledger: output.stake_proof_sparse_ledger.clone(), }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerEffectfulAction::StagedLedgerDiffCreateSuccess); } BlockProducerAction::BlockUnprovenBuild => { - let BlockProducerCurrentState::StagedLedgerDiffCreateSuccess { - won_slot, - chain, - diff, - diff_hash, - staged_ledger_hash, - emitted_ledger_proof, - pending_coinbase_update, - pending_coinbase_witness, - stake_proof_sparse_ledger, - .. - } = std::mem::take(&mut self.current) - else { - return; - }; - let Some(pred_block) = chain.last() else { - return; - }; - - let pred_consensus_state = &pred_block.header().protocol_state.body.consensus_state; - let pred_blockchain_state = - &pred_block.header().protocol_state.body.blockchain_state; - - let genesis_ledger_hash = &pred_blockchain_state.genesis_ledger_hash; - - let block_timestamp = won_slot.timestamp(); - let pred_global_slot = pred_consensus_state - .curr_global_slot_since_hard_fork - .clone(); - let curr_global_slot_since_hard_fork = won_slot.global_slot.clone(); - let global_slot_since_genesis = - won_slot.global_slot_since_genesis(pred_block.global_slot_diff()); - let (pred_epoch, _) = to_epoch_and_slot(&pred_global_slot); - let (next_epoch, next_slot) = to_epoch_and_slot(&curr_global_slot_since_hard_fork); - let has_ancestor_in_same_checkpoint_window = - in_same_checkpoint_window(&pred_global_slot, &curr_global_slot_since_hard_fork); - - let block_stake_winner = won_slot.delegator.0.clone(); - let vrf_truncated_output: ConsensusVrfOutputTruncatedStableV1 = - (*won_slot.vrf_output).clone().into(); - let vrf_hash = won_slot.vrf_output.hash(); - let block_creator = self.config.pub_key.clone(); - let coinbase_receiver = self.config.coinbase_receiver().clone(); - let proposed_protocol_version_opt = self.config.proposed_protocol_version.clone(); - - let ledger_proof_statement = ledger_proof_statement_from_emitted_proof( - emitted_ledger_proof.as_deref(), - &pred_blockchain_state.ledger_proof_statement, - ); - - let supply_increase = emitted_ledger_proof.as_ref().map_or(Signed::zero(), |v| { - Signed::from(&v.statement.supply_increase) - }); - - let total_currency = { - let (amount, overflowed) = - Amount::from(pred_consensus_state.total_currency.clone()) - .add_signed_flagged(supply_increase); - if overflowed { - todo!("total_currency overflowed"); - } - amount - }; - - let (staking_epoch_data, next_epoch_data, epoch_count) = { - let next_staking_ledger = - if pred_block.snarked_ledger_hash() == genesis_ledger_hash { - pred_consensus_state.next_epoch_data.ledger.clone() - } else { - MinaBaseEpochLedgerValueStableV1 { - hash: pred_block.snarked_ledger_hash().clone(), - total_currency: (&total_currency).into(), - } - }; - let (staking_data, next_data, epoch_count) = if next_epoch > pred_epoch { - let staking_data = - next_to_staking_epoch_data(&pred_consensus_state.next_epoch_data); - let next_data = - ConsensusProofOfStakeDataEpochDataNextValueVersionedValueStableV1 { - seed: pred_consensus_state.next_epoch_data.seed.clone(), - ledger: next_staking_ledger, - start_checkpoint: pred_block.hash().clone(), - // comment from Mina repo: (* TODO: We need to make sure issue #2328 is properly addressed. *) - lock_checkpoint: StateHash::zero(), - epoch_length: UnsignedExtendedUInt32StableV1(1.into()), - }; - let epoch_count = UnsignedExtendedUInt32StableV1( - (pred_consensus_state.epoch_count.as_u32() + 1).into(), - ); - (staking_data, next_data, epoch_count) - } else { - assert_eq!(pred_epoch, next_epoch); - let mut next_data = pred_consensus_state.next_epoch_data.clone(); - next_data.epoch_length = UnsignedExtendedUInt32StableV1( - (next_data.epoch_length.as_u32() + 1).into(), - ); - ( - pred_consensus_state.staking_epoch_data.clone(), - next_data, - pred_consensus_state.epoch_count, - ) - }; - - let next_data = if in_seed_update_range(next_slot, pred_block.constants()) { - ConsensusProofOfStakeDataEpochDataNextValueVersionedValueStableV1 { - seed: calc_epoch_seed(&next_data.seed, vrf_hash), - lock_checkpoint: pred_block.hash().clone(), - ..next_data - } - } else { - next_data - }; - - (staking_data, next_data, epoch_count) - }; - - let (min_window_density, sub_window_densities) = { - // TODO(binier): when should this be false? - let incr_window = true; - let pred_sub_window_densities = &pred_consensus_state.sub_window_densities; - - let pred_global_sub_window = - global_sub_window(&pred_global_slot, pred_block.constants()); - let next_global_sub_window = global_sub_window( - &curr_global_slot_since_hard_fork, - pred_block.constants(), - ); - - let pred_relative_sub_window = relative_sub_window(pred_global_sub_window); - let next_relative_sub_window = relative_sub_window(next_global_sub_window); - - let is_same_global_sub_window = - pred_global_sub_window == next_global_sub_window; - let are_windows_overlapping = pred_global_sub_window - + constraint_constants().sub_windows_per_window as u32 - >= next_global_sub_window; - - let current_sub_window_densities = pred_sub_window_densities - .iter() - .enumerate() - .map(|(i, density)| (i as u32, density.as_u32())) - .map(|(i, density)| { - let gt_pred_sub_window = i > pred_relative_sub_window; - let lt_next_sub_window = i < next_relative_sub_window; - let within_range = - if pred_relative_sub_window < next_relative_sub_window { - gt_pred_sub_window && lt_next_sub_window - } else { - gt_pred_sub_window || lt_next_sub_window - }; - if is_same_global_sub_window || are_windows_overlapping && !within_range - { - density - } else { - 0 - } - }) - .collect::>(); - - let grace_period_end = grace_period_end(pred_block.constants()); - let min_window_density = if is_same_global_sub_window - || curr_global_slot_since_hard_fork.slot_number.as_u32() < grace_period_end - { - pred_consensus_state.min_window_density - } else { - let cur_density = current_sub_window_densities.iter().sum(); - let min_density = pred_consensus_state - .min_window_density - .as_u32() - .min(cur_density); - UnsignedExtendedUInt32StableV1(min_density.into()) - }; - - let next_sub_window_densities = current_sub_window_densities - .into_iter() - .enumerate() - .map(|(i, density)| (i as u32, density)) - .map(|(i, density)| { - let is_next_sub_window = i == next_relative_sub_window; - if is_next_sub_window { - let density = if is_same_global_sub_window { - density - } else { - 0 - }; - if incr_window { - density + 1 - } else { - density - } - } else { - density - } - }) - .map(|v| UnsignedExtendedUInt32StableV1(v.into())) - .collect(); - - (min_window_density, next_sub_window_densities) - }; - - let consensus_state = ConsensusProofOfStakeDataConsensusStateValueStableV2 { - blockchain_length: UnsignedExtendedUInt32StableV1( - (pred_block.height() + 1).into(), - ), - epoch_count, - min_window_density, - sub_window_densities, - last_vrf_output: vrf_truncated_output, - total_currency: (&total_currency).into(), - curr_global_slot_since_hard_fork, - global_slot_since_genesis, - staking_epoch_data, - next_epoch_data, - has_ancestor_in_same_checkpoint_window, - block_stake_winner, - block_creator, - coinbase_receiver, - // TODO(binier): Staged_ledger.can_apply_supercharged_coinbase_exn - supercharge_coinbase: constraint_constants().supercharged_coinbase_factor != 0, - }; - - let protocol_state = MinaStateProtocolStateValueStableV2 { - previous_state_hash: pred_block.hash().clone(), - body: MinaStateProtocolStateBodyValueStableV2 { - genesis_state_hash: if pred_block.is_genesis() { - pred_block.hash().clone() - } else { - pred_block - .header() - .protocol_state - .body - .genesis_state_hash - .clone() - }, - constants: pred_block.header().protocol_state.body.constants.clone(), - blockchain_state: MinaStateBlockchainStateValueStableV2 { - staged_ledger_hash: staged_ledger_hash.clone(), - genesis_ledger_hash: genesis_ledger_hash.clone(), - ledger_proof_statement, - timestamp: block_timestamp, - body_reference: diff_hash.clone(), - }, - consensus_state, - }, - }; - - let chain_proof_len = pred_block.constants().delta.as_u32() as usize; - let delta_block_chain_proof = match chain_proof_len { - 0 => (pred_block.hash().clone(), List::new()), - chain_proof_len => { - // TODO(binier): test - let mut iter = chain.iter().rev().take(chain_proof_len + 1).rev(); - if let Some(first_block) = iter.next() { - let first_hash = first_block.hash().clone(); - let body_hashes = iter - .filter_map(|b| b.header().protocol_state.body.try_hash().ok()) // TODO: Handle error ? - .map(StateBodyHash::from) - .collect(); - (first_hash, body_hashes) - } else { - // TODO: test this as well - // If the chain is empty, return the same as when chain_proof_len is 0 - (pred_block.hash().clone(), List::new()) - } - } - }; - - let block = BlockWithoutProof { - protocol_state, - delta_block_chain_proof, - current_protocol_version: pred_block.header().current_protocol_version.clone(), - proposed_protocol_version_opt, - body: StagedLedgerDiffBodyStableV1 { - staged_ledger_diff: diff.clone(), - }, - }; - let Ok(block_hash) = block.protocol_state.try_hash() else { - openmina_core::log::inner::error!("Invalid protocol state"); - return; - }; + state.reduce_block_unproved_build(meta.time()); - self.current = BlockProducerCurrentState::BlockUnprovenBuilt { - time: meta.time(), - won_slot, - chain, - emitted_ledger_proof, - pending_coinbase_update, - pending_coinbase_witness, - stake_proof_sparse_ledger, - block, - block_hash, - } + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerEffectfulAction::BlockUnprovenBuild); + } + BlockProducerAction::BlockProveInit => { + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerEffectfulAction::BlockProveInit); } - BlockProducerAction::BlockProveInit => {} BlockProducerAction::BlockProvePending => { if let BlockProducerCurrentState::BlockUnprovenBuilt { won_slot, @@ -477,9 +240,9 @@ impl BlockProducerEnabled { block, block_hash, .. - } = std::mem::take(&mut self.current) + } = std::mem::take(&mut state.current) { - self.current = BlockProducerCurrentState::BlockProvePending { + state.current = BlockProducerCurrentState::BlockProvePending { time: meta.time(), won_slot, chain, @@ -499,9 +262,9 @@ impl BlockProducerEnabled { block, block_hash, .. - } = std::mem::take(&mut self.current) + } = std::mem::take(&mut state.current) { - self.current = BlockProducerCurrentState::BlockProveSuccess { + state.current = BlockProducerCurrentState::BlockProveSuccess { time: meta.time(), won_slot, chain, @@ -510,6 +273,9 @@ impl BlockProducerEnabled { proof: proof.clone(), }; } + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerEffectfulAction::BlockProveSuccess); } BlockProducerAction::BlockProduced => { if let BlockProducerCurrentState::BlockProveSuccess { @@ -519,34 +285,421 @@ impl BlockProducerEnabled { block_hash, proof, .. - } = std::mem::take(&mut self.current) + } = std::mem::take(&mut state.current) { - self.current = BlockProducerCurrentState::Produced { + state.current = BlockProducerCurrentState::Produced { time: meta.time(), won_slot, chain, block: block.with_hash_and_proof(block_hash, *proof), }; } + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerAction::BlockInject); + } + BlockProducerAction::BlockInject => { + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + + let Some((best_tip, root_block, blocks_inbetween)) = None.or_else(|| { + let (best_tip, chain) = state.block_producer.produced_block_with_chain()?; + let mut iter = chain.iter(); + let root_block = iter.next()?.block_with_hash(); + let blocks_inbetween = iter.map(|b| b.hash().clone()).collect(); + Some((best_tip.clone(), root_block.clone(), blocks_inbetween)) + }) else { + return; + }; + + let previous_root_snarked_ledger_hash = state + .transition_frontier + .root() + .map(|b| b.snarked_ledger_hash().clone()); + + dispatcher.push(TransitionFrontierSyncAction::BestTipUpdate { + previous_root_snarked_ledger_hash, + best_tip: best_tip.clone(), + root_block, + blocks_inbetween, + on_success: Some(callback!( + on_transition_frontier_sync_best_tip_update(_p: ()) -> crate::Action{ + BlockProducerAction::BlockInjected + } + )), + }); } - BlockProducerAction::BlockInject => {} BlockProducerAction::BlockInjected => { if let BlockProducerCurrentState::Produced { won_slot, chain, block, .. - } = &mut self.current + } = &mut state.current { - self.injected_blocks.insert(block.hash().clone()); - self.current = BlockProducerCurrentState::Injected { + state.injected_blocks.insert(block.hash().clone()); + state.current = BlockProducerCurrentState::Injected { time: meta.time(), won_slot: won_slot.clone(), chain: std::mem::take(chain), block: block.clone(), }; } + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerAction::WonSlotSearch); + } + } + } + + fn reduce_block_unproved_build(&mut self, time: Timestamp) { + let BlockProducerCurrentState::StagedLedgerDiffCreateSuccess { + won_slot, + chain, + diff, + diff_hash, + staged_ledger_hash, + emitted_ledger_proof, + pending_coinbase_update, + pending_coinbase_witness, + stake_proof_sparse_ledger, + .. + } = std::mem::take(&mut self.current) + else { + return; + }; + let Some(pred_block) = chain.last() else { + return; + }; + + let pred_consensus_state = &pred_block.header().protocol_state.body.consensus_state; + let pred_blockchain_state = &pred_block.header().protocol_state.body.blockchain_state; + + let genesis_ledger_hash = &pred_blockchain_state.genesis_ledger_hash; + + let block_timestamp = won_slot.timestamp(); + let pred_global_slot = pred_consensus_state + .curr_global_slot_since_hard_fork + .clone(); + let curr_global_slot_since_hard_fork = won_slot.global_slot.clone(); + let global_slot_since_genesis = + won_slot.global_slot_since_genesis(pred_block.global_slot_diff()); + let (pred_epoch, _) = to_epoch_and_slot(&pred_global_slot); + let (next_epoch, next_slot) = to_epoch_and_slot(&curr_global_slot_since_hard_fork); + let has_ancestor_in_same_checkpoint_window = + in_same_checkpoint_window(&pred_global_slot, &curr_global_slot_since_hard_fork); + + let block_stake_winner = won_slot.delegator.0.clone(); + let vrf_truncated_output: ConsensusVrfOutputTruncatedStableV1 = + (*won_slot.vrf_output).clone().into(); + let vrf_hash = won_slot.vrf_output.hash(); + let block_creator = self.config.pub_key.clone(); + let coinbase_receiver = self.config.coinbase_receiver().clone(); + let proposed_protocol_version_opt = self.config.proposed_protocol_version.clone(); + + let ledger_proof_statement = ledger_proof_statement_from_emitted_proof( + emitted_ledger_proof.as_deref(), + &pred_blockchain_state.ledger_proof_statement, + ); + + let supply_increase = emitted_ledger_proof.as_ref().map_or(Signed::zero(), |v| { + Signed::from(&v.statement.supply_increase) + }); + + let total_currency = { + let (amount, overflowed) = Amount::from(pred_consensus_state.total_currency.clone()) + .add_signed_flagged(supply_increase); + if overflowed { + todo!("total_currency overflowed"); } + amount + }; + + let (staking_epoch_data, next_epoch_data, epoch_count) = { + let next_staking_ledger = if pred_block.snarked_ledger_hash() == genesis_ledger_hash { + pred_consensus_state.next_epoch_data.ledger.clone() + } else { + MinaBaseEpochLedgerValueStableV1 { + hash: pred_block.snarked_ledger_hash().clone(), + total_currency: (&total_currency).into(), + } + }; + let (staking_data, next_data, epoch_count) = if next_epoch > pred_epoch { + let staking_data = + next_to_staking_epoch_data(&pred_consensus_state.next_epoch_data); + let next_data = ConsensusProofOfStakeDataEpochDataNextValueVersionedValueStableV1 { + seed: pred_consensus_state.next_epoch_data.seed.clone(), + ledger: next_staking_ledger, + start_checkpoint: pred_block.hash().clone(), + // comment from Mina repo: (* TODO: We need to make sure issue #2328 is properly addressed. *) + lock_checkpoint: StateHash::zero(), + epoch_length: UnsignedExtendedUInt32StableV1(1.into()), + }; + let epoch_count = UnsignedExtendedUInt32StableV1( + (pred_consensus_state.epoch_count.as_u32() + 1).into(), + ); + (staking_data, next_data, epoch_count) + } else { + assert_eq!(pred_epoch, next_epoch); + let mut next_data = pred_consensus_state.next_epoch_data.clone(); + next_data.epoch_length = + UnsignedExtendedUInt32StableV1((next_data.epoch_length.as_u32() + 1).into()); + ( + pred_consensus_state.staking_epoch_data.clone(), + next_data, + pred_consensus_state.epoch_count, + ) + }; + + let next_data = if in_seed_update_range(next_slot, pred_block.constants()) { + ConsensusProofOfStakeDataEpochDataNextValueVersionedValueStableV1 { + seed: calc_epoch_seed(&next_data.seed, vrf_hash), + lock_checkpoint: pred_block.hash().clone(), + ..next_data + } + } else { + next_data + }; + + (staking_data, next_data, epoch_count) + }; + + let (min_window_density, sub_window_densities) = { + // TODO(binier): when should this be false? + let incr_window = true; + let pred_sub_window_densities = &pred_consensus_state.sub_window_densities; + + let pred_global_sub_window = + global_sub_window(&pred_global_slot, pred_block.constants()); + let next_global_sub_window = + global_sub_window(&curr_global_slot_since_hard_fork, pred_block.constants()); + + let pred_relative_sub_window = relative_sub_window(pred_global_sub_window); + let next_relative_sub_window = relative_sub_window(next_global_sub_window); + + let is_same_global_sub_window = pred_global_sub_window == next_global_sub_window; + let are_windows_overlapping = pred_global_sub_window + + constraint_constants().sub_windows_per_window as u32 + >= next_global_sub_window; + + let current_sub_window_densities = pred_sub_window_densities + .iter() + .enumerate() + .map(|(i, density)| (i as u32, density.as_u32())) + .map(|(i, density)| { + let gt_pred_sub_window = i > pred_relative_sub_window; + let lt_next_sub_window = i < next_relative_sub_window; + let within_range = if pred_relative_sub_window < next_relative_sub_window { + gt_pred_sub_window && lt_next_sub_window + } else { + gt_pred_sub_window || lt_next_sub_window + }; + if is_same_global_sub_window || are_windows_overlapping && !within_range { + density + } else { + 0 + } + }) + .collect::>(); + + let grace_period_end = grace_period_end(pred_block.constants()); + let min_window_density = if is_same_global_sub_window + || curr_global_slot_since_hard_fork.slot_number.as_u32() < grace_period_end + { + pred_consensus_state.min_window_density + } else { + let cur_density = current_sub_window_densities.iter().sum(); + let min_density = pred_consensus_state + .min_window_density + .as_u32() + .min(cur_density); + UnsignedExtendedUInt32StableV1(min_density.into()) + }; + + let next_sub_window_densities = current_sub_window_densities + .into_iter() + .enumerate() + .map(|(i, density)| (i as u32, density)) + .map(|(i, density)| { + let is_next_sub_window = i == next_relative_sub_window; + if is_next_sub_window { + let density = if is_same_global_sub_window { + density + } else { + 0 + }; + if incr_window { + density + 1 + } else { + density + } + } else { + density + } + }) + .map(|v| UnsignedExtendedUInt32StableV1(v.into())) + .collect(); + + (min_window_density, next_sub_window_densities) + }; + + let consensus_state = ConsensusProofOfStakeDataConsensusStateValueStableV2 { + blockchain_length: UnsignedExtendedUInt32StableV1((pred_block.height() + 1).into()), + epoch_count, + min_window_density, + sub_window_densities, + last_vrf_output: vrf_truncated_output, + total_currency: (&total_currency).into(), + curr_global_slot_since_hard_fork, + global_slot_since_genesis, + staking_epoch_data, + next_epoch_data, + has_ancestor_in_same_checkpoint_window, + block_stake_winner, + block_creator, + coinbase_receiver, + // TODO(binier): Staged_ledger.can_apply_supercharged_coinbase_exn + supercharge_coinbase: constraint_constants().supercharged_coinbase_factor != 0, + }; + + let protocol_state = MinaStateProtocolStateValueStableV2 { + previous_state_hash: pred_block.hash().clone(), + body: MinaStateProtocolStateBodyValueStableV2 { + genesis_state_hash: if pred_block.is_genesis() { + pred_block.hash().clone() + } else { + pred_block + .header() + .protocol_state + .body + .genesis_state_hash + .clone() + }, + constants: pred_block.header().protocol_state.body.constants.clone(), + blockchain_state: MinaStateBlockchainStateValueStableV2 { + staged_ledger_hash: staged_ledger_hash.clone(), + genesis_ledger_hash: genesis_ledger_hash.clone(), + ledger_proof_statement, + timestamp: block_timestamp, + body_reference: diff_hash.clone(), + }, + consensus_state, + }, + }; + + let chain_proof_len = pred_block.constants().delta.as_u32() as usize; + let delta_block_chain_proof = match chain_proof_len { + 0 => (pred_block.hash().clone(), List::new()), + chain_proof_len => { + // TODO(binier): test + let mut iter = chain.iter().rev().take(chain_proof_len + 1).rev(); + if let Some(first_block) = iter.next() { + let first_hash = first_block.hash().clone(); + let body_hashes = iter + .filter_map(|b| b.header().protocol_state.body.try_hash().ok()) // TODO: Handle error ? + .map(StateBodyHash::from) + .collect(); + (first_hash, body_hashes) + } else { + // TODO: test this as well + // If the chain is empty, return the same as when chain_proof_len is 0 + (pred_block.hash().clone(), List::new()) + } + } + }; + + let block = BlockWithoutProof { + protocol_state, + delta_block_chain_proof, + current_protocol_version: pred_block.header().current_protocol_version.clone(), + proposed_protocol_version_opt, + body: StagedLedgerDiffBodyStableV1 { + staged_ledger_diff: diff.clone(), + }, + }; + let Ok(block_hash) = block.protocol_state.try_hash() else { + openmina_core::log::inner::error!("Invalid protocol state"); + return; + }; + + self.current = BlockProducerCurrentState::BlockUnprovenBuilt { + time, + won_slot, + chain, + emitted_ledger_proof, + pending_coinbase_update, + pending_coinbase_witness, + stake_proof_sparse_ledger, + block, + block_hash, + }; + } + + fn dispatch_best_tip_update( + dispatcher: &mut Dispatcher, + state: &State, + best_tip: &ArcBlockWithHash, + ) { + let global_slot = best_tip + .consensus_state() + .curr_global_slot_since_hard_fork + .clone(); + + let (best_tip_epoch, best_tip_slot) = to_epoch_and_slot(&global_slot); + let root_block_epoch = if let Some(root_block) = state.transition_frontier.root() { + let root_block_global_slot = root_block.curr_global_slot_since_hard_fork(); + to_epoch_and_slot(root_block_global_slot).0 + } else { + bug_condition!("Expected to find a block at the root of the transition frontier but there was none"); + best_tip_epoch.saturating_sub(1) + }; + let next_epoch_first_slot = next_epoch_first_slot(&global_slot); + let current_epoch = state.current_epoch(); + let current_slot = state.current_slot(); + + dispatcher.push(BlockProducerVrfEvaluatorAction::InitializeEvaluator { + best_tip: best_tip.clone(), + }); + + // None if the evaluator is not evaluating + let currenty_evaluated_epoch = state + .block_producer + .vrf_evaluator() + .and_then(|vrf_evaluator| vrf_evaluator.currently_evaluated_epoch()); + + if let Some(currently_evaluated_epoch) = currenty_evaluated_epoch { + // if we receive a block with higher epoch than the current one, interrupt the evaluation + if currently_evaluated_epoch < best_tip_epoch { + dispatcher.push(BlockProducerVrfEvaluatorAction::InterruptEpochEvaluation { + reason: InterruptReason::BestTipWithHigherEpoch, + }); + } + } + + let is_next_epoch_seed_finalized = if let Some(current_slot) = current_slot { + !in_seed_update_range(current_slot, best_tip.constants()) + } else { + false + }; + + dispatcher.push(BlockProducerVrfEvaluatorAction::CheckEpochEvaluability { + current_epoch, + is_next_epoch_seed_finalized, + root_block_epoch, + best_tip_epoch, + best_tip_slot, + best_tip_global_slot: best_tip.global_slot(), + next_epoch_first_slot, + staking_epoch_data: Box::new(best_tip.consensus_state().staking_epoch_data.clone()), + next_epoch_data: Box::new(best_tip.consensus_state().next_epoch_data.clone()), + }); + + if let Some(reason) = state + .block_producer + .with(None, |bp| bp.current.won_slot_should_discard(best_tip)) + { + dispatcher.push(BlockProducerAction::WonSlotDiscard { reason }); + } else { + dispatcher.push(BlockProducerAction::WonSlotSearch); } } } diff --git a/node/src/block_producer/block_producer_state.rs b/node/src/block_producer/block_producer_state.rs index 015b4aa2e0..69c7405a03 100644 --- a/node/src/block_producer/block_producer_state.rs +++ b/node/src/block_producer/block_producer_state.rs @@ -135,7 +135,7 @@ pub enum BlockProducerCurrentState { }, } -#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)] +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone, Copy)] pub enum BlockProducerWonSlotDiscardReason { BestTipStakingLedgerDifferent, BestTipGlobalSlotHigher, @@ -152,20 +152,19 @@ impl BlockProducerState { })) } - #[inline(always)] - pub(super) fn with<'a, F, R: 'a>(&'a self, default: R, fun: F) -> R + pub fn with<'a, F, R: 'a>(&'a self, default: R, fun: F) -> R where F: FnOnce(&'a BlockProducerEnabled) -> R, { self.0.as_ref().map_or(default, fun) } - #[inline(always)] - pub(super) fn with_mut(&mut self, default: R, fun: F) -> R - where - F: FnOnce(&mut BlockProducerEnabled) -> R, - { - self.0.as_mut().map_or(default, fun) + pub fn as_mut(&mut self) -> Option<&mut BlockProducerEnabled> { + self.0.as_mut() + } + + pub fn as_ref(&self) -> Option<&BlockProducerEnabled> { + self.0.as_ref() } pub fn is_enabled(&self) -> bool { diff --git a/node/src/block_producer/mod.rs b/node/src/block_producer/mod.rs index ba0008c70a..e0fd494eb1 100644 --- a/node/src/block_producer/mod.rs +++ b/node/src/block_producer/mod.rs @@ -14,12 +14,6 @@ pub use block_producer_actions::*; mod block_producer_reducer; -mod block_producer_effects; -pub use block_producer_effects::*; - -mod block_producer_service; -pub use block_producer_service::*; - use ledger::AccountIndex; use mina_p2p_messages::{list::List, v2}; use openmina_core::block::ArcBlockWithHash; diff --git a/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_effects.rs b/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_effects.rs deleted file mode 100644 index 0127e3eb71..0000000000 --- a/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_effects.rs +++ /dev/null @@ -1,238 +0,0 @@ -use redux::ActionMeta; - -use crate::block_producer::to_epoch_and_slot; -use crate::ledger::read::LedgerReadAction; -use crate::ledger::read::LedgerReadInitCallback; -use crate::ledger::read::LedgerReadRequest; -use crate::Service; -use crate::Store; - -use super::BlockProducerVrfEvaluatorAction; -use super::BlockProducerVrfEvaluatorStatus; -use super::SlotPositionInEpoch; - -impl BlockProducerVrfEvaluatorAction { - pub fn effects(self, _: &ActionMeta, store: &mut Store) { - match self { - BlockProducerVrfEvaluatorAction::EvaluateSlot { vrf_input } => { - store.service.evaluate(vrf_input); - } - BlockProducerVrfEvaluatorAction::ProcessSlotEvaluationSuccess { - vrf_output, .. - } => { - if let Some(vrf_evaluator_state) = store.state().block_producer.vrf_evaluator() { - if let Some(pending_evaluation) = vrf_evaluator_state.current_evaluation() { - store.dispatch(BlockProducerVrfEvaluatorAction::CheckEpochBounds { - epoch_number: pending_evaluation.epoch_number, - latest_evaluated_global_slot: vrf_output.global_slot(), - }); - } - } - } - BlockProducerVrfEvaluatorAction::CheckEpochBounds { - latest_evaluated_global_slot, - epoch_number, - } => { - if let Some(epoch_bound) = store - .state() - .block_producer - .vrf_evaluator() - .and_then(|s| s.get_epoch_bound_from_check()) - { - match epoch_bound { - SlotPositionInEpoch::Beginning | SlotPositionInEpoch::Within => { - store.dispatch( - BlockProducerVrfEvaluatorAction::ContinueEpochEvaluation { - latest_evaluated_global_slot, - epoch_number, - }, - ); - } - SlotPositionInEpoch::End => { - store.dispatch( - BlockProducerVrfEvaluatorAction::FinishEpochEvaluation { - latest_evaluated_global_slot, - epoch_number, - }, - ); - } - } - } - } - BlockProducerVrfEvaluatorAction::InitializeEvaluator { best_tip } => { - // Note: pure function, but needs access to other parts of the state - if store.state().block_producer.vrf_evaluator().is_some() { - if best_tip.consensus_state().epoch_count.as_u32() == 0 { - store.dispatch( - BlockProducerVrfEvaluatorAction::FinalizeEvaluatorInitialization { - previous_epoch_and_height: None, - }, - ); - } else { - let k = best_tip.constants().k.as_u32(); - let (epoch, slot) = to_epoch_and_slot( - &best_tip.consensus_state().curr_global_slot_since_hard_fork, - ); - let previous_epoch = epoch.saturating_sub(1); - let last_height = if slot < k { - let found = store - .state() - .transition_frontier - .best_chain - .iter() - .rev() - .find(|b| { - b.consensus_state().epoch_count.as_u32() == previous_epoch - }); - - if let Some(block) = found { - block.height() - } else { - Default::default() - } - } else if let Some(root_block) = store.state().transition_frontier.root() { - root_block.height() - } else { - Default::default() - }; - store.dispatch( - BlockProducerVrfEvaluatorAction::FinalizeEvaluatorInitialization { - previous_epoch_and_height: Some((previous_epoch, last_height)), - }, - ); - } - } - } - BlockProducerVrfEvaluatorAction::FinalizeEvaluatorInitialization { .. } => {} - BlockProducerVrfEvaluatorAction::CheckEpochEvaluability { - root_block_epoch: _, - best_tip_global_slot, - best_tip_epoch, - best_tip_slot, - next_epoch_first_slot, - current_epoch: _, - is_next_epoch_seed_finalized: _, - staking_epoch_data: _, - next_epoch_data: _, - } => { - let vrf_evaluator_state = store.state().block_producer.vrf_evaluator_with_config(); - - if let Some((vrf_evaluator_state, config)) = vrf_evaluator_state { - if let Some(epoch_data) = vrf_evaluator_state.epoch_context().get_epoch_data() { - store.dispatch( - BlockProducerVrfEvaluatorAction::InitializeEpochEvaluation { - staking_epoch_data: epoch_data, - producer: config.pub_key.clone().into(), - best_tip_global_slot, - best_tip_epoch, - best_tip_slot, - next_epoch_first_slot, - }, - ); - } else { - // If None is returned, than we are waiting for evaluation - store.dispatch(BlockProducerVrfEvaluatorAction::WaitForNextEvaluation); - } - - store.dispatch(BlockProducerVrfEvaluatorAction::CleanupOldSlots { - best_tip_epoch, - }); - } - } - BlockProducerVrfEvaluatorAction::InitializeEpochEvaluation { .. } => { - store.dispatch(BlockProducerVrfEvaluatorAction::BeginDelegatorTableConstruction); - } - BlockProducerVrfEvaluatorAction::BeginDelegatorTableConstruction => { - let (staking_ledger_hash, producer) = - match store.state().block_producer.vrf_delegator_table_inputs() { - Some((v1, v2)) => (v1.clone(), v2.clone()), - None => return, - }; - if store.dispatch(LedgerReadAction::Init { - request: LedgerReadRequest::DelegatorTable(staking_ledger_hash, producer), - callback: LedgerReadInitCallback::None, - }) { - // TODO(binier): have pending action. - } else { - unreachable!() - } - } - BlockProducerVrfEvaluatorAction::FinalizeDelegatorTableConstruction { .. } => { - let Some(( - current_global_slot, - BlockProducerVrfEvaluatorStatus::EpochDelegatorTableSuccess { - best_tip_epoch, - best_tip_slot, - best_tip_global_slot, - next_epoch_first_slot, - staking_epoch_data, - .. - }, - )) = None.or_else(|| { - let cur_global_slot = store.state().cur_global_slot()?; - let status = &store.state().block_producer.vrf_evaluator()?.status; - - Some((cur_global_slot, status)) - }) - else { - // error here! - return; - }; - - store.dispatch(BlockProducerVrfEvaluatorAction::SelectInitialSlot { - current_global_slot, - best_tip_epoch: *best_tip_epoch, - best_tip_slot: *best_tip_slot, - best_tip_global_slot: *best_tip_global_slot, - next_epoch_first_slot: *next_epoch_first_slot, - staking_epoch_data: staking_epoch_data.clone(), - }); - } - BlockProducerVrfEvaluatorAction::BeginEpochEvaluation { - latest_evaluated_global_slot, - best_tip_epoch: current_epoch_number, - .. - } => { - if store.state().block_producer.vrf_evaluator().is_some() { - store.dispatch(BlockProducerVrfEvaluatorAction::ContinueEpochEvaluation { - latest_evaluated_global_slot, - epoch_number: current_epoch_number, - }); - } - } - BlockProducerVrfEvaluatorAction::ContinueEpochEvaluation { .. } => { - if let Some(vrf_evaluator_state) = store.state().block_producer.vrf_evaluator() { - if let Some(vrf_input) = vrf_evaluator_state.construct_vrf_input() { - store.dispatch(BlockProducerVrfEvaluatorAction::EvaluateSlot { vrf_input }); - } - } - } - BlockProducerVrfEvaluatorAction::FinishEpochEvaluation { .. } => {} - BlockProducerVrfEvaluatorAction::WaitForNextEvaluation { .. } => {} - BlockProducerVrfEvaluatorAction::SelectInitialSlot { - best_tip_epoch: current_epoch_number, - best_tip_global_slot: current_best_tip_global_slot, - best_tip_slot: current_best_tip_slot, - staking_epoch_data, - .. - } => { - if let Some(initial_slot) = store - .state() - .block_producer - .vrf_evaluator() - .and_then(|v| v.initial_slot()) - { - store.dispatch(BlockProducerVrfEvaluatorAction::BeginEpochEvaluation { - best_tip_epoch: current_epoch_number, - best_tip_global_slot: current_best_tip_global_slot, - best_tip_slot: current_best_tip_slot, - staking_epoch_data, - latest_evaluated_global_slot: initial_slot, - }); - } - } - BlockProducerVrfEvaluatorAction::CleanupOldSlots { .. } => {} - BlockProducerVrfEvaluatorAction::InterruptEpochEvaluation { .. } => {} - } - } -} diff --git a/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_reducer.rs b/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_reducer.rs index 37b53fb660..93e789260a 100644 --- a/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_reducer.rs +++ b/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_reducer.rs @@ -1,18 +1,40 @@ +use openmina_core::bug_condition; +use vrf::VrfEvaluationOutput; + +use crate::{ + block_producer::to_epoch_and_slot, + block_producer_effectful::vrf_evaluator_effectful::BlockProducerVrfEvaluatorEffectfulAction, + ledger::read::{LedgerReadAction, LedgerReadInitCallback, LedgerReadRequest}, + BlockProducerAction, Substate, +}; + use super::{ BlockProducerVrfEvaluatorAction, BlockProducerVrfEvaluatorActionWithMetaRef, BlockProducerVrfEvaluatorState, BlockProducerVrfEvaluatorStatus, PendingEvaluation, - VrfWonSlotWithHash, + SlotPositionInEpoch, VrfWonSlotWithHash, }; impl BlockProducerVrfEvaluatorState { - pub fn reducer(&mut self, action: BlockProducerVrfEvaluatorActionWithMetaRef<'_>) { + pub fn reducer( + mut state_context: Substate, + action: BlockProducerVrfEvaluatorActionWithMetaRef<'_>, + ) { + let Ok(state) = state_context.get_substate_mut() else { + return; + }; + let (action, meta) = action.split(); match action { BlockProducerVrfEvaluatorAction::EvaluateSlot { vrf_input } => { - self.status = BlockProducerVrfEvaluatorStatus::SlotEvaluationPending { + state.status = BlockProducerVrfEvaluatorStatus::SlotEvaluationPending { time: meta.time(), global_slot: vrf_input.global_slot, }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerVrfEvaluatorEffectfulAction::EvaluateSlot { + vrf_input: vrf_input.clone(), + }); } BlockProducerVrfEvaluatorAction::ProcessSlotEvaluationSuccess { vrf_output, @@ -20,7 +42,7 @@ impl BlockProducerVrfEvaluatorState { } => { let global_slot_evaluated = match &vrf_output { vrf::VrfEvaluationOutput::SlotWon(won_slot_data) => { - self.won_slots.insert( + state.won_slots.insert( won_slot_data.global_slot, VrfWonSlotWithHash::new( won_slot_data.clone(), @@ -31,36 +53,122 @@ impl BlockProducerVrfEvaluatorState { } vrf::VrfEvaluationOutput::SlotLost(global_slot) => *global_slot, }; - self.set_latest_evaluated_global_slot(&global_slot_evaluated); + state.set_latest_evaluated_global_slot(&global_slot_evaluated); - self.status = BlockProducerVrfEvaluatorStatus::SlotEvaluationReceived { + state.status = BlockProducerVrfEvaluatorStatus::SlotEvaluationReceived { time: meta.time(), global_slot: global_slot_evaluated, + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + + if let Some(vrf_evaluator_state) = state.block_producer.vrf_evaluator() { + if let Some(pending_evaluation) = vrf_evaluator_state.current_evaluation() { + dispatcher.push(BlockProducerVrfEvaluatorAction::CheckEpochBounds { + epoch_number: pending_evaluation.epoch_number, + latest_evaluated_global_slot: vrf_output.global_slot(), + }); + } + } + + if matches!(vrf_output, VrfEvaluationOutput::SlotWon(_)) { + dispatcher.push(BlockProducerAction::WonSlotSearch); } } BlockProducerVrfEvaluatorAction::CheckEpochBounds { epoch_number, latest_evaluated_global_slot, } => { - let epoch_current_bound = Self::evaluate_epoch_bounds(latest_evaluated_global_slot); - self.status = BlockProducerVrfEvaluatorStatus::EpochBoundsCheck { + let latest_evaluated_global_slot = *latest_evaluated_global_slot; + let epoch_number = *epoch_number; + + let epoch_current_bound = + Self::evaluate_epoch_bounds(&latest_evaluated_global_slot); + state.status = BlockProducerVrfEvaluatorStatus::EpochBoundsCheck { time: meta.time(), - epoch_number: *epoch_number, - latest_evaluated_global_slot: *latest_evaluated_global_slot, + epoch_number, + latest_evaluated_global_slot, epoch_current_bound, }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + + if let Some(epoch_bound) = state + .block_producer + .vrf_evaluator() + .and_then(|s| s.get_epoch_bound_from_check()) + { + match epoch_bound { + SlotPositionInEpoch::Beginning | SlotPositionInEpoch::Within => { + dispatcher.push( + BlockProducerVrfEvaluatorAction::ContinueEpochEvaluation { + latest_evaluated_global_slot, + epoch_number, + }, + ); + } + SlotPositionInEpoch::End => { + dispatcher.push( + BlockProducerVrfEvaluatorAction::FinishEpochEvaluation { + latest_evaluated_global_slot, + epoch_number, + }, + ); + } + } + } } - BlockProducerVrfEvaluatorAction::InitializeEvaluator { .. } => { - self.status = - BlockProducerVrfEvaluatorStatus::InitialisationPending { time: meta.time() } + BlockProducerVrfEvaluatorAction::InitializeEvaluator { best_tip } => { + state.status = + BlockProducerVrfEvaluatorStatus::InitialisationPending { time: meta.time() }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + + // Note: pure function, but needs access to other parts of the state + if state.block_producer.vrf_evaluator().is_some() { + if best_tip.consensus_state().epoch_count.as_u32() == 0 { + dispatcher.push( + BlockProducerVrfEvaluatorAction::FinalizeEvaluatorInitialization { + previous_epoch_and_height: None, + }, + ); + } else { + let k = best_tip.constants().k.as_u32(); + let (epoch, slot) = to_epoch_and_slot( + &best_tip.consensus_state().curr_global_slot_since_hard_fork, + ); + let previous_epoch = epoch.saturating_sub(1); + let last_height = if slot < k { + let found = + state.transition_frontier.best_chain.iter().rev().find(|b| { + b.consensus_state().epoch_count.as_u32() == previous_epoch + }); + + if let Some(block) = found { + block.height() + } else { + Default::default() + } + } else if let Some(root_block) = state.transition_frontier.root() { + root_block.height() + } else { + Default::default() + }; + dispatcher.push( + BlockProducerVrfEvaluatorAction::FinalizeEvaluatorInitialization { + previous_epoch_and_height: Some((previous_epoch, last_height)), + }, + ); + } + } } BlockProducerVrfEvaluatorAction::FinalizeEvaluatorInitialization { previous_epoch_and_height, } => { if let Some((epoch, last_height)) = previous_epoch_and_height { - self.initialize_evaluator(*epoch, *last_height); + state.initialize_evaluator(*epoch, *last_height); } - self.status = + state.status = BlockProducerVrfEvaluatorStatus::InitialisationComplete { time: meta.time() } } BlockProducerVrfEvaluatorAction::CheckEpochEvaluability { @@ -70,24 +178,50 @@ impl BlockProducerVrfEvaluatorState { root_block_epoch, staking_epoch_data, next_epoch_data, - best_tip_slot: _, - best_tip_global_slot: _, - next_epoch_first_slot: _, + best_tip_slot, + best_tip_global_slot, + next_epoch_first_slot, } => { - self.status = BlockProducerVrfEvaluatorStatus::ReadinessCheck { + let best_tip_epoch = *best_tip_epoch; + + state.status = BlockProducerVrfEvaluatorStatus::ReadinessCheck { time: meta.time(), current_epoch: *current_epoch, is_next_epoch_seed_finalized: *is_next_epoch_seed_finalized, - best_tip_epoch: *best_tip_epoch, + best_tip_epoch, root_block_epoch: *root_block_epoch, - is_current_epoch_evaluated: self.is_epoch_evaluated(*best_tip_epoch), - is_next_epoch_evaluated: self.is_epoch_evaluated(best_tip_epoch + 1), - last_evaluated_epoch: self.last_evaluated_epoch(), + is_current_epoch_evaluated: state.is_epoch_evaluated(best_tip_epoch), + is_next_epoch_evaluated: state.is_epoch_evaluated(best_tip_epoch + 1), + last_evaluated_epoch: state.last_evaluated_epoch(), staking_epoch_data: staking_epoch_data.clone(), next_epoch_data: next_epoch_data.clone(), }; - self.set_epoch_context(); + state.set_epoch_context(); + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let vrf_evaluator_state = state.block_producer.vrf_evaluator_with_config(); + + if let Some((vrf_evaluator_state, config)) = vrf_evaluator_state { + if let Some(epoch_data) = vrf_evaluator_state.epoch_context().get_epoch_data() { + dispatcher.push( + BlockProducerVrfEvaluatorAction::InitializeEpochEvaluation { + staking_epoch_data: epoch_data, + producer: config.pub_key.clone().into(), + best_tip_global_slot: *best_tip_global_slot, + best_tip_epoch, + best_tip_slot: *best_tip_slot, + next_epoch_first_slot: *next_epoch_first_slot, + }, + ); + } else { + // If None is returned, than we are waiting for evaluation + dispatcher.push(BlockProducerVrfEvaluatorAction::WaitForNextEvaluation); + } + + dispatcher + .push(BlockProducerVrfEvaluatorAction::CleanupOldSlots { best_tip_epoch }); + } } BlockProducerVrfEvaluatorAction::InitializeEpochEvaluation { best_tip_epoch, @@ -97,17 +231,20 @@ impl BlockProducerVrfEvaluatorState { staking_epoch_data, producer, } => { - self.status = BlockProducerVrfEvaluatorStatus::ReadyToEvaluate { + state.status = BlockProducerVrfEvaluatorStatus::ReadyToEvaluate { time: meta.time(), best_tip_epoch: *best_tip_epoch, - is_current_epoch_evaluated: self.is_epoch_evaluated(*best_tip_epoch), - is_next_epoch_evaluated: self.is_epoch_evaluated(best_tip_epoch + 1), + is_current_epoch_evaluated: state.is_epoch_evaluated(*best_tip_epoch), + is_next_epoch_evaluated: state.is_epoch_evaluated(best_tip_epoch + 1), best_tip_slot: *best_tip_slot, best_tip_global_slot: *best_tip_global_slot, next_epoch_first_slot: *next_epoch_first_slot, staking_epoch_data: staking_epoch_data.clone(), producer: producer.clone(), - } + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerVrfEvaluatorAction::BeginDelegatorTableConstruction); } BlockProducerVrfEvaluatorAction::BeginDelegatorTableConstruction => { let BlockProducerVrfEvaluatorStatus::ReadyToEvaluate { @@ -120,11 +257,11 @@ impl BlockProducerVrfEvaluatorState { time: _, is_current_epoch_evaluated: _, is_next_epoch_evaluated: _, - } = &self.status + } = &state.status else { return; }; - self.status = BlockProducerVrfEvaluatorStatus::EpochDelegatorTablePending { + state.status = BlockProducerVrfEvaluatorStatus::EpochDelegatorTablePending { time: meta.time(), best_tip_epoch: *best_tip_epoch, staking_epoch_ledger_hash: staking_epoch_data.ledger.clone(), @@ -133,7 +270,19 @@ impl BlockProducerVrfEvaluatorState { next_epoch_first_slot: *next_epoch_first_slot, staking_epoch_data: staking_epoch_data.clone(), producer: producer.clone(), - } + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let (staking_ledger_hash, producer) = + match state.block_producer.vrf_delegator_table_inputs() { + Some((v1, v2)) => (v1.clone(), v2.clone()), + None => return, + }; + + dispatcher.push(LedgerReadAction::Init { + request: LedgerReadRequest::DelegatorTable(staking_ledger_hash, producer), + callback: LedgerReadInitCallback::None, + }) } BlockProducerVrfEvaluatorAction::FinalizeDelegatorTableConstruction { delegator_table, @@ -147,15 +296,16 @@ impl BlockProducerVrfEvaluatorState { producer, time: _, staking_epoch_ledger_hash: _, - } = &self.status + } = &state.status else { + bug_condition!("Invalid state for `BlockProducerVrfEvaluatorAction::FinalizeDelegatorTableConstruction` expected: `BlockProducerVrfEvaluatorStatus::EpochDelegatorTablePending`, found: {:?}", state.status); return; }; let mut staking_epoch_data = staking_epoch_data.clone(); staking_epoch_data.delegator_table = delegator_table.clone(); - self.status = BlockProducerVrfEvaluatorStatus::EpochDelegatorTableSuccess { + state.status = BlockProducerVrfEvaluatorStatus::EpochDelegatorTableSuccess { time: meta.time(), best_tip_epoch: *best_tip_epoch, staking_epoch_ledger_hash: staking_epoch_data.ledger.clone(), @@ -164,7 +314,40 @@ impl BlockProducerVrfEvaluatorState { next_epoch_first_slot: *next_epoch_first_slot, staking_epoch_data: staking_epoch_data.clone(), producer: producer.clone(), - } + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let get_slot_and_status = || { + let cur_global_slot = state.cur_global_slot()?; + let status = &state.block_producer.vrf_evaluator()?.status; + + Some((cur_global_slot, status)) + }; + + let Some(( + current_global_slot, + BlockProducerVrfEvaluatorStatus::EpochDelegatorTableSuccess { + best_tip_epoch, + best_tip_slot, + best_tip_global_slot, + next_epoch_first_slot, + staking_epoch_data, + .. + }, + )) = get_slot_and_status() + else { + bug_condition!("Invalid state for `BlockProducerVrfEvaluatorAction::FinalizeDelegatorTableConstruction`"); + return; + }; + + dispatcher.push(BlockProducerVrfEvaluatorAction::SelectInitialSlot { + current_global_slot, + best_tip_epoch: *best_tip_epoch, + best_tip_slot: *best_tip_slot, + best_tip_global_slot: *best_tip_global_slot, + next_epoch_first_slot: *next_epoch_first_slot, + staking_epoch_data: staking_epoch_data.clone(), + }); } BlockProducerVrfEvaluatorAction::BeginEpochEvaluation { best_tip_epoch, @@ -173,28 +356,47 @@ impl BlockProducerVrfEvaluatorState { best_tip_slot: _, best_tip_global_slot: _, } => { - self.set_pending_evaluation(PendingEvaluation { - epoch_number: *best_tip_epoch, + let latest_evaluated_global_slot = *latest_evaluated_global_slot; + let epoch_number = *best_tip_epoch; + + state.set_pending_evaluation(PendingEvaluation { + epoch_number, epoch_data: staking_epoch_data.clone(), - latest_evaluated_slot: *latest_evaluated_global_slot, + latest_evaluated_slot: latest_evaluated_global_slot, }); - self.status = BlockProducerVrfEvaluatorStatus::EpochEvaluationPending { + state.status = BlockProducerVrfEvaluatorStatus::EpochEvaluationPending { time: meta.time(), - epoch_number: *best_tip_epoch, + epoch_number, epoch_data: staking_epoch_data.clone(), - latest_evaluated_global_slot: *latest_evaluated_global_slot, + latest_evaluated_global_slot, + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + if state.block_producer.vrf_evaluator().is_some() { + dispatcher.push(BlockProducerVrfEvaluatorAction::ContinueEpochEvaluation { + latest_evaluated_global_slot, + epoch_number, + }); } } BlockProducerVrfEvaluatorAction::ContinueEpochEvaluation { epoch_number, latest_evaluated_global_slot, } => { - if let Some(pending_evaluation) = self.current_evaluation() { - self.status = BlockProducerVrfEvaluatorStatus::EpochEvaluationPending { + if let Some(pending_evaluation) = state.current_evaluation() { + state.status = BlockProducerVrfEvaluatorStatus::EpochEvaluationPending { time: meta.time(), epoch_number: *epoch_number, epoch_data: pending_evaluation.epoch_data, latest_evaluated_global_slot: *latest_evaluated_global_slot, + }; + } + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + if let Some(vrf_evaluator_state) = state.block_producer.vrf_evaluator() { + if let Some(vrf_input) = vrf_evaluator_state.construct_vrf_input() { + dispatcher + .push(BlockProducerVrfEvaluatorAction::EvaluateSlot { vrf_input }); } } } @@ -202,41 +404,56 @@ impl BlockProducerVrfEvaluatorState { epoch_number, latest_evaluated_global_slot: _, } => { - self.unset_pending_evaluation(); - self.status = BlockProducerVrfEvaluatorStatus::EpochEvaluationSuccess { + state.unset_pending_evaluation(); + state.status = BlockProducerVrfEvaluatorStatus::EpochEvaluationSuccess { time: meta.time(), epoch_number: *epoch_number, }; - self.set_last_evaluated_epoch(); + state.set_last_evaluated_epoch(); } BlockProducerVrfEvaluatorAction::WaitForNextEvaluation => { - self.status = + state.status = BlockProducerVrfEvaluatorStatus::WaitingForNextEvaluation { time: meta.time() }; } BlockProducerVrfEvaluatorAction::SelectInitialSlot { best_tip_epoch, current_global_slot, next_epoch_first_slot, - best_tip_slot: _, - best_tip_global_slot: _, - staking_epoch_data: _, + best_tip_slot: current_best_tip_slot, + best_tip_global_slot: current_best_tip_global_slot, + staking_epoch_data, } => { - let (epoch_number, initial_slot) = match self.epoch_context() { + let (epoch_number, initial_slot) = match state.epoch_context() { super::EpochContext::Current(_) => (*best_tip_epoch, *current_global_slot), super::EpochContext::Next(_) => (best_tip_epoch + 1, next_epoch_first_slot - 1), super::EpochContext::Waiting => todo!(), }; - self.status = BlockProducerVrfEvaluatorStatus::InitialSlotSelection { + state.status = BlockProducerVrfEvaluatorStatus::InitialSlotSelection { time: meta.time(), epoch_number, initial_slot, + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + if let Some(initial_slot) = state + .block_producer + .vrf_evaluator() + .and_then(|v| v.initial_slot()) + { + dispatcher.push(BlockProducerVrfEvaluatorAction::BeginEpochEvaluation { + best_tip_epoch: *best_tip_epoch, + best_tip_global_slot: *current_best_tip_global_slot, + best_tip_slot: *current_best_tip_slot, + staking_epoch_data: staking_epoch_data.clone(), + latest_evaluated_global_slot: initial_slot, + }); } } BlockProducerVrfEvaluatorAction::CleanupOldSlots { best_tip_epoch } => { - self.cleanup_old_won_slots(best_tip_epoch); + state.cleanup_old_won_slots(best_tip_epoch); } BlockProducerVrfEvaluatorAction::InterruptEpochEvaluation { reason } => { - self.status = BlockProducerVrfEvaluatorStatus::EpochEvaluationInterrupted { + state.status = BlockProducerVrfEvaluatorStatus::EpochEvaluationInterrupted { time: meta.time(), reason: reason.clone(), }; diff --git a/node/src/block_producer/vrf_evaluator/mod.rs b/node/src/block_producer/vrf_evaluator/mod.rs index 1c8f624618..0cfe2e0014 100644 --- a/node/src/block_producer/vrf_evaluator/mod.rs +++ b/node/src/block_producer/vrf_evaluator/mod.rs @@ -9,11 +9,6 @@ pub use block_producer_vrf_evaluator_actions::*; mod block_producer_vrf_evaluator_reducer; -mod block_producer_vrf_evaluator_effects; - -mod block_producer_vrf_evaluator_service; -pub use block_producer_vrf_evaluator_service::*; - use std::collections::BTreeMap; use std::sync::Arc; diff --git a/node/src/block_producer_effectful/block_producer_effectful_actions.rs b/node/src/block_producer_effectful/block_producer_effectful_actions.rs new file mode 100644 index 0000000000..a81b4702b9 --- /dev/null +++ b/node/src/block_producer_effectful/block_producer_effectful_actions.rs @@ -0,0 +1,26 @@ +use super::vrf_evaluator_effectful::BlockProducerVrfEvaluatorEffectfulAction; +use crate::block_producer::{BlockProducerWonSlot, BlockProducerWonSlotDiscardReason}; +use openmina_core::ActionEvent; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] +pub enum BlockProducerEffectfulAction { + VrfEvaluator(BlockProducerVrfEvaluatorEffectfulAction), + WonSlot { + won_slot: BlockProducerWonSlot, + }, + WonSlotDiscard { + reason: BlockProducerWonSlotDiscardReason, + }, + StagedLedgerDiffCreateInit, + StagedLedgerDiffCreateSuccess, + BlockUnprovenBuild, + BlockProveInit, + BlockProveSuccess, +} + +impl redux::EnablingCondition for BlockProducerEffectfulAction { + fn is_enabled(&self, _state: &crate::State, _time: redux::Timestamp) -> bool { + true + } +} diff --git a/node/src/block_producer/block_producer_effects.rs b/node/src/block_producer_effectful/block_producer_effectful_effects.rs similarity index 52% rename from node/src/block_producer/block_producer_effects.rs rename to node/src/block_producer_effectful/block_producer_effectful_effects.rs index 66bca390a4..798ddaf009 100644 --- a/node/src/block_producer/block_producer_effects.rs +++ b/node/src/block_producer_effectful/block_producer_effectful_effects.rs @@ -1,122 +1,28 @@ +use crate::{ + block_producer::BlockProducerCurrentState, + ledger::write::{LedgerWriteAction, LedgerWriteRequest}, + BlockProducerAction, Store, +}; use mina_p2p_messages::v2::{ BlockchainSnarkBlockchainStableV2, ConsensusStakeProofStableV2, MinaStateSnarkTransitionValueStableV2, ProverExtendBlockchainInputStableV2, }; -use openmina_core::bug_condition; -use openmina_core::consensus::in_seed_update_range; - -use crate::account::AccountSecretKey; -use crate::ledger::write::{LedgerWriteAction, LedgerWriteRequest}; -use crate::transition_frontier::sync::TransitionFrontierSyncAction; -use crate::{Store, TransactionPoolAction}; +use openmina_node_account::AccountSecretKey; +use redux::ActionWithMeta; -use super::vrf_evaluator::{BlockProducerVrfEvaluatorAction, InterruptReason}; -use super::{ - next_epoch_first_slot, to_epoch_and_slot, BlockProducerAction, BlockProducerActionWithMeta, - BlockProducerCurrentState, -}; +use super::BlockProducerEffectfulAction; pub fn block_producer_effects( store: &mut Store, - action: BlockProducerActionWithMeta, + action: ActionWithMeta, ) { let (action, meta) = action.split(); match action { - BlockProducerAction::VrfEvaluator(a) => { - // TODO: does the order matter? can this clone be avoided? - let has_won_slot = match &a { - BlockProducerVrfEvaluatorAction::ProcessSlotEvaluationSuccess { - vrf_output, - .. - } => { - matches!(vrf_output, vrf::VrfEvaluationOutput::SlotWon(_)) - } - _ => false, - }; + BlockProducerEffectfulAction::VrfEvaluator(a) => { a.effects(&meta, store); - if has_won_slot { - store.dispatch(BlockProducerAction::WonSlotSearch); - } } - BlockProducerAction::BestTipUpdate { best_tip } => { - let global_slot = best_tip - .consensus_state() - .curr_global_slot_since_hard_fork - .clone(); - - let (best_tip_epoch, best_tip_slot) = to_epoch_and_slot(&global_slot); - let root_block_epoch = if let Some(root_block) = - store.state().transition_frontier.root() - { - let root_block_global_slot = root_block.curr_global_slot_since_hard_fork(); - to_epoch_and_slot(root_block_global_slot).0 - } else { - bug_condition!("Expected to find a block at the root of the transition frontier but there was none"); - best_tip_epoch.saturating_sub(1) - }; - let next_epoch_first_slot = next_epoch_first_slot(&global_slot); - let current_epoch = store.state().current_epoch(); - let current_slot = store.state().current_slot(); - - store.dispatch(BlockProducerVrfEvaluatorAction::InitializeEvaluator { - best_tip: best_tip.clone(), - }); - - // None if the evaluator is not evaluating - let currenty_evaluated_epoch = store - .state() - .block_producer - .vrf_evaluator() - .and_then(|vrf_evaluator| vrf_evaluator.currently_evaluated_epoch()); - - if let Some(currently_evaluated_epoch) = currenty_evaluated_epoch { - // if we receive a block with higher epoch than the current one, interrupt the evaluation - if currently_evaluated_epoch < best_tip_epoch { - store.dispatch(BlockProducerVrfEvaluatorAction::InterruptEpochEvaluation { - reason: InterruptReason::BestTipWithHigherEpoch, - }); - } - } - - let is_next_epoch_seed_finalized = if let Some(current_slot) = current_slot { - !in_seed_update_range(current_slot, best_tip.constants()) - } else { - false - }; - - store.dispatch(BlockProducerVrfEvaluatorAction::CheckEpochEvaluability { - current_epoch, - is_next_epoch_seed_finalized, - root_block_epoch, - best_tip_epoch, - best_tip_slot, - best_tip_global_slot: best_tip.global_slot(), - next_epoch_first_slot, - staking_epoch_data: Box::new(best_tip.consensus_state().staking_epoch_data.clone()), - next_epoch_data: Box::new(best_tip.consensus_state().next_epoch_data.clone()), - }); - - if let Some(reason) = store - .state() - .block_producer - .with(None, |bp| bp.current.won_slot_should_discard(&best_tip)) - { - store.dispatch(BlockProducerAction::WonSlotDiscard { reason }); - } else { - store.dispatch(BlockProducerAction::WonSlotSearch); - } - } - BlockProducerAction::WonSlotSearch => { - if let Some(won_slot) = store.state().block_producer.with(None, |bp| { - let best_tip = store.state().transition_frontier.best_tip()?; - let cur_global_slot = store.state().cur_global_slot()?; - bp.vrf_evaluator.next_won_slot(cur_global_slot, best_tip) - }) { - store.dispatch(BlockProducerAction::WonSlot { won_slot }); - } - } - BlockProducerAction::WonSlot { won_slot } => { + BlockProducerEffectfulAction::WonSlot { won_slot } => { if let Some(stats) = store.service.stats() { stats.block_producer().scheduled(meta.time(), &won_slot); } @@ -124,17 +30,7 @@ pub fn block_producer_effects( store.dispatch(BlockProducerAction::WonSlotProduceInit); } } - BlockProducerAction::WonSlotWait => {} - BlockProducerAction::WonSlotProduceInit => { - store.dispatch(BlockProducerAction::WonSlotTransactionsGet); - } - BlockProducerAction::WonSlotTransactionsGet => { - store.dispatch(TransactionPoolAction::CollectTransactionsByFee); - } - BlockProducerAction::WonSlotTransactionsSuccess { .. } => { - store.dispatch(BlockProducerAction::StagedLedgerDiffCreateInit); - } - BlockProducerAction::StagedLedgerDiffCreateInit => { + BlockProducerEffectfulAction::StagedLedgerDiffCreateInit => { if let Some(stats) = store.service.stats() { stats .block_producer() @@ -195,8 +91,7 @@ pub fn block_producer_effects( ), }); } - BlockProducerAction::StagedLedgerDiffCreatePending => {} - BlockProducerAction::StagedLedgerDiffCreateSuccess { .. } => { + BlockProducerEffectfulAction::StagedLedgerDiffCreateSuccess => { if let Some(stats) = store.service.stats() { stats .block_producer() @@ -204,7 +99,7 @@ pub fn block_producer_effects( } store.dispatch(BlockProducerAction::BlockUnprovenBuild); } - BlockProducerAction::BlockUnprovenBuild => { + BlockProducerEffectfulAction::BlockUnprovenBuild => { if let Some(stats) = store.service.stats() { let bp = &store.state.get().block_producer; if let Some((block_hash, block)) = bp.with(None, |bp| match &bp.current { @@ -221,7 +116,7 @@ pub fn block_producer_effects( store.dispatch(BlockProducerAction::BlockProveInit); } - BlockProducerAction::BlockProveInit => { + BlockProducerEffectfulAction::BlockProveInit => { let service = &mut store.service; if let Some(stats) = service.stats() { @@ -293,46 +188,13 @@ pub fn block_producer_effects( service.prove(block_hash, input); store.dispatch(BlockProducerAction::BlockProvePending); } - BlockProducerAction::BlockProvePending => {} - BlockProducerAction::BlockProveSuccess { .. } => { + BlockProducerEffectfulAction::BlockProveSuccess => { if let Some(stats) = store.service.stats() { stats.block_producer().proof_create_end(meta.time()); } store.dispatch(BlockProducerAction::BlockProduced); } - BlockProducerAction::BlockProduced => { - store.dispatch(BlockProducerAction::BlockInject); - } - BlockProducerAction::BlockInject => { - let Some((best_tip, root_block, blocks_inbetween)) = None.or_else(|| { - let (best_tip, chain) = store.state().block_producer.produced_block_with_chain()?; - let mut iter = chain.iter(); - let root_block = iter.next()?.block_with_hash(); - let blocks_inbetween = iter.map(|b| b.hash().clone()).collect(); - Some((best_tip.clone(), root_block.clone(), blocks_inbetween)) - }) else { - return; - }; - - let previous_root_snarked_ledger_hash = store - .state() - .transition_frontier - .root() - .map(|b| b.snarked_ledger_hash().clone()); - - if store.dispatch(TransitionFrontierSyncAction::BestTipUpdate { - previous_root_snarked_ledger_hash, - best_tip: best_tip.clone(), - root_block, - blocks_inbetween, - }) { - store.dispatch(BlockProducerAction::BlockInjected); - } - } - BlockProducerAction::BlockInjected => { - store.dispatch(BlockProducerAction::WonSlotSearch); - } - BlockProducerAction::WonSlotDiscard { reason } => { + BlockProducerEffectfulAction::WonSlotDiscard { reason } => { if let Some(stats) = store.service.stats() { stats.block_producer().discarded(meta.time(), reason); } diff --git a/node/src/block_producer/block_producer_service.rs b/node/src/block_producer_effectful/block_producer_effectful_service.rs similarity index 100% rename from node/src/block_producer/block_producer_service.rs rename to node/src/block_producer_effectful/block_producer_effectful_service.rs diff --git a/node/src/block_producer_effectful/mod.rs b/node/src/block_producer_effectful/mod.rs new file mode 100644 index 0000000000..40ce48fda6 --- /dev/null +++ b/node/src/block_producer_effectful/mod.rs @@ -0,0 +1,10 @@ +pub mod vrf_evaluator_effectful; + +mod block_producer_effectful_actions; +pub use block_producer_effectful_actions::*; + +mod block_producer_effectful_effects; +pub use block_producer_effectful_effects::*; + +mod block_producer_effectful_service; +pub use block_producer_effectful_service::*; diff --git a/node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_actions.rs b/node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_actions.rs new file mode 100644 index 0000000000..01fcf6f521 --- /dev/null +++ b/node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_actions.rs @@ -0,0 +1,20 @@ +use crate::block_producer::vrf_evaluator::VrfEvaluatorInput; +use openmina_core::ActionEvent; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] +pub enum BlockProducerVrfEvaluatorEffectfulAction { + EvaluateSlot { vrf_input: VrfEvaluatorInput }, +} + +impl redux::EnablingCondition for BlockProducerVrfEvaluatorEffectfulAction { + fn is_enabled(&self, _state: &crate::State, _time: redux::Timestamp) -> bool { + true + } +} + +impl From for crate::Action { + fn from(value: BlockProducerVrfEvaluatorEffectfulAction) -> Self { + Self::BlockProducerEffectful(crate::BlockProducerEffectfulAction::VrfEvaluator(value)) + } +} diff --git a/node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_effects.rs b/node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_effects.rs new file mode 100644 index 0000000000..1a0d6bf41c --- /dev/null +++ b/node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_effects.rs @@ -0,0 +1,15 @@ +use crate::Service; +use crate::Store; +use redux::ActionMeta; + +use super::BlockProducerVrfEvaluatorEffectfulAction; + +impl BlockProducerVrfEvaluatorEffectfulAction { + pub fn effects(self, _: &ActionMeta, store: &mut Store) { + match self { + BlockProducerVrfEvaluatorEffectfulAction::EvaluateSlot { vrf_input } => { + store.service.evaluate(vrf_input); + } + } + } +} diff --git a/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_service.rs b/node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_service.rs similarity index 65% rename from node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_service.rs rename to node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_service.rs index 84b3a0564d..f66525d360 100644 --- a/node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_service.rs +++ b/node/src/block_producer_effectful/vrf_evaluator_effectful/block_producer_vrf_evaluator_effectful_service.rs @@ -1,4 +1,4 @@ -use super::VrfEvaluatorInput; +use crate::block_producer::vrf_evaluator::VrfEvaluatorInput; pub trait BlockProducerVrfEvaluatorService: redux::Service { fn evaluate(&mut self, data: VrfEvaluatorInput); diff --git a/node/src/block_producer_effectful/vrf_evaluator_effectful/mod.rs b/node/src/block_producer_effectful/vrf_evaluator_effectful/mod.rs new file mode 100644 index 0000000000..c55fbe7da8 --- /dev/null +++ b/node/src/block_producer_effectful/vrf_evaluator_effectful/mod.rs @@ -0,0 +1,7 @@ +mod block_producer_vrf_evaluator_effectful_actions; +pub use block_producer_vrf_evaluator_effectful_actions::*; + +mod block_producer_vrf_evaluator_effectful_effects; + +mod block_producer_vrf_evaluator_effectful_service; +pub use block_producer_vrf_evaluator_effectful_service::*; diff --git a/node/src/consensus/consensus_reducer.rs b/node/src/consensus/consensus_reducer.rs index 19fd383106..2db1931633 100644 --- a/node/src/consensus/consensus_reducer.rs +++ b/node/src/consensus/consensus_reducer.rs @@ -268,6 +268,7 @@ impl ConsensusState { best_tip, root_block, blocks_inbetween, + on_success: None, }); } ConsensusAction::P2pBestTipUpdate { best_tip } => { diff --git a/node/src/effects.rs b/node/src/effects.rs index b94fca3399..88c051216a 100644 --- a/node/src/effects.rs +++ b/node/src/effects.rs @@ -1,7 +1,8 @@ use openmina_core::log::system_time; use rand::prelude::*; -use crate::block_producer::{block_producer_effects, BlockProducerAction}; +use crate::block_producer::BlockProducerAction; +use crate::block_producer_effectful::block_producer_effects; use crate::event_source::event_source_effects; use crate::external_snark_worker::external_snark_worker_effects; use crate::ledger::ledger_effects; @@ -81,7 +82,8 @@ pub fn effects(store: &mut Store, action: ActionWithMeta) { Action::SnarkPoolEffect(action) => { snark_pool_effects(store, meta.with_action(action)); } - Action::BlockProducer(action) => { + Action::BlockProducer(_) => {} + Action::BlockProducerEffectful(action) => { block_producer_effects(store, meta.with_action(action)); } Action::ExternalSnarkWorker(action) => { diff --git a/node/src/ledger/ledger_service.rs b/node/src/ledger/ledger_service.rs index 301ec0788c..7e9846e4fb 100644 --- a/node/src/ledger/ledger_service.rs +++ b/node/src/ledger/ledger_service.rs @@ -49,7 +49,7 @@ use openmina_core::{block::AppliedBlock, constants::constraint_constants}; use mina_signer::CompressedPubKey; use openmina_core::block::ArcBlockWithHash; -use crate::block_producer::StagedLedgerDiffCreateOutput; +use crate::block_producer_effectful::StagedLedgerDiffCreateOutput; use crate::p2p::channels::rpc::StagedLedgerAuxAndPendingCoinbases; use crate::rpc::{ RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryScanStateJob, diff --git a/node/src/ledger/write/mod.rs b/node/src/ledger/write/mod.rs index 88a8646a65..64f8c3ea81 100644 --- a/node/src/ledger/write/mod.rs +++ b/node/src/ledger/write/mod.rs @@ -16,7 +16,7 @@ use ledger::scan_state::scan_state::AvailableJobMessage; use mina_p2p_messages::v2; use serde::{Deserialize, Serialize}; -use crate::block_producer::StagedLedgerDiffCreateOutput; +use crate::block_producer_effectful::StagedLedgerDiffCreateOutput; use crate::core::block::ArcBlockWithHash; use crate::core::snark::{Snark, SnarkJobId}; use crate::transition_frontier::sync::ledger::staged::StagedLedgerAuxAndPendingCoinbasesValid; diff --git a/node/src/lib.rs b/node/src/lib.rs index 7e18daf5e8..76f23d7dbb 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -30,6 +30,7 @@ pub mod recorder; pub mod stats; pub mod block_producer; +pub mod block_producer_effectful; pub mod consensus; pub mod daemon_json; pub mod event_source; diff --git a/node/src/reducer.rs b/node/src/reducer.rs index 4809b93858..9b5a08befe 100644 --- a/node/src/reducer.rs +++ b/node/src/reducer.rs @@ -2,7 +2,8 @@ use openmina_core::{bug_condition, error, Substate}; use p2p::{P2pAction, P2pEffectfulAction, P2pInitializeAction, P2pState}; use crate::{ - rpc::RpcState, Action, ActionWithMeta, ConsensusAction, EventSourceAction, P2p, State, + rpc::RpcState, state::BlockProducerState, Action, ActionWithMeta, ConsensusAction, + EventSourceAction, P2p, State, }; pub fn reducer( @@ -81,11 +82,10 @@ pub fn reducer( ); } Action::TransactionPoolEffect(_) => {} - Action::BlockProducer(a) => { - state - .block_producer - .reducer(meta.with_action(a), &state.transition_frontier.best_chain); + Action::BlockProducer(action) => { + BlockProducerState::reducer(Substate::new(state, dispatcher), meta.with_action(action)); } + Action::BlockProducerEffectful(_) => {} Action::ExternalSnarkWorker(a) => { state.external_snark_worker.reducer(meta.with_action(a)); } diff --git a/node/src/service.rs b/node/src/service.rs index f29474cf1f..c656a6e579 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -1,5 +1,5 @@ -pub use crate::block_producer::vrf_evaluator::BlockProducerVrfEvaluatorService; -pub use crate::block_producer::BlockProducerService; +pub use crate::block_producer_effectful::vrf_evaluator_effectful::BlockProducerVrfEvaluatorService; +pub use crate::block_producer_effectful::BlockProducerService; pub use crate::event_source::EventSourceService; pub use crate::external_snark_worker::ExternalSnarkWorkerService; pub use crate::ledger::LedgerService; diff --git a/node/src/state.rs b/node/src/state.rs index 76de2ce522..f62f3abdd3 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -24,6 +24,7 @@ use snark::block_verify::SnarkBlockVerifyState; use snark::user_command_verify::SnarkUserCommandVerifyState; use snark::work_verify::SnarkWorkVerifyState; +use crate::block_producer::vrf_evaluator::BlockProducerVrfEvaluatorState; pub use crate::block_producer::BlockProducerState; pub use crate::consensus::ConsensusState; use crate::external_snark_worker::ExternalSnarkWorkers; @@ -128,6 +129,24 @@ impl openmina_core::SubstateAccess for State } } +impl SubstateAccess for State { + fn substate(&self) -> openmina_core::SubstateResult<&BlockProducerVrfEvaluatorState> { + self.block_producer + .as_ref() + .map(|state| &state.vrf_evaluator) + .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned()) + } + + fn substate_mut( + &mut self, + ) -> openmina_core::SubstateResult<&mut BlockProducerVrfEvaluatorState> { + self.block_producer + .as_mut() + .map(|state| &mut state.vrf_evaluator) + .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned()) + } +} + impl openmina_core::SubstateAccess for State { fn substate(&self) -> openmina_core::SubstateResult<&TransitionFrontierSyncLedgerSnarkedState> { self.transition_frontier diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_actions.rs b/node/src/transition_frontier/sync/transition_frontier_sync_actions.rs index 7ac2b1416c..c2c37d4857 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_actions.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_actions.rs @@ -2,6 +2,7 @@ use mina_p2p_messages::v2::{LedgerHash, StateHash}; use openmina_core::block::ArcBlockWithHash; use openmina_core::consensus::consensus_take; use openmina_core::ActionEvent; +use redux::Callback; use serde::{Deserialize, Serialize}; use crate::ledger::write::CommitResult; @@ -45,6 +46,7 @@ pub enum TransitionFrontierSyncAction { best_tip: ArcBlockWithHash, root_block: ArcBlockWithHash, blocks_inbetween: Vec, + on_success: Option>, }, /// Staking Ledger sync is pending #[action_event(level = info)] diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs index 496c71c936..e121f1ab44 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs @@ -49,6 +49,7 @@ impl TransitionFrontierSyncAction { TransitionFrontierSyncAction::BestTipUpdate { previous_root_snarked_ledger_hash, best_tip, + on_success, .. } => { // TODO(tizoc): this is currently required because how how complicated the BestTipUpdate reducer is, @@ -72,6 +73,9 @@ impl TransitionFrontierSyncAction { store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit); // TODO(binier): cleanup ledgers + if let Some(callback) = on_success { + store.dispatch_callback(callback.clone(), ()); + } } // TODO(tizoc): this action is never called with the current implementation, // either remove it or figure out how to recover it as a reaction to diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_reducer.rs b/node/src/transition_frontier/sync/transition_frontier_sync_reducer.rs index e1809a8c19..029a2b7667 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_reducer.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_reducer.rs @@ -44,6 +44,7 @@ impl TransitionFrontierSyncState { best_tip, root_block, blocks_inbetween, + .. } => match state { Self::StakingLedgerPending(substate) | Self::NextEpochLedgerPending(substate) diff --git a/p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs b/p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs index a47c5c14a6..0b227ee79e 100644 --- a/p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs +++ b/p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs @@ -252,7 +252,8 @@ impl P2pConnectionIncomingState { rpc_id: rpc_id.take(), }; - state_context.into_dispatcher().push(P2pConnectionIncomingEffectfulAction::ConnectionAuthorizationEncryptAndSend { peer_id, other_pub_key, auth }); + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pConnectionIncomingEffectfulAction::ConnectionAuthorizationEncryptAndSend { peer_id, other_pub_key, auth }); } else { bug_condition!( "Invalid state for `P2pConnectionIncomingAction::FinalizePending`: {:?}", diff --git a/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs b/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs index 4994b9bd49..de0acbd268 100644 --- a/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs +++ b/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs @@ -315,8 +315,9 @@ impl P2pConnectionOutgoingState { state ); } - state_context - .into_dispatcher() + + let dispatcher = state_context.into_dispatcher(); + dispatcher .push(P2pConnectionOutgoingEffectfulAction::AnswerSet { peer_id, answer }); Ok(()) } @@ -362,7 +363,8 @@ impl P2pConnectionOutgoingState { } }; - state_context.into_dispatcher().push( + let dispatcher = state_context.into_dispatcher(); + dispatcher.push( P2pConnectionOutgoingEffectfulAction::ConnectionAuthorizationEncryptAndSend { peer_id, other_pub_key,