diff --git a/Cargo.lock b/Cargo.lock index 849639cdcc..abbb20c595 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5143,6 +5143,7 @@ dependencies = [ "hex", "js-sys", "lazy_static", + "libp2p-identity", "md5", "mina-hasher", "mina-p2p-messages", diff --git a/core/Cargo.toml b/core/Cargo.toml index 2ba5c357db..c4cf287f4c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -36,6 +36,10 @@ mina-p2p-messages = { workspace = true } poseidon = { workspace = true } hex = "0.4.3" ark-ff = { workspace = true } +libp2p-identity = { version = "=0.2.7", features = [ + "serde", + "peerid" +] } [target.'cfg(not(target_family = "wasm"))'.dependencies] redux = { workspace = true, features = ["serializable_callbacks"] } diff --git a/core/src/lib.rs b/core/src/lib.rs index 967b6276ff..7d2cbb07f0 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -17,6 +17,7 @@ pub mod constants; pub mod dummy; pub mod block; +pub mod p2p; pub mod snark; pub mod transaction; diff --git a/core/src/p2p.rs b/core/src/p2p.rs new file mode 100644 index 0000000000..09463b9543 --- /dev/null +++ b/core/src/p2p.rs @@ -0,0 +1,16 @@ +/// TODO: These types and methods should be moved to `p2p` crate, they are here because they are used in `snark` crates callbacks +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)] +pub struct P2pNetworkPubsubMessageCacheId { + pub source: libp2p_identity::PeerId, + pub seqno: u64, +} + +impl P2pNetworkPubsubMessageCacheId { + pub fn to_raw_bytes(&self) -> Vec { + let mut message_id = self.source.to_base58(); + message_id.push_str(&self.seqno.to_string()); + message_id.into_bytes() + } +} diff --git a/core/src/transaction/mod.rs b/core/src/transaction/mod.rs index 4a4a9e5938..636c5ab18f 100644 --- a/core/src/transaction/mod.rs +++ b/core/src/transaction/mod.rs @@ -5,3 +5,36 @@ mod transaction_with_hash; pub use transaction_with_hash::*; pub use mina_p2p_messages::v2::{MinaBaseUserCommandStableV2 as Transaction, TransactionHash}; + +use crate::{p2p::P2pNetworkPubsubMessageCacheId, requests::RpcId}; + +/// TODO: Types and methods bellow, should be moved to `node` crate, they are here because they are used in `snark` crates callbacks +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Default)] +pub enum TransactionPoolMessageSource { + Rpc { + id: RpcId, + }, + Pubsub { + id: P2pNetworkPubsubMessageCacheId, + }, + #[default] + None, +} + +impl TransactionPoolMessageSource { + pub fn rpc(id: RpcId) -> Self { + Self::Rpc { id } + } + + pub fn pubsub(id: P2pNetworkPubsubMessageCacheId) -> Self { + Self::Pubsub { id } + } + + pub fn is_sender_local(&self) -> bool { + matches!(self, Self::Rpc { .. }) + } + + pub fn is_libp2p(&self) -> bool { + matches!(self, Self::Pubsub { .. }) + } +} diff --git a/node/src/rpc/rpc_reducer.rs b/node/src/rpc/rpc_reducer.rs index 707d0fe90a..bee7aaf85b 100644 --- a/node/src/rpc/rpc_reducer.rs +++ b/node/src/rpc/rpc_reducer.rs @@ -2,7 +2,7 @@ use openmina_core::{ block::AppliedBlock, bug_condition, requests::{RequestId, RpcId, RpcIdType}, - transaction::TransactionWithHash, + transaction::{TransactionPoolMessageSource, TransactionWithHash}, }; use p2p::{ connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction}, @@ -515,7 +515,7 @@ impl RpcState { dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id }); dispatcher.push(TransactionPoolAction::StartVerify { commands: commands_with_hash, - from_rpc: Some(*rpc_id), + from_source: TransactionPoolMessageSource::rpc(*rpc_id), }); } RpcAction::TransactionInjectPending { rpc_id } => { diff --git a/node/src/snark_pool/candidate/snark_pool_candidate_actions.rs b/node/src/snark_pool/candidate/snark_pool_candidate_actions.rs index f35ce599a1..1f5caf60a7 100644 --- a/node/src/snark_pool/candidate/snark_pool_candidate_actions.rs +++ b/node/src/snark_pool/candidate/snark_pool_candidate_actions.rs @@ -49,6 +49,7 @@ pub enum SnarkPoolCandidateAction { WorkVerifyError { peer_id: PeerId, verify_id: SnarkWorkVerifyId, + batch: Vec, }, WorkVerifySuccess { peer_id: PeerId, diff --git a/node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs b/node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs index c11769c5fd..5ee9e3aa74 100644 --- a/node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs +++ b/node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs @@ -1,11 +1,11 @@ use std::collections::BTreeMap; use crate::{p2p_ready, SnarkPoolAction}; -use openmina_core::snark::Snark; +use openmina_core::snark::{Snark, SnarkJobId}; use p2p::{ channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest}, disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, - PeerId, + BroadcastMessageId, P2pNetworkPubsubAction, PeerId, }; use snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId}; @@ -124,10 +124,11 @@ impl SnarkPoolCandidatesState { } }), on_error: redux::callback!( - on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String)) -> crate::Action { + on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec)) -> crate::Action { SnarkPoolCandidateAction::WorkVerifyError { peer_id: sender.parse().unwrap(), verify_id: req_id, + batch } }), }); @@ -144,7 +145,11 @@ impl SnarkPoolCandidatesState { } => { state.verify_pending(meta.time(), peer_id, *verify_id, job_ids); } - SnarkPoolCandidateAction::WorkVerifyError { peer_id, verify_id } => { + SnarkPoolCandidateAction::WorkVerifyError { + peer_id, + verify_id, + batch, + } => { state.verify_result(meta.time(), peer_id, *verify_id, Err(())); // TODO(binier): blacklist peer @@ -154,6 +159,22 @@ impl SnarkPoolCandidatesState { peer_id, reason: P2pDisconnectionReason::SnarkPoolVerifyError, }); + + // TODO: This is not correct. We are rejecting all snark messages, but the fact that the batch + // failed to verify means that there is at least one invalid snark in the batch, not that all of them + // are invalid. + // Instead, what should happen here is that we split the batch in two and try to verify the two batches + // again. Repeating until batches don't fail to verify anymore, or each batch is of size 1. + // It may also be worth capping the batch sizes to 10. + for snark_job_id in batch { + dispatcher.push(P2pNetworkPubsubAction::RejectMessage { + message_id: Some(BroadcastMessageId::Snark { + job_id: snark_job_id.clone(), + }), + peer_id: None, + reason: "Snark work verification failed".to_string(), + }); + } } SnarkPoolCandidateAction::WorkVerifySuccess { peer_id, diff --git a/node/src/snark_pool/snark_pool_reducer.rs b/node/src/snark_pool/snark_pool_reducer.rs index ad0cd19057..6ef294a0ab 100644 --- a/node/src/snark_pool/snark_pool_reducer.rs +++ b/node/src/snark_pool/snark_pool_reducer.rs @@ -1,12 +1,11 @@ use std::collections::BTreeMap; +use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy}; use openmina_core::snark::{SnarkJobCommitment, SnarkJobId}; use p2p::channels::{ snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, }; -use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy}; - use super::{ JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction, SnarkPoolState, SnarkWork, @@ -202,8 +201,6 @@ impl SnarkPoolState { } } - // TODO: libp2p logic already broadcasts everything right now and doesn't - // wait for validation, thad needs to be fixed. See #952 dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast { snark: snark.clone(), nonce: 0, diff --git a/node/src/state.rs b/node/src/state.rs index 4c829a4d9d..a59784129f 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -4,7 +4,9 @@ use std::time::Duration; use malloc_size_of_derive::MallocSizeOf; use mina_p2p_messages::v2; use openmina_core::constants::PROTOCOL_VERSION; -use openmina_core::transaction::{TransactionInfo, TransactionWithHash}; +use openmina_core::transaction::{ + TransactionInfo, TransactionPoolMessageSource, TransactionWithHash, +}; use p2p::P2pNetworkPubsubMessageCacheId; use rand::prelude::*; @@ -539,10 +541,10 @@ impl P2p { } )), on_p2p_channels_transaction_libp2p_received: Some(redux::callback!( - on_p2p_channels_transaction_libp2p_received(transaction: Box) -> crate::Action { + on_p2p_channels_transaction_libp2p_received((transactions: Vec, id: P2pNetworkPubsubMessageCacheId)) -> crate::Action { TransactionPoolAction::StartVerify { - commands: std::iter::once(*transaction).collect(), - from_rpc: None + commands: transactions.into_iter().collect(), + from_source: TransactionPoolMessageSource::pubsub(id), } } )), diff --git a/node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs b/node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs index 14b589fef6..76e97aa978 100644 --- a/node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs +++ b/node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs @@ -1,6 +1,7 @@ #![allow(clippy::unit_arg)] use crate::{p2p_ready, TransactionPoolAction}; +use openmina_core::transaction::TransactionPoolMessageSource; use p2p::{ channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest}, PeerId, @@ -103,7 +104,7 @@ impl TransactionPoolCandidatesState { let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect(); dispatcher.push(TransactionPoolAction::StartVerify { commands: batch.into_iter().collect(), - from_rpc: None, + from_source: TransactionPoolMessageSource::None, }); dispatcher.push(TransactionPoolCandidateAction::VerifyPending { peer_id, diff --git a/node/src/transaction_pool/transaction_pool_actions.rs b/node/src/transaction_pool/transaction_pool_actions.rs index f271feff89..181d9b2217 100644 --- a/node/src/transaction_pool/transaction_pool_actions.rs +++ b/node/src/transaction_pool/transaction_pool_actions.rs @@ -8,8 +8,14 @@ use ledger::{ }, Account, AccountId, }; -use mina_p2p_messages::{list::List, v2}; -use openmina_core::{requests::RpcId, transaction::TransactionWithHash, ActionEvent}; +use mina_p2p_messages::{ + list::List, + v2::{self}, +}; +use openmina_core::{ + transaction::{TransactionPoolMessageSource, TransactionWithHash}, + ActionEvent, +}; use redux::Callback; use serde::{Deserialize, Serialize}; @@ -24,16 +30,16 @@ pub enum TransactionPoolAction { Candidate(TransactionPoolCandidateAction), StartVerify { commands: List, - from_rpc: Option, + from_source: TransactionPoolMessageSource, }, StartVerifyWithAccounts { accounts: BTreeMap, pending_id: PendingId, - from_rpc: Option, + from_source: TransactionPoolMessageSource, }, VerifySuccess { valids: Vec, - from_rpc: Option, + from_source: TransactionPoolMessageSource, }, #[action_event(level = warn, fields(debug(errors)))] VerifyError { @@ -48,9 +54,7 @@ pub enum TransactionPoolAction { ApplyVerifiedDiff { best_tip_hash: v2::LedgerHash, diff: DiffVerified, - /// Diff was crearted locally, or from remote peer ? - is_sender_local: bool, - from_rpc: Option, + from_source: TransactionPoolMessageSource, }, ApplyVerifiedDiffWithAccounts { accounts: BTreeMap, @@ -127,7 +131,7 @@ impl redux::EnablingCondition for TransactionPoolAction { type TransactionPoolEffectfulActionCallback = Callback<( BTreeMap, Option, - Option, + TransactionPoolMessageSource, )>; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -137,7 +141,7 @@ pub enum TransactionPoolEffectfulAction { ledger_hash: v2::LedgerHash, on_result: TransactionPoolEffectfulActionCallback, pending_id: Option, - from_rpc: Option, + from_source: TransactionPoolMessageSource, }, } diff --git a/node/src/transaction_pool/transaction_pool_effects.rs b/node/src/transaction_pool/transaction_pool_effects.rs index f508b140b6..56ec52e4e3 100644 --- a/node/src/transaction_pool/transaction_pool_effects.rs +++ b/node/src/transaction_pool/transaction_pool_effects.rs @@ -17,7 +17,7 @@ impl TransactionPoolEffectfulAction { ledger_hash, on_result, pending_id, - from_rpc, + from_source, } => { openmina_core::log::info!( openmina_core::log::system_time(); @@ -49,7 +49,7 @@ impl TransactionPoolEffectfulAction { .map(|account| (account.id(), account)) .collect::>(); - store.dispatch_callback(on_result, (accounts, pending_id, from_rpc)); + store.dispatch_callback(on_result, (accounts, pending_id, from_source)); } } } diff --git a/node/src/transaction_pool/transaction_pool_reducer.rs b/node/src/transaction_pool/transaction_pool_reducer.rs index c1b4a4e1c4..50f37b213c 100644 --- a/node/src/transaction_pool/transaction_pool_reducer.rs +++ b/node/src/transaction_pool/transaction_pool_reducer.rs @@ -9,10 +9,11 @@ use ledger::{ use openmina_core::{ bug_condition, constants::constraint_constants, - requests::RpcId, - transaction::{Transaction, TransactionWithHash}, + transaction::{Transaction, TransactionPoolMessageSource, TransactionWithHash}, +}; +use p2p::{ + channels::transaction::P2pChannelsTransactionAction, BroadcastMessageId, P2pNetworkPubsubAction, }; -use p2p::channels::transaction::P2pChannelsTransactionAction; use redux::callback; use snark::user_command_verify::{SnarkUserCommandVerifyAction, SnarkUserCommandVerifyId}; use std::collections::{BTreeMap, BTreeSet}; @@ -57,7 +58,10 @@ impl TransactionPoolState { meta.with_action(a), ); } - TransactionPoolAction::StartVerify { commands, from_rpc } => { + TransactionPoolAction::StartVerify { + commands, + from_source, + } => { let Ok(commands) = commands .iter() .map(TransactionWithHash::body) @@ -79,17 +83,17 @@ impl TransactionPoolState { dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts { account_ids, ledger_hash: best_tip_hash.clone(), - on_result: callback!(fetch_to_verify((accounts: BTreeMap, id: Option, from_rpc: Option)) -> crate::Action { - TransactionPoolAction::StartVerifyWithAccounts { accounts, pending_id: id.unwrap(), from_rpc } + on_result: callback!(fetch_to_verify((accounts: BTreeMap, id: Option, from_source: TransactionPoolMessageSource)) -> crate::Action { + TransactionPoolAction::StartVerifyWithAccounts { accounts, pending_id: id.unwrap(), from_source } }), pending_id: Some(pending_id), - from_rpc: *from_rpc, + from_source: *from_source, }); } TransactionPoolAction::StartVerifyWithAccounts { accounts, pending_id, - from_rpc, + from_source, } => { let TransactionPoolAction::StartVerify { commands, .. } = substate.pending_actions.remove(pending_id).unwrap() @@ -120,14 +124,14 @@ impl TransactionPoolState { dispatcher.push(SnarkUserCommandVerifyAction::Init { req_id, commands: verifiable, - from_rpc: *from_rpc, + from_source: *from_source, on_success: callback!( on_snark_user_command_verify_success( - (req_id: SnarkUserCommandVerifyId, valids: Vec, from_rpc: Option) + (req_id: SnarkUserCommandVerifyId, valids: Vec, from_source: TransactionPoolMessageSource) ) -> crate::Action { TransactionPoolAction::VerifySuccess { valids, - from_rpc, + from_source, } } ), @@ -135,9 +139,7 @@ impl TransactionPoolState { on_snark_user_command_verify_error( (req_id: SnarkUserCommandVerifyId, errors: Vec) ) -> crate::Action { - TransactionPoolAction::VerifyError { - errors - } + TransactionPoolAction::VerifyError { errors } } ) }); @@ -148,11 +150,24 @@ impl TransactionPoolState { dispatcher.push(TransactionPoolAction::VerifyError { errors: errors.clone(), }); - if let Some(rpc_id) = from_rpc { - dispatcher.push(RpcAction::TransactionInjectFailure { - rpc_id: *rpc_id, - errors, - }) + + match from_source { + TransactionPoolMessageSource::Rpc { id } => { + dispatcher.push(RpcAction::TransactionInjectFailure { + rpc_id: *id, + errors, + }); + } + TransactionPoolMessageSource::Pubsub { id } => { + dispatcher.push(P2pNetworkPubsubAction::RejectMessage { + message_id: Some(BroadcastMessageId::MessageId { + message_id: *id, + }), + peer_id: None, + reason: "Transaction diff rejected".to_owned(), + }) + } + TransactionPoolMessageSource::None => {} } }; match e { @@ -169,7 +184,10 @@ impl TransactionPoolState { } } } - TransactionPoolAction::VerifySuccess { valids, from_rpc } => { + TransactionPoolAction::VerifySuccess { + valids, + from_source, + } => { let valids = valids .iter() .cloned() @@ -182,8 +200,7 @@ impl TransactionPoolState { dispatcher.push(TransactionPoolAction::ApplyVerifiedDiff { best_tip_hash, diff, - is_sender_local: from_rpc.is_some(), - from_rpc: *from_rpc, + from_source: *from_source, }); } TransactionPoolAction::VerifyError { .. } => { @@ -197,11 +214,11 @@ impl TransactionPoolState { dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts { account_ids, ledger_hash: best_tip_hash.clone(), - on_result: callback!(fetch_for_best_tip((accounts: BTreeMap, id: Option, from_rpc: Option)) -> crate::Action { + on_result: callback!(fetch_for_best_tip((accounts: BTreeMap, id: Option, from_source: TransactionPoolMessageSource)) -> crate::Action { TransactionPoolAction::BestTipChangedWithAccounts { accounts } }), pending_id: None, - from_rpc: None, + from_source: TransactionPoolMessageSource::None, }); } TransactionPoolAction::BestTipChangedWithAccounts { accounts } => { @@ -220,8 +237,7 @@ impl TransactionPoolState { TransactionPoolAction::ApplyVerifiedDiff { best_tip_hash, diff, - is_sender_local: _, - from_rpc, + from_source, } => { let account_ids = substate.pool.get_accounts_to_apply_diff(diff); let pending_id = substate.make_action_pending(action); @@ -230,14 +246,14 @@ impl TransactionPoolState { dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts { account_ids, ledger_hash: best_tip_hash.clone(), - on_result: callback!(fetch_for_apply((accounts: BTreeMap, id: Option, from_rpc: Option)) -> crate::Action { + on_result: callback!(fetch_for_apply((accounts: BTreeMap, id: Option, from_rpc: TransactionPoolMessageSource)) -> crate::Action { TransactionPoolAction::ApplyVerifiedDiffWithAccounts { accounts, pending_id: id.unwrap(), } }), pending_id: Some(pending_id), - from_rpc: *from_rpc, + from_source: *from_source, }); } TransactionPoolAction::ApplyVerifiedDiffWithAccounts { @@ -247,24 +263,22 @@ impl TransactionPoolState { let TransactionPoolAction::ApplyVerifiedDiff { best_tip_hash: _, diff, - is_sender_local, - from_rpc, + from_source, } = substate.pending_actions.remove(pending_id).unwrap() else { panic!() }; + let is_sender_local = from_source.is_sender_local(); // Note(adonagy): Action for rebroadcast, in his action we can use forget_check - let (rpc_action, was_accepted, accepted, rejected) = match substate - .pool - .unsafe_apply( - meta.time(), - global_slot_from_genesis, - global_slot, - &diff, - accounts, - is_sender_local, - ) { + let (was_accepted, accepted, rejected) = match substate.pool.unsafe_apply( + meta.time(), + global_slot_from_genesis, + global_slot, + &diff, + accounts, + is_sender_local, + ) { Ok((ApplyDecision::Accept, accepted, rejected, dropped)) => { for hash in dropped { substate.dpool.remove(&hash); @@ -275,20 +289,11 @@ impl TransactionPoolState { hash: tx.hash.clone(), }); } - let rpc_action = - from_rpc.map(|rpc_id| RpcAction::TransactionInjectSuccess { - rpc_id, - response: accepted.clone(), - }); - (rpc_action, true, accepted, rejected) + + (true, accepted, rejected) } Ok((ApplyDecision::Reject, accepted, rejected, _)) => { - let rpc_action = - from_rpc.map(|rpc_id| RpcAction::TransactionInjectRejected { - rpc_id, - response: rejected.clone(), - }); - (rpc_action, false, accepted, rejected) + (false, accepted, rejected) } Err(e) => { crate::core::warn!(meta.time(); kind = "TransactionPoolUnsafeApplyError", summary = e); @@ -297,12 +302,37 @@ impl TransactionPoolState { }; let dispatcher = state.into_dispatcher(); - if let Some(rpc_action) = rpc_action { - dispatcher.push(rpc_action); + + // TODO: use callbacks + match (was_accepted, from_source) { + (true, TransactionPoolMessageSource::Rpc { id }) => { + dispatcher.push(RpcAction::TransactionInjectSuccess { + rpc_id: id, + response: accepted.clone(), + }); + } + (true, TransactionPoolMessageSource::Pubsub { id }) => { + dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage { + message_id: BroadcastMessageId::MessageId { message_id: id }, + }); + } + (false, TransactionPoolMessageSource::Rpc { id }) => { + dispatcher.push(RpcAction::TransactionInjectRejected { + rpc_id: id, + response: rejected.clone(), + }); + } + (false, TransactionPoolMessageSource::Pubsub { id }) => { + dispatcher.push(P2pNetworkPubsubAction::RejectMessage { + message_id: Some(BroadcastMessageId::MessageId { message_id: id }), + peer_id: None, + reason: "Rejected transaction diff".to_owned(), + }); + } + (_, TransactionPoolMessageSource::None) => {} } - // TODO: libp2p logic already broadcasts everything right now and doesn't - // wait for validation, thad needs to be fixed. See #952 - if was_accepted { + + if was_accepted && !from_source.is_libp2p() { dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected, @@ -324,14 +354,14 @@ impl TransactionPoolState { dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts { account_ids: account_ids.union(&uncommitted).cloned().collect(), ledger_hash: best_tip_hash.clone(), - on_result: callback!(fetch_for_diff((accounts: BTreeMap, id: Option, from_rpc: Option)) -> crate::Action { + on_result: callback!(fetch_for_diff((accounts: BTreeMap, id: Option, from_source: TransactionPoolMessageSource)) -> crate::Action { TransactionPoolAction::ApplyTransitionFrontierDiffWithAccounts { accounts, pending_id: id.unwrap(), } }), pending_id: Some(pending_id), - from_rpc: None, + from_source: TransactionPoolMessageSource::None, }); } TransactionPoolAction::ApplyTransitionFrontierDiffWithAccounts { diff --git a/p2p/src/channels/snark/p2p_channels_snark_actions.rs b/p2p/src/channels/snark/p2p_channels_snark_actions.rs index dba737da18..8468d91402 100644 --- a/p2p/src/channels/snark/p2p_channels_snark_actions.rs +++ b/p2p/src/channels/snark/p2p_channels_snark_actions.rs @@ -51,6 +51,9 @@ pub enum P2pChannelsSnarkAction { snark: Box, nonce: u32, }, + /// Checks if a snark has already been received from pubsub network, ff it has, it broadcasts a validated message. + /// If not, it constructs a new message with the snark and broadcasts it to the network, + /// either directly or by rebroadcasting it if it was received from a WebRTC connection. Libp2pBroadcast { snark: Snark, nonce: u32, diff --git a/p2p/src/channels/snark/p2p_channels_snark_reducer.rs b/p2p/src/channels/snark/p2p_channels_snark_reducer.rs index bb6ab1e57b..fb31df3f99 100644 --- a/p2p/src/channels/snark/p2p_channels_snark_reducer.rs +++ b/p2p/src/channels/snark/p2p_channels_snark_reducer.rs @@ -218,7 +218,21 @@ impl P2pChannelsSnarkState { nonce, is_local, } => { - let dispatcher = state_context.into_dispatcher(); + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let p2p_state: &P2pState = state.substate()?; + let pubsub_state = &p2p_state.network.scheduler.broadcast_state.mcache; + + let message_id = crate::BroadcastMessageId::Snark { + job_id: snark.job_id(), + }; + + if pubsub_state.contains_broadcast_id(&message_id) { + dispatcher + .push(P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id }); + + return Ok(()); + }; + let message = Box::new((snark.statement(), (&snark).into())); let message = v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(message); let nonce = nonce.into(); diff --git a/p2p/src/channels/transaction/p2p_channels_transaction_actions.rs b/p2p/src/channels/transaction/p2p_channels_transaction_actions.rs index fc75742ba8..38650150c7 100644 --- a/p2p/src/channels/transaction/p2p_channels_transaction_actions.rs +++ b/p2p/src/channels/transaction/p2p_channels_transaction_actions.rs @@ -1,5 +1,5 @@ -use openmina_core::transaction::Transaction; use openmina_core::ActionEvent; +use openmina_core::{p2p::P2pNetworkPubsubMessageCacheId, transaction::Transaction}; use serde::{Deserialize, Serialize}; use crate::{channels::P2pChannelsAction, P2pState, PeerId}; @@ -46,8 +46,9 @@ pub enum P2pChannelsTransactionAction { }, Libp2pReceived { peer_id: PeerId, - transaction: Box, + transactions: Vec, nonce: u32, + message_id: P2pNetworkPubsubMessageCacheId, }, Libp2pBroadcast { transaction: Box, diff --git a/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs b/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs index 9b9e483cde..c9c0a874aa 100644 --- a/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs +++ b/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs @@ -212,7 +212,11 @@ impl P2pChannelsTransactionState { } Ok(()) } - P2pChannelsTransactionAction::Libp2pReceived { transaction, .. } => { + P2pChannelsTransactionAction::Libp2pReceived { + transactions, + message_id, + .. + } => { let (dispatcher, state) = state_context.into_dispatcher_and_state(); let p2p_state: &P2pState = state.substate()?; @@ -220,9 +224,13 @@ impl P2pChannelsTransactionState { .callbacks .on_p2p_channels_transaction_libp2p_received { - if let Ok(transaction) = TransactionWithHash::try_new(*transaction) { - dispatcher.push_callback(callback.clone(), Box::new(transaction)); - } + let transactions = transactions + .into_iter() + .map(TransactionWithHash::try_new) + .filter_map(Result::ok) + .collect(); + + dispatcher.push_callback(callback.clone(), (transactions, message_id)); } Ok(()) diff --git a/p2p/src/network/pubsub/mod.rs b/p2p/src/network/pubsub/mod.rs index 79ab3d009f..a9ebc12b9c 100644 --- a/p2p/src/network/pubsub/mod.rs +++ b/p2p/src/network/pubsub/mod.rs @@ -1,14 +1,13 @@ pub mod pb { include!(concat!(env!("OUT_DIR"), "/gossipsub.rs")); } - +pub use openmina_core::p2p::P2pNetworkPubsubMessageCacheId; mod p2p_network_pubsub_actions; pub use self::p2p_network_pubsub_actions::P2pNetworkPubsubAction; mod p2p_network_pubsub_state; pub use self::p2p_network_pubsub_state::{ - P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubMessageCacheId, - P2pNetworkPubsubState, + P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState, P2pNetworkPubsubState, }; #[cfg(feature = "p2p-libp2p")] @@ -18,6 +17,7 @@ mod p2p_network_pubsub_reducer; const TOPIC: &str = "coda/consensus-messages/0.0.1"; pub mod pubsub_effectful; +use openmina_core::snark::SnarkJobId; pub use pubsub_effectful::P2pNetworkPubsubEffectfulAction; use binprot::BinProtWrite; @@ -32,6 +32,9 @@ pub enum BroadcastMessageId { BlockHash { hash: mina_p2p_messages::v2::StateHash, }, + Snark { + job_id: SnarkJobId, + }, MessageId { message_id: P2pNetworkPubsubMessageCacheId, }, diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs index 63ff30c9d4..71caedc43a 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs @@ -1,7 +1,7 @@ -use super::{p2p_network_pubsub_state::P2pNetworkPubsubMessageCacheId, pb, BroadcastMessageId}; +use super::{pb, BroadcastMessageId}; use crate::{token::BroadcastAlgorithm, ConnectionAddr, Data, P2pState, PeerId, StreamId}; use mina_p2p_messages::gossip::GossipNetMessageV2; -use openmina_core::ActionEvent; +use openmina_core::{p2p::P2pNetworkPubsubMessageCacheId, ActionEvent}; use serde::{Deserialize, Serialize}; /// Actions that can occur within the P2P Network PubSub system. diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index 95b616f33a..5a43766672 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -5,7 +5,9 @@ use mina_p2p_messages::{ gossip::{self, GossipNetMessageV2}, v2::NetworkPoolSnarkPoolDiffVersionedStableV2, }; -use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate}; +use openmina_core::{ + block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, snark::Snark, Substate, +}; use redux::{Dispatcher, Timestamp}; use crate::{ @@ -421,20 +423,10 @@ impl P2pNetworkPubsubState { Ok(sig) => Some(sig.to_bytes().to_vec()), }; - let message_state = match &message { - GossipNetMessageV2::NewState(block) => { - P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { - block_hash: block.try_hash()?, - message: msg, - peer_id: source_peer_id, - time, - } - } - _ => P2pNetworkPubsubMessageCacheMessage::PreValidated { - message: msg, - peer_id: source_peer_id, - time, - }, + let message_state = P2pNetworkPubsubMessageCacheMessage::Validated { + message: msg, + peer_id: source_peer_id, + time, }; pubsub_state.mcache.map.insert(message_id, message_state); @@ -555,6 +547,19 @@ impl P2pNetworkPubsubState { time, } } + GossipNetMessageV2::SnarkPoolDiff { + message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(snark), + .. + } => { + let snark: Snark = snark.1.clone().into(); + let job_id = snark.job_id(); + P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark { + job_id, + message, + peer_id, + time, + } + } _ => P2pNetworkPubsubMessageCacheMessage::PreValidated { message, peer_id, @@ -568,59 +573,55 @@ impl P2pNetworkPubsubState { let dispatcher = state_context.into_dispatcher(); + // TODO: for transaction proof we track source, we should do that for `BestTipUpdate` and for `SnarkPoolDiff` match content { GossipNetMessageV2::NewState(block) => { let best_tip = BlockWithHash::try_new(block.clone())?; dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip }); - return Ok(()); } GossipNetMessageV2::TransactionPoolDiff { message, nonce } => { let nonce = nonce.as_u32(); - for transaction in message.0 { - dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { - peer_id, - transaction: Box::new(transaction), - nonce, - }); - } + dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived { + peer_id, + transactions: message.0.into_iter().collect(), + nonce, + message_id, + }); } GossipNetMessageV2::SnarkPoolDiff { message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work), nonce, } => { + let snark: Snark = work.1.into(); dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived { peer_id, - snark: Box::new(work.1.into()), + snark: Box::new(snark), nonce: nonce.as_u32(), }); } _ => {} } - - dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage { - message_id: super::BroadcastMessageId::MessageId { message_id }, - }); Ok(()) } P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => { - let Some((mcache_message_id, message)) = + let Some((message_id, _)) = pubsub_state.mcache.get_message_id_and_message(&message_id) else { bug_condition!("Message with id: {:?} not found", message_id); return Ok(()); }; + + let Some(message) = pubsub_state.mcache.map.get(&message_id) else { + bug_condition!("Message with id: {:?} not found", message_id); + return Ok(()); + }; + let raw_message = message.message().clone(); - let peer_id = message.peer_id(); + let peer_id = *message.peer_id(); - pubsub_state.reduce_incoming_validated_message( - mcache_message_id, - peer_id, - &raw_message, - ); + pubsub_state.reduce_incoming_validated_message(message_id, peer_id, &raw_message); - let Some((_message_id, message)) = - pubsub_state.mcache.get_message_id_and_message(&message_id) - else { + let Some(message) = pubsub_state.mcache.map.get_mut(&message_id) else { bug_condition!("Message with id: {:?} not found", message_id); return Ok(()); }; @@ -628,11 +629,10 @@ impl P2pNetworkPubsubState { *message = P2pNetworkPubsubMessageCacheMessage::Validated { message: raw_message, peer_id, - time: message.time(), + time: *message.time(), }; let (dispatcher, state) = state_context.into_dispatcher_and_state(); - Self::broadcast(dispatcher, state) } P2pNetworkPubsubAction::PruneMessages {} => { @@ -641,7 +641,7 @@ impl P2pNetworkPubsubState { .map .iter() .filter_map(|(message_id, message)| { - if message.time() + MAX_MESSAGE_KEEP_DURATION > time { + if (*message.time() + MAX_MESSAGE_KEEP_DURATION) <= time { Some(message_id.to_owned()) } else { None @@ -659,25 +659,28 @@ impl P2pNetworkPubsubState { peer_id, .. } => { - let mut peer_id = peer_id; + let mut involved_peers = peer_id.into_iter().collect::>(); + let mut add_peer = |peer: &PeerId| { + if !involved_peers.contains(peer) { + involved_peers.push(*peer); + } + }; + if let Some(message_id) = message_id { - let Some((_message_id, message)) = + let Some((message_id, message)) = pubsub_state.mcache.get_message_id_and_message(&message_id) else { bug_condition!("Message not found for id: {:?}", message_id); return Ok(()); }; - if peer_id.is_none() { - peer_id = Some(message.peer_id()); - } - - pubsub_state.mcache.remove_message(_message_id); + add_peer(message.peer_id()); + pubsub_state.mcache.remove_message(message_id); } let dispatcher = state_context.into_dispatcher(); - if let Some(peer_id) = peer_id { + for peer_id in involved_peers { dispatcher.push(P2pDisconnectionAction::Init { peer_id, reason: P2pDisconnectionReason::InvalidMessage, @@ -715,6 +718,10 @@ impl P2pNetworkPubsubState { Ok(()) } + /// Queues a validated message for propagation to other peers in the pubsub network. + /// For peers that are "on mesh" for the message's topic, queues the full message. + /// For other peers, queues an IHAVE control message to notify about message availability. + /// The original sender is excluded from propagation. fn reduce_incoming_validated_message( &mut self, message_id: P2pNetworkPubsubMessageCacheId, diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs index e09f9dfda6..8f5eecaa5d 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs @@ -3,7 +3,11 @@ use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId}; use libp2p_identity::ParseError; use mina_p2p_messages::gossip::GossipNetMessageV2; -use openmina_core::{snark::Snark, transaction::Transaction}; +use openmina_core::{ + p2p::P2pNetworkPubsubMessageCacheId, + snark::{Snark, SnarkJobId}, + transaction::Transaction, +}; use redux::Timestamp; use serde::{Deserialize, Serialize}; use std::{ @@ -175,7 +179,7 @@ pub struct P2pNetworkPubsubClientState { impl P2pNetworkPubsubClientState { pub fn publish(&mut self, message: &pb::Message) { - let Ok(id) = P2pNetworkPubsubMessageCacheId::compute_message_id(message) else { + let Ok(id) = compute_message_id(message) else { self.message.publish.push(message.clone()); return; }; @@ -228,7 +232,12 @@ pub enum P2pNetworkPubsubMessageCacheMessage { peer_id: PeerId, time: Timestamp, }, - // This is temporary handling for transactions and snark pool + PreValidatedSnark { + job_id: SnarkJobId, + message: pb::Message, + peer_id: PeerId, + time: Timestamp, + }, PreValidated { message: pb::Message, peer_id: PeerId, @@ -241,62 +250,41 @@ pub enum P2pNetworkPubsubMessageCacheMessage { }, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)] -pub struct P2pNetworkPubsubMessageCacheId { - pub source: libp2p_identity::PeerId, - pub seqno: u64, -} - -impl P2pNetworkPubsubMessageCacheId { - // TODO: what if wasm32? - // How to test it? - pub fn compute_message_id( - message: &pb::Message, - ) -> Result { - let source = source_from_message(message)?; +// TODO: what if wasm32? +// How to test it? +pub fn compute_message_id( + message: &pb::Message, +) -> Result { + let source = source_from_message(message)?; - let seqno = message - .seqno - .as_ref() - .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok()) - .map(u64::from_be_bytes) - .unwrap_or_default(); + let seqno = message + .seqno + .as_ref() + .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); - Ok(P2pNetworkPubsubMessageCacheId { source, seqno }) - } + Ok(P2pNetworkPubsubMessageCacheId { source, seqno }) +} - pub fn to_raw_bytes(&self) -> Vec { - let mut message_id = self.source.to_base58(); - message_id.push_str(&self.seqno.to_string()); - message_id.into_bytes() - } +macro_rules! enum_field { + ($field:ident: $field_type:ty) => { + pub fn $field(&self) -> &$field_type { + match self { + Self::Init { $field, .. } + | Self::PreValidated { $field, .. } + | Self::PreValidatedBlockMessage { $field, .. } + | Self::PreValidatedSnark { $field, .. } + | Self::Validated { $field, .. } => $field, + } + } + }; } impl P2pNetworkPubsubMessageCacheMessage { - pub fn message(&self) -> &pb::Message { - match self { - Self::Init { message, .. } => message, - Self::PreValidated { message, .. } => message, - Self::PreValidatedBlockMessage { message, .. } => message, - Self::Validated { message, .. } => message, - } - } - pub fn time(&self) -> Timestamp { - *match self { - Self::Init { time, .. } => time, - Self::PreValidated { time, .. } => time, - Self::PreValidatedBlockMessage { time, .. } => time, - Self::Validated { time, .. } => time, - } - } - pub fn peer_id(&self) -> PeerId { - *match self { - Self::Init { peer_id, .. } => peer_id, - Self::PreValidated { peer_id, .. } => peer_id, - Self::PreValidatedBlockMessage { peer_id, .. } => peer_id, - Self::Validated { peer_id, .. } => peer_id, - } - } + enum_field!(message: pb::Message); + enum_field!(time: Timestamp); + enum_field!(peer_id: PeerId); } impl P2pNetworkPubsubMessageCache { @@ -309,7 +297,7 @@ impl P2pNetworkPubsubMessageCache { peer_id: PeerId, time: Timestamp, ) -> Result { - let id = P2pNetworkPubsubMessageCacheId::compute_message_id(&message)?; + let id = compute_message_id(&message)?; self.map.insert( id, P2pNetworkPubsubMessageCacheMessage::Init { @@ -339,13 +327,19 @@ impl P2pNetworkPubsubMessageCache { pub fn contains_broadcast_id(&self, message_id: &BroadcastMessageId) -> bool { match message_id { - super::BroadcastMessageId::BlockHash { hash } => self + BroadcastMessageId::BlockHash { hash } => self .map .values() .any(|message| matches!(message, P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { block_hash, .. } if block_hash == hash)), - super::BroadcastMessageId::MessageId { message_id } => { + BroadcastMessageId::MessageId { message_id } => { self.map.contains_key(message_id) - } + }, + BroadcastMessageId::Snark { job_id: snark_job_id } => { + self + .map + .values() + .any(|message| matches!(message, P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark { job_id,.. } if job_id == snark_job_id)) + }, } } @@ -357,7 +351,7 @@ impl P2pNetworkPubsubMessageCache { &mut P2pNetworkPubsubMessageCacheMessage, )> { match message_id { - super::BroadcastMessageId::BlockHash { hash } => { + BroadcastMessageId::BlockHash { hash } => { self.map .iter_mut() .find_map(|(message_id, message)| match message { @@ -368,18 +362,34 @@ impl P2pNetworkPubsubMessageCache { _ => None, }) } - super::BroadcastMessageId::MessageId { message_id } => self + BroadcastMessageId::MessageId { message_id } => self .map .get_mut(message_id) .map(|content| (*message_id, content)), + BroadcastMessageId::Snark { + job_id: snark_job_id, + } => { + self.map + .iter_mut() + .find_map(|(message_id, message)| match message { + P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark { + job_id, .. + } if job_id == snark_job_id => Some((*message_id, message)), + _ => None, + }) + } } } - pub fn remove_message(&mut self, message_id: P2pNetworkPubsubMessageCacheId) { - let _ = self.map.remove(&message_id); + pub fn remove_message( + &mut self, + message_id: P2pNetworkPubsubMessageCacheId, + ) -> Option { + let message = self.map.remove(&message_id); if let Some(position) = self.queue.iter().position(|id| id == &message_id) { self.queue.remove(position); } + message } pub fn get_message_from_raw_message_id( diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index c1f57f4aa9..64bf872b5f 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -518,7 +518,8 @@ pub struct P2pCallbacks { /// Callback for [`P2pChannelsTransactionAction::Received`] pub on_p2p_channels_transaction_received: OptionalCallback<(PeerId, Box)>, /// Callback for [`P2pChannelsTransactionAction::Libp2pReceived`] - pub on_p2p_channels_transaction_libp2p_received: OptionalCallback>, + pub on_p2p_channels_transaction_libp2p_received: + OptionalCallback<(Vec, P2pNetworkPubsubMessageCacheId)>, /// Callback for [`P2pChannelsSnarkJobCommitmentAction::Received`] pub on_p2p_channels_snark_job_commitment_received: OptionalCallback<(PeerId, Box)>, diff --git a/snark/src/user_command_verify/snark_user_command_verify_actions.rs b/snark/src/user_command_verify/snark_user_command_verify_actions.rs index 54f936b907..294ebda6e0 100644 --- a/snark/src/user_command_verify/snark_user_command_verify_actions.rs +++ b/snark/src/user_command_verify/snark_user_command_verify_actions.rs @@ -2,7 +2,7 @@ use ledger::scan_state::transaction_logic::{valid, verifiable, WithStatus}; use redux::Callback; use serde::{Deserialize, Serialize}; -use openmina_core::{requests::RpcId, ActionEvent}; +use openmina_core::{transaction::TransactionPoolMessageSource, ActionEvent}; use super::{SnarkUserCommandVerifyError, SnarkUserCommandVerifyId}; @@ -14,7 +14,7 @@ pub type SnarkUserCommandVerifyActionWithMetaRef<'a> = pub(super) type OnSuccess = Callback<( SnarkUserCommandVerifyId, Vec, - Option, + TransactionPoolMessageSource, )>; #[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] @@ -24,7 +24,7 @@ pub enum SnarkUserCommandVerifyAction { Init { req_id: SnarkUserCommandVerifyId, commands: Vec>, - from_rpc: Option, + from_source: TransactionPoolMessageSource, on_success: OnSuccess, on_error: Callback<(SnarkUserCommandVerifyId, Vec)>, }, diff --git a/snark/src/user_command_verify/snark_user_command_verify_reducer.rs b/snark/src/user_command_verify/snark_user_command_verify_reducer.rs index 3e4803087e..182fc3c609 100644 --- a/snark/src/user_command_verify/snark_user_command_verify_reducer.rs +++ b/snark/src/user_command_verify/snark_user_command_verify_reducer.rs @@ -23,7 +23,7 @@ pub fn reducer( SnarkUserCommandVerifyAction::Init { commands, req_id, - from_rpc, + from_source, on_success, on_error, } => { @@ -32,7 +32,7 @@ pub fn reducer( substate.jobs.add(SnarkUserCommandVerifyStatus::Init { time: meta.time(), commands: commands.clone(), - from_rpc: *from_rpc, + from_source: *from_source, on_success: on_success.clone(), on_error: on_error.clone(), }); @@ -54,7 +54,7 @@ pub fn reducer( }; let SnarkUserCommandVerifyStatus::Init { commands, - from_rpc, + from_source, on_success, on_error, .. @@ -67,7 +67,7 @@ pub fn reducer( *req = SnarkUserCommandVerifyStatus::Pending { time: meta.time(), commands: std::mem::take(commands), - from_rpc: std::mem::take(from_rpc), + from_source: std::mem::take(from_source), on_success: on_success.clone(), on_error: on_error.clone(), }; @@ -102,7 +102,7 @@ pub fn reducer( return; }; let SnarkUserCommandVerifyStatus::Pending { - from_rpc, + from_source, on_success, .. } = req @@ -111,7 +111,7 @@ pub fn reducer( return; }; - let from_rpc = std::mem::take(from_rpc); + let from_source = std::mem::take(from_source); let commands: Vec = commands.clone(); let on_success = on_success.clone(); @@ -123,7 +123,7 @@ pub fn reducer( // Dispatch let dispatcher = state.into_dispatcher(); - dispatcher.push_callback(on_success, (*req_id, commands, from_rpc)); + dispatcher.push_callback(on_success, (*req_id, commands, from_source)); dispatcher.push(SnarkUserCommandVerifyAction::Finish { req_id: *req_id }); } SnarkUserCommandVerifyAction::Finish { req_id } => { diff --git a/snark/src/user_command_verify/snark_user_command_verify_state.rs b/snark/src/user_command_verify/snark_user_command_verify_state.rs index a97486adb6..eef2697e49 100644 --- a/snark/src/user_command_verify/snark_user_command_verify_state.rs +++ b/snark/src/user_command_verify/snark_user_command_verify_state.rs @@ -4,7 +4,7 @@ use ledger::scan_state::transaction_logic::{valid, verifiable, WithStatus}; use redux::Callback; use serde::{Deserialize, Serialize}; -use openmina_core::requests::{PendingRequests, RpcId}; +use openmina_core::{requests::PendingRequests, transaction::TransactionPoolMessageSource}; use crate::{TransactionVerifier, VerifierSRS}; @@ -47,14 +47,14 @@ pub enum SnarkUserCommandVerifyStatus { Init { time: redux::Timestamp, commands: Vec>, - from_rpc: Option, + from_source: TransactionPoolMessageSource, on_success: super::OnSuccess, on_error: Callback<(SnarkUserCommandVerifyId, Vec)>, }, Pending { time: redux::Timestamp, commands: Vec>, - from_rpc: Option, + from_source: TransactionPoolMessageSource, on_success: super::OnSuccess, on_error: Callback<(SnarkUserCommandVerifyId, Vec)>, }, diff --git a/snark/src/work_verify/snark_work_verify_actions.rs b/snark/src/work_verify/snark_work_verify_actions.rs index 287f835771..286cdaccab 100644 --- a/snark/src/work_verify/snark_work_verify_actions.rs +++ b/snark/src/work_verify/snark_work_verify_actions.rs @@ -1,4 +1,4 @@ -use openmina_core::SubstateAccess; +use openmina_core::{snark::SnarkJobId, SubstateAccess}; use serde::{Deserialize, Serialize}; use openmina_core::{snark::Snark, ActionEvent}; @@ -17,7 +17,7 @@ pub enum SnarkWorkVerifyAction { batch: Vec, sender: String, on_success: redux::Callback<(SnarkWorkVerifyId, String, Vec)>, - on_error: redux::Callback<(SnarkWorkVerifyId, String)>, + on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec)>, }, Pending { req_id: SnarkWorkVerifyId, diff --git a/snark/src/work_verify/snark_work_verify_reducer.rs b/snark/src/work_verify/snark_work_verify_reducer.rs index e55d78acd6..ec4d4bab89 100644 --- a/snark/src/work_verify/snark_work_verify_reducer.rs +++ b/snark/src/work_verify/snark_work_verify_reducer.rs @@ -1,4 +1,4 @@ -use openmina_core::{bug_condition, Substate, SubstateAccess}; +use openmina_core::{bug_condition, snark::Snark, Substate, SubstateAccess}; use redux::EnablingCondition; use crate::work_verify_effectful::SnarkWorkVerifyEffectfulAction; @@ -94,16 +94,17 @@ pub fn reducer( }; let callback = on_error.clone(); let sender = std::mem::take(sender); - + let batch = std::mem::take(batch); + let job_ids = batch.iter().map(Snark::job_id).collect(); *req = SnarkWorkVerifyStatus::Error { time: meta.time(), - batch: std::mem::take(batch), + batch, sender: sender.clone(), error: error.clone(), }; // Dispatch let dispatcher = state_context.into_dispatcher(); - dispatcher.push_callback(callback, (*req_id, sender)); + dispatcher.push_callback(callback, (*req_id, sender, job_ids)); dispatcher.push(SnarkWorkVerifyAction::Finish { req_id: *req_id }); } SnarkWorkVerifyAction::Success { req_id } => { diff --git a/snark/src/work_verify/snark_work_verify_state.rs b/snark/src/work_verify/snark_work_verify_state.rs index 048ca41da9..cdd7f7eb1c 100644 --- a/snark/src/work_verify/snark_work_verify_state.rs +++ b/snark/src/work_verify/snark_work_verify_state.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use openmina_core::requests::PendingRequests; use openmina_core::snark::Snark; +use openmina_core::{requests::PendingRequests, snark::SnarkJobId}; use crate::{TransactionVerifier, VerifierSRS}; @@ -50,14 +50,14 @@ pub enum SnarkWorkVerifyStatus { // `PeerId` here. sender: String, on_success: redux::Callback<(SnarkWorkVerifyId, String, Vec)>, - on_error: redux::Callback<(SnarkWorkVerifyId, String)>, + on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec)>, }, Pending { time: redux::Timestamp, batch: Vec, sender: String, on_success: redux::Callback<(SnarkWorkVerifyId, String, Vec)>, - on_error: redux::Callback<(SnarkWorkVerifyId, String)>, + on_error: redux::Callback<(SnarkWorkVerifyId, String, Vec)>, }, Error { time: redux::Timestamp,