Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ mina-p2p-messages = { workspace = true }
poseidon = { workspace = true }
hex = "0.4.3"
ark-ff = { workspace = true }
libp2p-identity = { version = "=0.2.7", features = [
"serde",
"peerid"
] }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
redux = { workspace = true, features = ["serializable_callbacks"] }
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod constants;
pub mod dummy;

pub mod block;
pub mod p2p;
pub mod snark;
pub mod transaction;

Expand Down
16 changes: 16 additions & 0 deletions core/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/// TODO: These types and methods should be moved to `p2p` crate, they are here because they are used in `snark` crates callbacks
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
pub struct P2pNetworkPubsubMessageCacheId {
pub source: libp2p_identity::PeerId,
pub seqno: u64,
}

impl P2pNetworkPubsubMessageCacheId {
pub fn to_raw_bytes(&self) -> Vec<u8> {
let mut message_id = self.source.to_base58();
message_id.push_str(&self.seqno.to_string());
message_id.into_bytes()
}
}
33 changes: 33 additions & 0 deletions core/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,36 @@ mod transaction_with_hash;
pub use transaction_with_hash::*;

pub use mina_p2p_messages::v2::{MinaBaseUserCommandStableV2 as Transaction, TransactionHash};

use crate::{p2p::P2pNetworkPubsubMessageCacheId, requests::RpcId};

/// TODO: Types and methods bellow, should be moved to `node` crate, they are here because they are used in `snark` crates callbacks
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Default)]
pub enum TransactionPoolMessageSource {
Rpc {
id: RpcId,
},
Pubsub {
id: P2pNetworkPubsubMessageCacheId,
},
#[default]
None,
}

impl TransactionPoolMessageSource {
pub fn rpc(id: RpcId) -> Self {
Self::Rpc { id }
}

pub fn pubsub(id: P2pNetworkPubsubMessageCacheId) -> Self {
Self::Pubsub { id }
}

pub fn is_sender_local(&self) -> bool {
matches!(self, Self::Rpc { .. })
}

pub fn is_libp2p(&self) -> bool {
matches!(self, Self::Pubsub { .. })
}
}
4 changes: 3 additions & 1 deletion node/src/action_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ pub enum ActionKind {
P2pNetworkPnetEffectfulOutgoingData,
P2pNetworkPnetEffectfulSetupNonce,
P2pNetworkPubsubBroadcast,
P2pNetworkPubsubBroadcastMessage,
P2pNetworkPubsubBroadcastSigned,
P2pNetworkPubsubBroadcastValidatedMessage,
P2pNetworkPubsubGraft,
Expand Down Expand Up @@ -719,7 +720,7 @@ pub enum ActionKind {
}

impl ActionKind {
pub const COUNT: u16 = 609;
pub const COUNT: u16 = 610;
}

impl std::fmt::Display for ActionKind {
Expand Down Expand Up @@ -2003,6 +2004,7 @@ impl ActionKindGet for P2pNetworkPubsubAction {
Self::BroadcastValidatedMessage { .. } => {
ActionKind::P2pNetworkPubsubBroadcastValidatedMessage
}
Self::BroadcastMessage { .. } => ActionKind::P2pNetworkPubsubBroadcastMessage,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions node/src/rpc/rpc_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use openmina_core::{
block::AppliedBlock,
bug_condition,
requests::{RequestId, RpcId, RpcIdType},
transaction::TransactionWithHash,
transaction::{TransactionPoolMessageSource, TransactionWithHash},
};
use p2p::{
connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
Expand Down Expand Up @@ -515,7 +515,7 @@ impl RpcState {
dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id });
dispatcher.push(TransactionPoolAction::StartVerify {
commands: commands_with_hash,
from_rpc: Some(*rpc_id),
from_source: TransactionPoolMessageSource::rpc(*rpc_id),
});
}
RpcAction::TransactionInjectPending { rpc_id } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum SnarkPoolCandidateAction {
WorkVerifyError {
peer_id: PeerId,
verify_id: SnarkWorkVerifyId,
batch: Vec<SnarkJobId>,
},
WorkVerifySuccess {
peer_id: PeerId,
Expand Down
24 changes: 20 additions & 4 deletions node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::BTreeMap;

use crate::{p2p_ready, SnarkPoolAction};
use openmina_core::snark::Snark;
use openmina_core::snark::{Snark, SnarkJobId};
use p2p::{
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
PeerId,
BroadcastMessageId, P2pNetworkPubsubAction, PeerId,
};
use snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId};

Expand Down Expand Up @@ -124,10 +124,11 @@ impl SnarkPoolCandidatesState {
}
}),
on_error: redux::callback!(
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String)) -> crate::Action {
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
SnarkPoolCandidateAction::WorkVerifyError {
peer_id: sender.parse().unwrap(),
verify_id: req_id,
batch
}
}),
});
Expand All @@ -144,7 +145,11 @@ impl SnarkPoolCandidatesState {
} => {
state.verify_pending(meta.time(), peer_id, *verify_id, job_ids);
}
SnarkPoolCandidateAction::WorkVerifyError { peer_id, verify_id } => {
SnarkPoolCandidateAction::WorkVerifyError {
peer_id,
verify_id,
batch,
} => {
state.verify_result(meta.time(), peer_id, *verify_id, Err(()));

// TODO(binier): blacklist peer
Expand All @@ -154,6 +159,17 @@ impl SnarkPoolCandidatesState {
peer_id,
reason: P2pDisconnectionReason::SnarkPoolVerifyError,
});

// TODO: this is incorrect, only one snark in batch could be invalid but we would reject that whole batch
for snark_job_id in batch {
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
message_id: Some(BroadcastMessageId::Snark {
job_id: snark_job_id.clone(),
}),
peer_id: None,
reason: "Snark work verification failed".to_string(),
});
}
}
SnarkPoolCandidateAction::WorkVerifySuccess {
peer_id,
Expand Down
5 changes: 1 addition & 4 deletions node/src/snark_pool/snark_pool_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::BTreeMap;

use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
use openmina_core::snark::{SnarkJobCommitment, SnarkJobId};
use p2p::channels::{
snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
};

use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};

