Skip to content

Commit 52f4321

Browse files
committed
test(p2p): notify statemachine on listening
1 parent 7d65640 commit 52f4321

File tree

17 files changed

+438
-4
lines changed

17 files changed

+438
-4
lines changed

node/src/action_kind.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ use crate::p2p::discovery::{
9393
P2pDiscoveryKademliaBootstrapAction, P2pDiscoveryKademliaFailureAction,
9494
P2pDiscoveryKademliaInitAction, P2pDiscoveryKademliaSuccessAction, P2pDiscoverySuccessAction,
9595
};
96+
use crate::p2p::listen::{
97+
P2pListenAction, P2pListenClosedAction, P2pListenErrorAction, P2pListenExpiredAction,
98+
P2pListenNewAction,
99+
};
96100
use crate::p2p::peer::{P2pPeerAction, P2pPeerBestTipUpdateAction, P2pPeerReadyAction};
97101
use crate::p2p::P2pAction;
98102
use crate::rpc::{
@@ -301,6 +305,10 @@ pub enum ActionKind {
301305
P2pDiscoveryKademliaInit,
302306
P2pDiscoveryKademliaSuccess,
303307
P2pDiscoverySuccess,
308+
P2pListenClosed,
309+
P2pListenError,
310+
P2pListenExpired,
311+
P2pListenNew,
304312
P2pPeerBestTipUpdate,
305313
P2pPeerReady,
306314
RpcActionStatsGet,
@@ -415,7 +423,7 @@ pub enum ActionKind {
415423
}
416424

417425
impl ActionKind {
418-
pub const COUNT: u16 = 211;
426+
pub const COUNT: u16 = 215;
419427
}
420428

421429
impl std::fmt::Display for ActionKind {
@@ -461,6 +469,7 @@ impl ActionKindGet for EventSourceAction {
461469
impl ActionKindGet for P2pAction {
462470
fn kind(&self) -> ActionKind {
463471
match self {
472+
Self::Listen(a) => a.kind(),
464473
Self::Connection(a) => a.kind(),
465474
Self::Disconnection(a) => a.kind(),
466475
Self::Discovery(a) => a.kind(),
@@ -611,6 +620,17 @@ impl ActionKindGet for EventSourceWaitTimeoutAction {
611620
}
612621
}
613622

623+
impl ActionKindGet for P2pListenAction {
624+
fn kind(&self) -> ActionKind {
625+
match self {
626+
Self::New(a) => a.kind(),
627+
Self::Expired(a) => a.kind(),
628+
Self::Error(a) => a.kind(),
629+
Self::Closed(a) => a.kind(),
630+
}
631+
}
632+
}
633+
614634
impl ActionKindGet for P2pConnectionAction {
615635
fn kind(&self) -> ActionKind {
616636
match self {
@@ -1117,6 +1137,30 @@ impl ActionKindGet for WatchedAccountsBlockLedgerQuerySuccessAction {
11171137
}
11181138
}
11191139

1140+
impl ActionKindGet for P2pListenNewAction {
1141+
fn kind(&self) -> ActionKind {
1142+
ActionKind::P2pListenNew
1143+
}
1144+
}
1145+
1146+
impl ActionKindGet for P2pListenExpiredAction {
1147+
fn kind(&self) -> ActionKind {
1148+
ActionKind::P2pListenExpired
1149+
}
1150+
}
1151+
1152+
impl ActionKindGet for P2pListenErrorAction {
1153+
fn kind(&self) -> ActionKind {
1154+
ActionKind::P2pListenError
1155+
}
1156+
}
1157+
1158+
impl ActionKindGet for P2pListenClosedAction {
1159+
fn kind(&self) -> ActionKind {
1160+
ActionKind::P2pListenClosed
1161+
}
1162+
}
1163+
11201164
impl ActionKindGet for P2pConnectionOutgoingAction {
11211165
fn kind(&self) -> ActionKind {
11221166
match self {

node/src/event_source/event_source_effects.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
use p2p::listen::{
2+
P2pListenClosedAction, P2pListenErrorAction, P2pListenExpiredAction, P2pListenNewAction,
3+
};
4+
use p2p::P2pListenEvent;
5+
16
use crate::action::CheckTimeoutsAction;
27
use crate::external_snark_worker::{
38
ExternalSnarkWorkerErrorAction, ExternalSnarkWorkerEvent, ExternalSnarkWorkerKilledAction,
@@ -73,6 +78,20 @@ pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourc
7378
// "Translate" event into the corresponding action and dispatch it.
7479
EventSourceAction::NewEvent(content) => match content.event {
7580
Event::P2p(e) => match e {
81+
P2pEvent::Listen(e) => match e {
82+
P2pListenEvent::NewListenAddr { listener_id, addr } => {
83+
store.dispatch(P2pListenNewAction { listener_id, addr });
84+
}
85+
P2pListenEvent::ExpiredListenAddr { listener_id, addr } => {
86+
store.dispatch(P2pListenExpiredAction { listener_id, addr });
87+
}
88+
P2pListenEvent::ListenerError { listener_id, error } => {
89+
store.dispatch(P2pListenErrorAction { listener_id, error });
90+
}
91+
P2pListenEvent::ListenerClosed { listener_id, error } => {
92+
store.dispatch(P2pListenClosedAction { listener_id, error });
93+
}
94+
},
7695
P2pEvent::Connection(e) => match e {
7796
P2pConnectionEvent::OfferSdpReady(peer_id, res) => match res {
7897
Err(error) => {

node/src/logger/logger_effects.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,53 @@ pub fn logger_effects<S: Service>(store: &Store<S>, action: ActionWithMetaRef<'_
2121

2222
match action {
2323
Action::P2p(action) => match action {
24+
P2pAction::Listen(action) => match action {
25+
p2p::listen::P2pListenAction::New(action) => {
26+
openmina_core::log::info!(
27+
meta.time();
28+
kind = kind.to_string(),
29+
summary = format!("addr: {}", action.addr),
30+
addr = action.addr.to_string(),
31+
listener_id = action.listener_id.to_string(),
32+
);
33+
}
34+
p2p::listen::P2pListenAction::Expired(action) => {
35+
openmina_core::log::info!(
36+
meta.time();
37+
kind = kind.to_string(),
38+
summary = format!("addr: {}", action.addr),
39+
addr = action.addr.to_string(),
40+
listener_id = action.listener_id.to_string(),
41+
);
42+
}
43+
p2p::listen::P2pListenAction::Error(action) => {
44+
openmina_core::log::warn!(
45+
meta.time();
46+
kind = kind.to_string(),
47+
summary = format!("id: {}, error: {}", action.listener_id, action.error),
48+
error = action.error,
49+
listener_id = action.listener_id.to_string(),
50+
);
51+
}
52+
p2p::listen::P2pListenAction::Closed(action) => {
53+
if let Some(error) = &action.error {
54+
openmina_core::log::warn!(
55+
meta.time();
56+
kind = kind.to_string(),
57+
summary = format!("id: {}, error: {error}", action.listener_id),
58+
error = error,
59+
listener_id = action.listener_id.to_string(),
60+
);
61+
} else {
62+
openmina_core::log::info!(
63+
meta.time();
64+
kind = kind.to_string(),
65+
summary = format!("id: {},", action.listener_id),
66+
listener_id = action.listener_id.to_string(),
67+
);
68+
}
69+
}
70+
},
2471
P2pAction::Connection(action) => match action {
2572
P2pConnectionAction::Outgoing(action) => match action {
2673
P2pConnectionOutgoingAction::RandomInit(_) => {}

node/src/p2p/listen/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub use ::p2p::listen::*;
2+
3+
mod p2p_listen_actions;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use p2p::listen::{P2pListenNewAction, P2pListenExpiredAction, P2pListenErrorAction, P2pListenClosedAction};
2+
use redux::EnablingCondition;
3+
4+
use crate::State;
5+
6+
impl EnablingCondition<State> for P2pListenNewAction {
7+
fn is_enabled(&self, #[allow(unused_variables)] state: &State) -> bool {
8+
self.is_enabled(&state.p2p)
9+
}
10+
}
11+
12+
impl EnablingCondition<State> for P2pListenExpiredAction {
13+
fn is_enabled(&self, #[allow(unused_variables)] state: &State) -> bool {
14+
self.is_enabled(&state.p2p)
15+
}
16+
}
17+
18+
impl EnablingCondition<State> for P2pListenErrorAction {
19+
fn is_enabled(&self, #[allow(unused_variables)] state: &State) -> bool {
20+
self.is_enabled(&state.p2p)
21+
}
22+
}
23+
24+
impl EnablingCondition<State> for P2pListenClosedAction {
25+
fn is_enabled(&self, #[allow(unused_variables)] state: &State) -> bool {
26+
self.is_enabled(&state.p2p)
27+
}
28+
}

node/src/p2p/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod channels;
44
pub mod connection;
55
pub mod disconnection;
66
pub mod discovery;
7+
pub mod listen;
78
pub mod peer;
89

910
mod p2p_effects;
@@ -46,6 +47,11 @@ macro_rules! impl_into_global_action {
4647
};
4748
}
4849

50+
impl_into_global_action!(listen::P2pListenNewAction);
51+
impl_into_global_action!(listen::P2pListenExpiredAction);
52+
impl_into_global_action!(listen::P2pListenErrorAction);
53+
impl_into_global_action!(listen::P2pListenClosedAction);
54+
4955
impl_into_global_action!(connection::outgoing::P2pConnectionOutgoingRandomInitAction);
5056
impl_into_global_action!(connection::outgoing::P2pConnectionOutgoingInitAction);
5157
impl_into_global_action!(connection::outgoing::P2pConnectionOutgoingReconnectAction);

node/src/p2p/p2p_effects.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,20 @@ pub fn p2p_effects<S: Service>(store: &mut Store<S>, action: P2pActionWithMeta)
6060
let (action, meta) = action.split();
6161

6262
match action {
63+
P2pAction::Listen(action) => match action {
64+
p2p::listen::P2pListenAction::New(action) => {
65+
action.effects(&meta, store);
66+
}
67+
p2p::listen::P2pListenAction::Expired(action) => {
68+
action.effects(&meta, store);
69+
}
70+
p2p::listen::P2pListenAction::Error(action) => {
71+
action.effects(&meta, store);
72+
}
73+
p2p::listen::P2pListenAction::Closed(action) => {
74+
action.effects(&meta, store);
75+
}
76+
},
6377
P2pAction::Connection(action) => match action {
6478
P2pConnectionAction::Outgoing(action) => match action {
6579
P2pConnectionOutgoingAction::RandomInit(action) => {

p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod connection;
44
pub mod disconnection;
55
pub mod discovery;
66
pub mod peer;
7+
pub mod listen;
78

89
pub mod identity;
910
pub use identity::PeerId;

p2p/src/listen/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
mod p2p_listen_actions;
2+
pub use p2p_listen_actions::*;
3+
4+
pub mod p2p_listen_reducer;
5+
6+
pub mod p2p_listen_effects;

p2p/src/listen/p2p_listen_actions.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use redux::EnablingCondition;
2+
use serde::{Serialize, Deserialize};
3+
4+
use crate::{P2pListenerId, P2pState, P2pAction};
5+
6+
pub type P2pListenActionWithMetaRef<'a> = redux::ActionWithMeta<&'a P2pListenAction>;
7+
8+
#[derive(derive_more::From, Serialize, Deserialize, Debug, Clone)]
9+
pub enum P2pListenAction {
10+
New(P2pListenNewAction),
11+
Expired(P2pListenExpiredAction),
12+
Error(P2pListenErrorAction),
13+
Closed(P2pListenClosedAction),
14+
}
15+
16+
#[derive(Serialize, Deserialize, Debug, Clone)]
17+
pub struct P2pListenNewAction {
18+
pub listener_id: P2pListenerId,
19+
pub addr: libp2p::Multiaddr,
20+
}
21+
22+
impl EnablingCondition<P2pState> for P2pListenNewAction {
23+
fn is_enabled(&self, #[allow(unused_variables)] state: &P2pState) -> bool {
24+
true
25+
}
26+
}
27+
28+
impl From<P2pListenNewAction> for P2pAction {
29+
fn from(value: P2pListenNewAction) -> Self {
30+
P2pListenAction::from(value).into()
31+
}
32+
}
33+
34+
#[derive(Serialize, Deserialize, Debug, Clone)]
35+
pub struct P2pListenExpiredAction {
36+
pub listener_id: P2pListenerId,
37+
pub addr: libp2p::Multiaddr,
38+
}
39+
40+
impl EnablingCondition<P2pState> for P2pListenExpiredAction {
41+
fn is_enabled(&self, #[allow(unused_variables)] state: &P2pState) -> bool {
42+
true
43+
}
44+
}
45+
46+
impl From<P2pListenExpiredAction> for P2pAction {
47+
fn from(value: P2pListenExpiredAction) -> Self {
48+
P2pListenAction::from(value).into()
49+
}
50+
}
51+
52+
#[derive(Serialize, Deserialize, Debug, Clone)]
53+
pub struct P2pListenErrorAction {
54+
pub listener_id: P2pListenerId,
55+
pub error: String,
56+
}
57+
58+
impl EnablingCondition<P2pState> for P2pListenErrorAction {
59+
fn is_enabled(&self, #[allow(unused_variables)] state: &P2pState) -> bool {
60+
true
61+
}
62+
}
63+
64+
impl From<P2pListenErrorAction> for P2pAction {
65+
fn from(value: P2pListenErrorAction) -> Self {
66+
P2pListenAction::from(value).into()
67+
}
68+
}
69+
70+
#[derive(Serialize, Deserialize, Debug, Clone)]
71+
pub struct P2pListenClosedAction {
72+
pub listener_id: P2pListenerId,
73+
pub error: Option<String>,
74+
}
75+
76+
impl EnablingCondition<P2pState> for P2pListenClosedAction {
77+
fn is_enabled(&self, #[allow(unused_variables)] state: &P2pState) -> bool {
78+
true
79+
}
80+
}
81+
82+
impl From<P2pListenClosedAction> for P2pAction {
83+
fn from(value: P2pListenClosedAction) -> Self {
84+
P2pListenAction::from(value).into()
85+
}
86+
}

0 commit comments

Comments
 (0)