Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions node/common/src/service/archive/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
#[cfg(not(target_arch = "wasm32"))]
use mina_p2p_messages::v2::{self};
use node::core::{channels::mpsc, thread};
use node::ledger::write::BlockApplyResult;
#[cfg(not(target_arch = "wasm32"))]
use std::env;
use std::io::Write;

#[cfg(not(target_arch = "wasm32"))]
use mina_p2p_messages::v2::PrecomputedBlock;
#[cfg(not(target_arch = "wasm32"))]
use openmina_core::NetworkConfig;
#[cfg(not(target_arch = "wasm32"))]
use std::net::SocketAddr;

use super::NodeService;
Expand All @@ -21,8 +26,11 @@ pub mod config;

use config::ArchiveStorageOptions;

#[cfg(not(target_arch = "wasm32"))]
const ARCHIVE_SEND_RETRIES: u8 = 5;
#[cfg(not(target_arch = "wasm32"))]
const MAX_EVENT_COUNT: u64 = 100;
#[cfg(not(target_arch = "wasm32"))]
const RETRY_INTERVAL_MS: u64 = 1000;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -244,9 +252,9 @@ impl ArchiveService {
// Note: Placeholder for the wasm implementation, if we decide to include an archive mode in the future
#[cfg(target_arch = "wasm32")]
fn run(
mut archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
options: ArchiveStorageOptions,
work_dir: String,
mut _archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
_options: ArchiveStorageOptions,
_work_dir: String,
) {
unimplemented!()
}
Expand Down Expand Up @@ -284,14 +292,14 @@ impl ArchiveService {

#[cfg(target_arch = "wasm32")]
fn start_wasm(
archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
options: ArchiveStorageOptions,
work_dir: String,
_archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
_options: ArchiveStorageOptions,
_work_dir: String,
) {
thread::Builder::new()
.name("openmina_archive".to_owned())
.spawn(move || {
Self::run(archive_receiver, options, work_dir);
Self::run(_archive_receiver, _options, _work_dir);
})
.unwrap();
}
Expand Down
5 changes: 3 additions & 2 deletions node/src/block_producer/block_producer_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use openmina_core::{
global_sub_window, in_same_checkpoint_window, in_seed_update_range, relative_sub_window,
},
};
#[cfg(feature = "p2p-libp2p")]
use p2p::P2pNetworkPubsubAction;
use redux::{callback, Dispatcher, Timestamp};

Expand Down Expand Up @@ -369,10 +370,10 @@ impl BlockProducerEnabled {
bug_condition!("Invalid state for `BlockProducerAction::BlockInjected` expected: `BlockProducerCurrentState::Produced`, found: {:?}", state.current);
}

let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
let (dispatcher, _global_state) = state_context.into_dispatcher_and_state();

#[cfg(feature = "p2p-libp2p")]
broadcast_injected_block(global_state, dispatcher);
broadcast_injected_block(_global_state, dispatcher);

dispatcher.push(BlockProducerAction::WonSlotSearch);
}
Expand Down
10 changes: 6 additions & 4 deletions p2p/src/channels/rpc/p2p_channels_rpc_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use super::{
P2pChannelsRpcAction, P2pChannelsRpcState, P2pRpcLocalState, P2pRpcRemotePendingRequestState,
P2pRpcRemoteState, P2pRpcResponse, RpcChannelMsg, MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS,
};
#[cfg(feature = "p2p-libp2p")]
use crate::P2pNetworkRpcAction;
use crate::{
channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction},
P2pNetworkRpcAction, P2pPeerAction, P2pState,
P2pPeerAction, P2pState,
};
use openmina_core::{block::BlockWithHash, bug_condition, error, Substate};
use redux::ActionWithMeta;
Expand All @@ -23,7 +25,7 @@ impl P2pChannelsRpcState {
let (action, meta) = action.split();
let p2p_state = state_context.get_substate_mut()?;
let peer_id = *action.peer_id();
let is_libp2p = p2p_state.is_libp2p_peer(&peer_id);
let _is_libp2p = p2p_state.is_libp2p_peer(&peer_id);
let peer_state = &mut p2p_state
.get_ready_peer_mut(&peer_id)
.ok_or_else(|| format!("Peer state not found for: {action:?}"))?
Expand Down Expand Up @@ -96,7 +98,7 @@ impl P2pChannelsRpcState {
let dispatcher = state_context.into_dispatcher();

#[cfg(feature = "p2p-libp2p")]
if is_libp2p {
if _is_libp2p {
if let Some((query, data)) =
super::libp2p::internal_request_into_libp2p(*request.clone(), id)
{
Expand Down Expand Up @@ -231,7 +233,7 @@ impl P2pChannelsRpcState {
let dispatcher = state_context.into_dispatcher();

#[cfg(feature = "p2p-libp2p")]
if is_libp2p {
if _is_libp2p {
if let Some(response) = response {
if let Some((response, data)) =
super::libp2p::internal_response_into_libp2p(*response, id)
Expand Down
5 changes: 4 additions & 1 deletion p2p/src/channels/snark/p2p_channels_snark_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use openmina_core::{bug_condition, Substate};
use redux::ActionWithMeta;

#[cfg(feature = "p2p-libp2p")]
use crate::P2pNetworkPubsubAction;
use crate::{
channels::{ChannelId, MsgId, P2pChannelsEffectfulAction},
P2pNetworkPubsubAction, P2pState,
P2pState,
};

use super::{
P2pChannelsSnarkAction, P2pChannelsSnarkState, SnarkPropagationChannelMsg,
SnarkPropagationState,
};
#[cfg(feature = "p2p-libp2p")]
use mina_p2p_messages::{gossip::GossipNetMessageV2, v2};

impl P2pChannelsSnarkState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ use super::{
P2pChannelsTransactionAction, P2pChannelsTransactionState, TransactionPropagationChannelMsg,
TransactionPropagationState,
};
#[cfg(feature = "p2p-libp2p")]
use crate::P2pNetworkPubsubAction;
use crate::{
channels::{ChannelId, MsgId, P2pChannelsEffectfulAction},
P2pNetworkPubsubAction, P2pState,
P2pState,
};
#[cfg(feature = "p2p-libp2p")]
use mina_p2p_messages::{gossip::GossipNetMessageV2, v2};
use openmina_core::{bug_condition, transaction::TransactionWithHash, Substate};
use redux::ActionWithMeta;
Expand Down
52 changes: 36 additions & 16 deletions p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,48 @@
#[cfg(feature = "p2p-libp2p")]
use std::net::{IpAddr, SocketAddr};

use openmina_core::{bug_condition, debug, warn, Substate};
use redux::{ActionWithMeta, Dispatcher, Timestamp};
use openmina_core::{bug_condition, Substate};
#[cfg(feature = "p2p-libp2p")]
use openmina_core::{debug, warn};
use redux::ActionWithMeta;
#[cfg(feature = "p2p-libp2p")]
use redux::{Dispatcher, Timestamp};

use crate::{
channels::signaling::exchange::P2pChannelsSignalingExchangeAction,
connection::{
incoming::P2pConnectionIncomingError,
incoming_effectful::P2pConnectionIncomingEffectfulAction,
outgoing::{P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts},
P2pConnectionResponse, P2pConnectionState,
incoming_effectful::P2pConnectionIncomingEffectfulAction, P2pConnectionResponse,
P2pConnectionState,
},
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
webrtc::{Host, HttpSignalingInfo, SignalingMethod},
ConnectionAddr, P2pNetworkSchedulerAction, P2pPeerAction, P2pPeerState, P2pPeerStatus,
P2pState, PeerId,
disconnection::P2pDisconnectionAction,
P2pPeerAction, P2pPeerState, P2pPeerStatus, P2pState,
};

#[cfg(feature = "p2p-libp2p")]
use crate::{
connection::outgoing::{P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts},
disconnection::P2pDisconnectionReason,
ConnectionAddr, P2pNetworkSchedulerAction, PeerId,
};

#[cfg(not(feature = "p2p-libp2p"))]
use crate::connection::outgoing::P2pConnectionOutgoingInitOpts;

#[cfg(feature = "p2p-webrtc")]
use crate::webrtc::{HttpSignalingInfo, SignalingMethod};

#[cfg(feature = "p2p-libp2p")]
use crate::p2p_network::Host;

use super::{
super::{incoming::P2pConnectionIncomingState, RejectionReason},
IncomingSignalingMethod, P2pConnectionIncomingAction,
super::incoming::P2pConnectionIncomingState, IncomingSignalingMethod,
P2pConnectionIncomingAction,
};

#[cfg(feature = "p2p-libp2p")]
use super::super::RejectionReason;

impl P2pConnectionIncomingState {
/// Substate is accessed
pub fn reducer<Action, State>(
Expand All @@ -34,10 +54,10 @@ impl P2pConnectionIncomingState {
Action: crate::P2pActionTrait<State>,
{
let (action, meta) = action.split();
let time = meta.time();
let _time = meta.time();
let peer_id = *action.peer_id();
let p2p_state = state_context.get_substate_mut()?;
let my_id = p2p_state.my_id();
let _my_id = p2p_state.my_id();

match action {
P2pConnectionIncomingAction::Init { opts, rpc_id } => {
Expand Down Expand Up @@ -411,7 +431,7 @@ impl P2pConnectionIncomingState {
}
Ok(())
}
P2pConnectionIncomingAction::FinalizePendingLibp2p { addr, .. } => {
P2pConnectionIncomingAction::FinalizePendingLibp2p { addr: _addr, .. } => {
#[cfg(feature = "p2p-libp2p")]
{
let state = p2p_state
Expand All @@ -430,12 +450,12 @@ impl P2pConnectionIncomingState {
identify: None,
});

Self::reduce_finalize_libp2p_pending(state, addr, time, my_id, peer_id);
Self::reduce_finalize_libp2p_pending(state, _addr, _time, _my_id, peer_id);

let (dispatcher, state) = state_context.into_dispatcher_and_state();
let p2p_state: &P2pState = state.substate()?;
Self::dispatch_finalize_libp2p_pending(
dispatcher, p2p_state, my_id, peer_id, time, addr,
dispatcher, p2p_state, _my_id, peer_id, _time, _addr,
);
}

Expand Down
15 changes: 11 additions & 4 deletions p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(feature = "p2p-libp2p")]
use std::net::SocketAddr;

use openmina_core::{bug_condition, warn, Substate};
#[cfg(feature = "p2p-libp2p")]
use openmina_core::warn;
use openmina_core::{bug_condition, Substate};
use redux::ActionWithMeta;

use crate::{
Expand All @@ -11,16 +14,20 @@ use crate::{
},
disconnection::P2pDisconnectionAction,
webrtc::Host,
P2pNetworkKadRequestAction, P2pNetworkSchedulerAction, P2pPeerAction, P2pPeerState,
P2pPeerStatus, P2pState,
P2pPeerAction, P2pPeerState, P2pPeerStatus, P2pState,
};

use super::{
libp2p_opts::P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError,
P2pConnectionOutgoingAction, P2pConnectionOutgoingError, P2pConnectionOutgoingInitOpts,
P2pConnectionOutgoingState,
};

#[cfg(feature = "p2p-libp2p")]
use super::libp2p_opts::P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError;

#[cfg(feature = "p2p-libp2p")]
use crate::{P2pNetworkKadRequestAction, P2pNetworkSchedulerAction};

impl P2pConnectionOutgoingState {
/// Substate is accessed
pub fn reducer<Action, State>(
Expand Down
11 changes: 7 additions & 4 deletions p2p/src/disconnection/p2p_disconnection_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ use rand::prelude::*;
use redux::ActionWithMeta;

use crate::{
disconnection_effectful::P2pDisconnectionEffectfulAction, P2pNetworkSchedulerAction,
P2pPeerAction, P2pPeerStatus, P2pState,
disconnection_effectful::P2pDisconnectionEffectfulAction, P2pPeerAction, P2pPeerStatus,
P2pState,
};

#[cfg(feature = "p2p-libp2p")]
use crate::P2pNetworkSchedulerAction;

use super::{P2pDisconnectedState, P2pDisconnectionAction, P2pDisconnectionReason};

/// Do not disconnect peer for this duration just for freeing up peer space.
Expand Down Expand Up @@ -51,7 +54,7 @@ impl P2pDisconnectedState {
}
Ok(())
}
P2pDisconnectionAction::Init { peer_id, reason } => {
P2pDisconnectionAction::Init { peer_id, reason: _reason } => {
let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
bug_condition!("Invalid state for: `P2pDisconnectionAction::Init`");
return Ok(());
Expand All @@ -73,7 +76,7 @@ impl P2pDisconnectedState {
for addr in connections {
dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
addr,
reason: reason.clone(),
reason: _reason.clone(),
});
}

Expand Down
7 changes: 7 additions & 0 deletions p2p/src/identify/p2p_identify_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(feature = "p2p-libp2p")]
use openmina_core::{bug_condition, Substate};
#[cfg(feature = "p2p-libp2p")]
use redux::ActionWithMeta;

#[cfg(feature = "p2p-libp2p")]
use crate::{
connection::outgoing::P2pConnectionOutgoingInitOpts,
disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
Expand All @@ -9,6 +12,10 @@ use crate::{
P2pNetworkKademliaAction, P2pNetworkYamuxAction, P2pState, YamuxStreamKind,
};

#[cfg(not(feature = "p2p-libp2p"))]
use crate::P2pState;

#[cfg(feature = "p2p-libp2p")]
use super::P2pIdentifyAction;

impl P2pState {
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/p2p_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl P2pEffectfulAction {
#[cfg(feature = "p2p-libp2p")]
P2pEffectfulAction::Network(action) => action.effects(&meta, store),
#[cfg(not(feature = "p2p-libp2p"))]
P2pEffectfulAction::Network(action) => {}
P2pEffectfulAction::Network(_action) => {}
}
}
}
16 changes: 10 additions & 6 deletions p2p/src/p2p_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ use crate::{
P2pConnectionState,
},
disconnection::{P2pDisconnectedState, P2pDisconnectionAction},
P2pAction, P2pNetworkKadKey, P2pNetworkKademliaAction, P2pNetworkPnetAction,
P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState,
P2pPeerState, P2pState, PeerId,
P2pAction, P2pPeerState, P2pState,
};
use openmina_core::{bug_condition, Substate};
use redux::{ActionMeta, ActionWithMeta, Dispatcher, Timestamp};

#[cfg(feature = "p2p-libp2p")]
use crate::{
P2pNetworkKadKey, P2pNetworkKademliaAction, P2pNetworkPnetAction, P2pNetworkPubsubAction,
P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState, PeerId,
};

impl P2pState {
pub fn reducer<State, Action>(
mut state_context: Substate<Action, State, Self>,
Expand All @@ -24,7 +28,7 @@ impl P2pState {
State: crate::P2pStateTrait,
Action: crate::P2pActionTrait<State>,
{
let Ok(state) = state_context.get_substate_mut() else {
let Ok(_state) = state_context.get_substate_mut() else {
bug_condition!("no P2pState");
return Ok(());
};
Expand Down Expand Up @@ -204,14 +208,14 @@ impl P2pState {
fn p2p_discovery<State, Action>(
&self,
dispatcher: &mut Dispatcher<Action, State>,
time: Timestamp,
_time: Timestamp,
) -> Result<(), String>
where
State: crate::P2pStateTrait,
Action: crate::P2pActionTrait<State>,
{
let config = &self.config;
let timeouts = &config.timeouts;
let _timeouts = &config.timeouts;

if !config.peer_discovery {
return Ok(());
Expand Down
Loading
Loading