Skip to content

Commit 74ce6c9

Browse files
committed
Updated tests
1 parent ea699b0 commit 74ce6c9

File tree

7 files changed

+144
-154
lines changed

7 files changed

+144
-154
lines changed

p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ impl P2pConnectionIncomingState {
160160
);
161161
return Ok(());
162162
};
163-
let signaling = signaling.clone();
163+
let signaling = *signaling;
164164
*state = Self::AnswerReady {
165165
time: meta.time(),
166-
signaling: signaling.clone(),
166+
signaling,
167167
offer: offer.clone(),
168168
answer: answer.clone(),
169169
rpc_id: rpc_id.take(),

p2p/src/peer/p2p_peer_reducer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl P2pPeerState {
3434
});
3535

3636
if let Some(dial_opts) = dial_opts {
37-
let _ = peer_state.dial_opts.insert(dial_opts);
37+
peer_state.dial_opts.get_or_insert(dial_opts);
3838
}
3939
Ok(())
4040
}

p2p/testing/src/cluster.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ impl Cluster {
367367

368368
pub fn add_rust_node(&mut self, config: RustNodeConfig) -> Result<RustNodeId> {
369369
let override_fn = config.override_fn;
370+
let reducer_override_fn = config.override_reducer;
370371
let node_idx = self.rust_nodes.len();
371372
let (event_sender, event_receiver) = mpsc::unbounded_channel();
372373
let (config, secret_key) = self.rust_node_config(config)?;
@@ -381,7 +382,7 @@ impl Cluster {
381382
);
382383

383384
let store = crate::redux::Store::new(
384-
|state, action, dispatcher| {
385+
reducer_override_fn.unwrap_or(|state, action, dispatcher| {
385386
let meta = action.meta().clone();
386387
let action = action.action();
387388

@@ -400,7 +401,7 @@ impl Cluster {
400401
if let Err(error) = result {
401402
openmina_core::warn!(time; "error = {error}");
402403
}
403-
},
404+
}),
404405
override_fn.unwrap_or(|store, action| {
405406
let (action, meta) = action.split();
406407
match action {

p2p/testing/src/redux.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ impl State {
6060
pub fn state(&self) -> &P2pState {
6161
&self.0
6262
}
63+
pub fn state_mut(&mut self) -> &mut P2pState {
64+
&mut self.0
65+
}
6366
}
6467

6568
impl EnablingCondition<State> for Action {

p2p/testing/src/rust_node.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66

77
use futures::Stream;
88
use p2p::{P2pAction, P2pEvent, P2pLimits, P2pState, P2pTimeouts, PeerId};
9-
use redux::{Effects, EnablingCondition, SubStore};
9+
use redux::{Effects, EnablingCondition, Reducer, SubStore};
1010
use tokio::sync::mpsc;
1111

1212
use crate::{
@@ -28,6 +28,7 @@ pub struct RustNodeConfig {
2828
pub limits: P2pLimits,
2929
pub discovery: bool,
3030
pub override_fn: Option<Effects<State, ClusterService, Action>>,
31+
pub override_reducer: Option<Reducer<State, Action>>,
3132
}
3233

3334
impl RustNodeConfig {
@@ -63,6 +64,11 @@ impl RustNodeConfig {
6364
self.override_fn = Some(override_fn);
6465
self
6566
}
67+
68+
pub fn with_override_reducer(mut self, override_fn: Reducer<State, Action>) -> Self {
69+
self.override_reducer = Some(override_fn);
70+
self
71+
}
6672
}
6773

6874
pub struct RustNode {

p2p/tests/identify.rs

Lines changed: 78 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ use multiaddr::{multiaddr, Multiaddr};
77
use p2p::{
88
identity::SecretKey,
99
network::identify::{
10-
stream::P2pNetworkIdentifyStreamState, P2pNetworkIdentify, P2pNetworkIdentifyAction,
11-
P2pNetworkIdentifyStreamAction,
10+
stream_effectful::P2pNetworkIdentifyStreamEffectfulAction, P2pNetworkIdentify,
11+
P2pNetworkIdentifyEffectfulAction, P2pNetworkIdentifyStreamAction,
1212
},
1313
token::{self, DiscoveryAlgorithm},
14-
Data, P2pAction, P2pNetworkAction, P2pNetworkYamuxAction, PeerId,
14+
Data, P2pEffectfulAction, P2pNetworkEffectfulAction, P2pNetworkYamuxAction, PeerId,
1515
};
1616
use p2p_testing::{
1717
cluster::{Cluster, ClusterBuilder, ClusterEvent, Listener},
18-
event::{event_mapper_effect, RustNodeEvent},
18+
event::{allow_disconnections, event_mapper_effect, RustNodeEvent},
1919
futures::TryStreamExt,
2020
predicates::{async_fn, listener_is_ready, peer_is_connected},
2121
redux::{Action, State},
@@ -103,12 +103,12 @@ async fn rust_node_to_rust_node() -> anyhow::Result<()> {
103103
}
104104

105105
#[tokio::test]
106-
#[ignore = "TODO: Add override for reducer"]
107106
/// Test that even if bad node spams many different listen_addrs we don't end up with duplicates
108107
async fn test_bad_node() -> anyhow::Result<()> {
109108
let mut cluster = ClusterBuilder::new()
110109
.ports_with_len(100)
111110
.idle_duration(Duration::from_millis(100))
111+
.is_error(allow_disconnections)
112112
.start()
113113
.await?;
114114

@@ -154,6 +154,9 @@ async fn test_bad_node() -> anyhow::Result<()> {
154154

155155
let expected_addrs = [
156156
multiaddr!(Ip4([127, 0, 0, 1]), Tcp(bad_node_port)),
157+
multiaddr!(Unix("domain.com")),
158+
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
159+
multiaddr!(Https),
157160
multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10500u16)),
158161
multiaddr!(Ip6([0; 16]), Tcp(10500u16)),
159162
multiaddr!(Ip6([1; 16]), Tcp(10500u16)),
@@ -171,103 +174,81 @@ fn bad_node_effects(
171174
action: ActionWithMeta<Action>,
172175
) {
173176
{
174-
let (action, _meta) = action.split();
177+
let (action, meta) = action.split();
175178
match action {
176179
Action::P2p(a) => {
177-
match a.clone() {
178-
P2pAction::Network(P2pNetworkAction::Identify(
179-
P2pNetworkIdentifyAction::Stream(P2pNetworkIdentifyStreamAction::New {
180-
addr,
181-
peer_id,
182-
stream_id,
183-
..
184-
}),
185-
)) => {
186-
let state = store
187-
.state()
188-
.state()
189-
.network
190-
.scheduler
191-
.identify_state
192-
.find_identify_stream_state(&peer_id, &stream_id)
193-
.expect("Unable to find identify stream");
194-
195-
if let P2pNetworkIdentifyStreamState::SendIdentify = state {
196-
let listen_addrs = vec![
197-
multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10500u16)),
198-
multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10500u16)),
199-
multiaddr!(Ip6([0; 16]), Tcp(10500u16)),
200-
multiaddr!(Ip6([0; 16]), Tcp(10500u16)),
201-
multiaddr!(Ip6([1; 16]), Tcp(10500u16)),
202-
multiaddr!(Ip6([1; 16]), Tcp(10500u16)),
203-
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
204-
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
205-
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
206-
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
207-
multiaddr!(Unix("domain.com")),
208-
multiaddr!(Https),
209-
];
210-
211-
let public_key = Some(SecretKey::rand().public_key());
212-
213-
let protocols = vec![
214-
token::StreamKind::Identify(
215-
token::IdentifyAlgorithm::Identify1_0_0,
216-
),
217-
token::StreamKind::Broadcast(
218-
p2p::token::BroadcastAlgorithm::Meshsub1_1_0,
219-
),
220-
p2p::token::StreamKind::Rpc(token::RpcAlgorithm::Rpc0_0_1),
221-
p2p::token::StreamKind::Discovery(
222-
DiscoveryAlgorithm::Kademlia1_0_0,
223-
),
224-
];
225-
226-
let identify_msg = P2pNetworkIdentify {
227-
protocol_version: Some("ipfs/0.1.0".to_string()),
228-
agent_version: Some("openmina".to_owned()),
229-
public_key,
230-
listen_addrs,
231-
observed_addr: None,
232-
protocols,
233-
};
234-
235-
let mut out = Vec::new();
236-
let identify_msg_proto =
237-
identify_msg.to_proto_message().expect("serialized message");
238-
239-
prost::Message::encode_length_delimited(&identify_msg_proto, &mut out)
240-
.expect("Error converting message");
241-
242-
store.dispatch(Action::P2p(
243-
P2pNetworkYamuxAction::OutgoingData {
244-
addr,
245-
stream_id,
246-
data: Data(out.into_boxed_slice()),
247-
flags: Default::default(),
248-
}
249-
.into(),
250-
));
251-
252-
store.dispatch(Action::P2p(
253-
P2pNetworkIdentifyStreamAction::Close {
254-
addr,
255-
peer_id,
256-
stream_id,
257-
}
258-
.into(),
259-
));
260-
}
261-
}
262-
_ => {
263-
// p2p_effects(store, meta.with_action(a.clone()));
264-
}
265-
}
266180
event_mapper_effect(store, a);
267181
}
268-
Action::Idle(_) | Action::P2pEffectful(_) => {
269-
// p2p_timeout_effects(store, &meta);
182+
Action::P2pEffectful(P2pEffectfulAction::Network(
183+
P2pNetworkEffectfulAction::Identify(P2pNetworkIdentifyEffectfulAction::Stream(
184+
P2pNetworkIdentifyStreamEffectfulAction::SendIdentify {
185+
addr,
186+
peer_id,
187+
stream_id,
188+
},
189+
)),
190+
)) => {
191+
let listen_addrs = vec![
192+
multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10500u16)),
193+
multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10500u16)),
194+
multiaddr!(Ip6([0; 16]), Tcp(10500u16)),
195+
multiaddr!(Ip6([0; 16]), Tcp(10500u16)),
196+
multiaddr!(Ip6([1; 16]), Tcp(10500u16)),
197+
multiaddr!(Ip6([1; 16]), Tcp(10500u16)),
198+
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
199+
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
200+
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
201+
multiaddr!(Dns("domain.com"), Tcp(10530u16)),
202+
multiaddr!(Unix("domain.com")),
203+
multiaddr!(Https),
204+
];
205+
206+
let public_key = Some(SecretKey::rand().public_key());
207+
208+
let protocols = vec![
209+
token::StreamKind::Identify(token::IdentifyAlgorithm::Identify1_0_0),
210+
token::StreamKind::Broadcast(p2p::token::BroadcastAlgorithm::Meshsub1_1_0),
211+
p2p::token::StreamKind::Rpc(token::RpcAlgorithm::Rpc0_0_1),
212+
p2p::token::StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0),
213+
];
214+
215+
let identify_msg = P2pNetworkIdentify {
216+
protocol_version: Some("ipfs/0.1.0".to_string()),
217+
agent_version: Some("openmina".to_owned()),
218+
public_key,
219+
listen_addrs,
220+
observed_addr: None,
221+
protocols,
222+
};
223+
224+
let mut out = Vec::new();
225+
let identify_msg_proto =
226+
identify_msg.to_proto_message().expect("serialized message");
227+
228+
prost::Message::encode_length_delimited(&identify_msg_proto, &mut out)
229+
.expect("Error converting message");
230+
231+
store.dispatch(Action::P2p(
232+
P2pNetworkYamuxAction::OutgoingData {
233+
addr,
234+
stream_id,
235+
data: Data(out.into_boxed_slice()),
236+
flags: Default::default(),
237+
}
238+
.into(),
239+
));
240+
241+
store.dispatch(Action::P2p(
242+
P2pNetworkIdentifyStreamAction::Close {
243+
addr,
244+
peer_id,
245+
stream_id,
246+
}
247+
.into(),
248+
));
270249
}
250+
Action::P2pEffectful(action) => action.effects(meta, store),
251+
_ => {}
271252
};
272253
}
273254
}

0 commit comments

Comments
 (0)