Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ mina-p2p-messages = { workspace = true }
poseidon = { workspace = true }
hex = "0.4.3"
ark-ff = { workspace = true }
libp2p-identity = { version = "=0.2.7", features = [
"serde",
"peerid"
] }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
redux = { workspace = true, features = ["serializable_callbacks"] }
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod constants;
pub mod dummy;

pub mod block;
pub mod p2p;
pub mod snark;
pub mod transaction;

Expand Down
16 changes: 16 additions & 0 deletions core/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/// TODO: These types and methods should be moved to `p2p` crate, they are here because they are used in `snark` crates callbacks
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
pub struct P2pNetworkPubsubMessageCacheId {
pub source: libp2p_identity::PeerId,
pub seqno: u64,
}

impl P2pNetworkPubsubMessageCacheId {
pub fn to_raw_bytes(&self) -> Vec<u8> {
let mut message_id = self.source.to_base58();
message_id.push_str(&self.seqno.to_string());
message_id.into_bytes()
}
}
33 changes: 33 additions & 0 deletions core/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,36 @@ mod transaction_with_hash;
pub use transaction_with_hash::*;

pub use mina_p2p_messages::v2::{MinaBaseUserCommandStableV2 as Transaction, TransactionHash};

use crate::{p2p::P2pNetworkPubsubMessageCacheId, requests::RpcId};

/// TODO: Types and methods bellow, should be moved to `node` crate, they are here because they are used in `snark` crates callbacks
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Default)]
pub enum TransactionPoolMessageSource {
Rpc {
id: RpcId,
},
Pubsub {
id: P2pNetworkPubsubMessageCacheId,
},
#[default]
None,
}

impl TransactionPoolMessageSource {
pub fn rpc(id: RpcId) -> Self {
Self::Rpc { id }
}

pub fn pubsub(id: P2pNetworkPubsubMessageCacheId) -> Self {
Self::Pubsub { id }
}

pub fn is_sender_local(&self) -> bool {
matches!(self, Self::Rpc { .. })
}

pub fn is_libp2p(&self) -> bool {
matches!(self, Self::Pubsub { .. })
}
}
4 changes: 2 additions & 2 deletions node/src/rpc/rpc_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use openmina_core::{
block::AppliedBlock,
bug_condition,
requests::{RequestId, RpcId, RpcIdType},
transaction::TransactionWithHash,
transaction::{TransactionPoolMessageSource, TransactionWithHash},
};
use p2p::{
connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
Expand Down Expand Up @@ -515,7 +515,7 @@ impl RpcState {
dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id });
dispatcher.push(TransactionPoolAction::StartVerify {
commands: commands_with_hash,
from_rpc: Some(*rpc_id),
from_source: TransactionPoolMessageSource::rpc(*rpc_id),
});
}
RpcAction::TransactionInjectPending { rpc_id } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum SnarkPoolCandidateAction {
WorkVerifyError {
peer_id: PeerId,
verify_id: SnarkWorkVerifyId,
batch: Vec<SnarkJobId>,
},
WorkVerifySuccess {
peer_id: PeerId,
Expand Down
29 changes: 25 additions & 4 deletions node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::BTreeMap;

use crate::{p2p_ready, SnarkPoolAction};
use openmina_core::snark::Snark;
use openmina_core::snark::{Snark, SnarkJobId};
use p2p::{
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
PeerId,
BroadcastMessageId, P2pNetworkPubsubAction, PeerId,
};
use snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId};

Expand Down Expand Up @@ -124,10 +124,11 @@ impl SnarkPoolCandidatesState {
}
}),
on_error: redux::callback!(
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String)) -> crate::Action {
on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
SnarkPoolCandidateAction::WorkVerifyError {
peer_id: sender.parse().unwrap(),
verify_id: req_id,
batch
}
}),
});
Expand All @@ -144,7 +145,11 @@ impl SnarkPoolCandidatesState {
} => {
state.verify_pending(meta.time(), peer_id, *verify_id, job_ids);
}
SnarkPoolCandidateAction::WorkVerifyError { peer_id, verify_id } => {
SnarkPoolCandidateAction::WorkVerifyError {
peer_id,
verify_id,
batch,
} => {
state.verify_result(meta.time(), peer_id, *verify_id, Err(()));

// TODO(binier): blacklist peer
Expand All @@ -154,6 +159,22 @@ impl SnarkPoolCandidatesState {
peer_id,
reason: P2pDisconnectionReason::SnarkPoolVerifyError,
});

// TODO: This is not correct. We are rejecting all snark messages, but the fact that the batch
// failed to verify means that there is at least one invalid snark in the batch, not that all of them
// are invalid.
// Instead, what should happen here is that we split the batch in two and try to verify the two batches
// again. Repeating until batches don't fail to verify anymore, or each batch is of size 1.
// It may also be worth capping the batch sizes to 10.
for snark_job_id in batch {
dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
message_id: Some(BroadcastMessageId::Snark {
job_id: snark_job_id.clone(),
}),
peer_id: None,
reason: "Snark work verification failed".to_string(),
});
}
}
SnarkPoolCandidateAction::WorkVerifySuccess {
peer_id,
Expand Down
5 changes: 1 addition & 4 deletions node/src/snark_pool/snark_pool_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::BTreeMap;

use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
use openmina_core::snark::{SnarkJobCommitment, SnarkJobId};
use p2p::channels::{
snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
};

use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};