use super::{
JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction,
SnarkPoolState, SnarkWork,
Expand Down Expand Up @@ -202,8 +201,6 @@ impl SnarkPoolState {
}
}

// TODO: libp2p logic already broadcasts everything right now and doesn't
// wait for validation, thad needs to be fixed. See #952
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
snark: snark.clone(),
nonce: 0,
Expand Down
10 changes: 6 additions & 4 deletions node/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::time::Duration;
use malloc_size_of_derive::MallocSizeOf;
use mina_p2p_messages::v2;
use openmina_core::constants::PROTOCOL_VERSION;
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
use openmina_core::transaction::{
TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
};
use p2p::P2pNetworkPubsubMessageCacheId;
use rand::prelude::*;

Expand Down Expand Up @@ -539,10 +541,10 @@ impl P2p {
}
)),
on_p2p_channels_transaction_libp2p_received: Some(redux::callback!(
on_p2p_channels_transaction_libp2p_received(transaction: Box<TransactionWithHash>) -> crate::Action {
on_p2p_channels_transaction_libp2p_received((transactions: Vec<TransactionWithHash>, id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
TransactionPoolAction::StartVerify {
commands: std::iter::once(*transaction).collect(),
from_rpc: None
commands: transactions.into_iter().collect(),
from_source: TransactionPoolMessageSource::pubsub(id),
}
}
)),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(clippy::unit_arg)]

use crate::{p2p_ready, TransactionPoolAction};
use openmina_core::transaction::TransactionPoolMessageSource;
use p2p::{
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
PeerId,
Expand Down Expand Up @@ -103,7 +104,7 @@ impl TransactionPoolCandidatesState {
let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
dispatcher.push(TransactionPoolAction::StartVerify {
commands: batch.into_iter().collect(),
from_rpc: None,
from_source: TransactionPoolMessageSource::None,
});
dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
peer_id,
Expand Down
24 changes: 14 additions & 10 deletions node/src/transaction_pool/transaction_pool_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ use ledger::{
},
Account, AccountId,
};
use mina_p2p_messages::{list::List, v2};
use openmina_core::{requests::RpcId, transaction::TransactionWithHash, ActionEvent};
use mina_p2p_messages::{
list::List,
v2::{self},
};
use openmina_core::{
transaction::{TransactionPoolMessageSource, TransactionWithHash},
ActionEvent,
};
use redux::Callback;
use serde::{Deserialize, Serialize};

Expand All @@ -24,16 +30,16 @@ pub enum TransactionPoolAction {
Candidate(TransactionPoolCandidateAction),
StartVerify {
commands: List<TransactionWithHash>,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
StartVerifyWithAccounts {
accounts: BTreeMap<AccountId, Account>,
pending_id: PendingId,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
VerifySuccess {
valids: Vec<valid::UserCommand>,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
#[action_event(level = warn, fields(debug(errors)))]
VerifyError {
Expand All @@ -48,9 +54,7 @@ pub enum TransactionPoolAction {
ApplyVerifiedDiff {
best_tip_hash: v2::LedgerHash,
diff: DiffVerified,
/// Diff was crearted locally, or from remote peer ?
is_sender_local: bool,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
ApplyVerifiedDiffWithAccounts {
accounts: BTreeMap<AccountId, Account>,
Expand Down Expand Up @@ -127,7 +131,7 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolAction {
type TransactionPoolEffectfulActionCallback = Callback<(
BTreeMap<AccountId, Account>,
Option<PendingId>,
Option<RpcId>,
TransactionPoolMessageSource,
)>;

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand All @@ -137,7 +141,7 @@ pub enum TransactionPoolEffectfulAction {
ledger_hash: v2::LedgerHash,
on_result: TransactionPoolEffectfulActionCallback,
pending_id: Option<PendingId>,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
}

Expand Down
4 changes: 2 additions & 2 deletions node/src/transaction_pool/transaction_pool_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl TransactionPoolEffectfulAction {
ledger_hash,
on_result,
pending_id,
from_rpc,
from_source,
} => {
openmina_core::log::info!(
openmina_core::log::system_time();
Expand Down Expand Up @@ -49,7 +49,7 @@ impl TransactionPoolEffectfulAction {
.map(|account| (account.id(), account))
.collect::<BTreeMap<_, _>>();

store.dispatch_callback(on_result, (accounts, pending_id, from_rpc));
store.dispatch_callback(on_result, (accounts, pending_id, from_source));
}
}
}
Expand Down
Loading
Loading