use super::{
JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction,
SnarkPoolState, SnarkWork,
Expand Down Expand Up @@ -202,8 +201,6 @@ impl SnarkPoolState {
}
}

// TODO: libp2p logic already broadcasts everything right now and doesn't
// wait for validation, thad needs to be fixed. See #952
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
snark: snark.clone(),
nonce: 0,
Expand Down
10 changes: 6 additions & 4 deletions node/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::time::Duration;
use malloc_size_of_derive::MallocSizeOf;
use mina_p2p_messages::v2;
use openmina_core::constants::PROTOCOL_VERSION;
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
use openmina_core::transaction::{
TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
};
use p2p::P2pNetworkPubsubMessageCacheId;
use rand::prelude::*;

Expand Down Expand Up @@ -539,10 +541,10 @@ impl P2p {
}
)),
on_p2p_channels_transaction_libp2p_received: Some(redux::callback!(
on_p2p_channels_transaction_libp2p_received(transaction: Box<TransactionWithHash>) -> crate::Action {
on_p2p_channels_transaction_libp2p_received((transactions: Vec<TransactionWithHash>, id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
TransactionPoolAction::StartVerify {
commands: std::iter::once(*transaction).collect(),
from_rpc: None
commands: transactions.into_iter().collect(),
from_source: TransactionPoolMessageSource::pubsub(id),
}
}
)),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(clippy::unit_arg)]

use crate::{p2p_ready, TransactionPoolAction};
use openmina_core::transaction::TransactionPoolMessageSource;
use p2p::{
channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
PeerId,
Expand Down Expand Up @@ -103,7 +104,7 @@ impl TransactionPoolCandidatesState {
let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
dispatcher.push(TransactionPoolAction::StartVerify {
commands: batch.into_iter().collect(),
from_rpc: None,
from_source: TransactionPoolMessageSource::None,
});
dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
peer_id,
Expand Down
24 changes: 14 additions & 10 deletions node/src/transaction_pool/transaction_pool_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ use ledger::{
},
Account, AccountId,
};
use mina_p2p_messages::{list::List, v2};
use openmina_core::{requests::RpcId, transaction::TransactionWithHash, ActionEvent};
use mina_p2p_messages::{
list::List,
v2::{self},
};
use openmina_core::{
transaction::{TransactionPoolMessageSource, TransactionWithHash},
ActionEvent,
};
use redux::Callback;
use serde::{Deserialize, Serialize};

Expand All @@ -24,16 +30,16 @@ pub enum TransactionPoolAction {
Candidate(TransactionPoolCandidateAction),
StartVerify {
commands: List<TransactionWithHash>,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
StartVerifyWithAccounts {
accounts: BTreeMap<AccountId, Account>,
pending_id: PendingId,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
VerifySuccess {
valids: Vec<valid::UserCommand>,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
#[action_event(level = warn, fields(debug(errors)))]
VerifyError {
Expand All @@ -48,9 +54,7 @@ pub enum TransactionPoolAction {
ApplyVerifiedDiff {
best_tip_hash: v2::LedgerHash,
diff: DiffVerified,
/// Diff was crearted locally, or from remote peer ?
is_sender_local: bool,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
ApplyVerifiedDiffWithAccounts {
accounts: BTreeMap<AccountId, Account>,
Expand Down Expand Up @@ -127,7 +131,7 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolAction {
type TransactionPoolEffectfulActionCallback = Callback<(
BTreeMap<AccountId, Account>,
Option<PendingId>,
Option<RpcId>,
TransactionPoolMessageSource,
)>;

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand All @@ -137,7 +141,7 @@ pub enum TransactionPoolEffectfulAction {
ledger_hash: v2::LedgerHash,
on_result: TransactionPoolEffectfulActionCallback,
pending_id: Option<PendingId>,
from_rpc: Option<RpcId>,
from_source: TransactionPoolMessageSource,
},
}

Expand Down
4 changes: 2 additions & 2 deletions node/src/transaction_pool/transaction_pool_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl TransactionPoolEffectfulAction {
ledger_hash,
on_result,
pending_id,
from_rpc,
from_source,
} => {
openmina_core::log::info!(
openmina_core::log::system_time();
Expand Down Expand Up @@ -49,7 +49,7 @@ impl TransactionPoolEffectfulAction {
.map(|account| (account.id(), account))
.collect::<BTreeMap<_, _>>();

store.dispatch_callback(on_result, (accounts, pending_id, from_rpc));
store.dispatch_callback(on_result, (accounts, pending_id, from_source));
}
}
}
Expand Down
Loading
Loading