Skip to content

Commit 41f37b3

Browse files
committed
Ported scheduler
1 parent 6abef38 commit 41f37b3

18 files changed

+500
-293
lines changed

node/src/action_kind.rs

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use crate::p2p::network::pubsub::pubsub_effectful::P2pNetworkPubsubEffectfulActi
4949
use crate::p2p::network::pubsub::P2pNetworkPubsubAction;
5050
use crate::p2p::network::rpc::P2pNetworkRpcAction;
5151
use crate::p2p::network::scheduler::P2pNetworkSchedulerAction;
52+
use crate::p2p::network::scheduler_effectful::P2pNetworkSchedulerEffectfulAction;
5253
use crate::p2p::network::select::P2pNetworkSelectAction;
5354
use crate::p2p::network::yamux::P2pNetworkYamuxAction;
5455
use crate::p2p::network::P2pNetworkAction;
@@ -331,9 +332,7 @@ pub enum ActionKind {
331332
P2pNetworkSchedulerDisconnect,
332333
P2pNetworkSchedulerDisconnected,
333334
P2pNetworkSchedulerError,
334-
P2pNetworkSchedulerIncomingConnectionIsReady,
335335
P2pNetworkSchedulerIncomingDataDidReceive,
336-
P2pNetworkSchedulerIncomingDataIsReady,
337336
P2pNetworkSchedulerIncomingDidAccept,
338337
P2pNetworkSchedulerInterfaceDetected,
339338
P2pNetworkSchedulerInterfaceExpired,
@@ -345,8 +344,17 @@ pub enum ActionKind {
345344
P2pNetworkSchedulerPruneStream,
346345
P2pNetworkSchedulerPruneStreams,
347346
P2pNetworkSchedulerSelectDone,
348-
P2pNetworkSchedulerSelectError,
349347
P2pNetworkSchedulerYamuxDidInit,
348+
P2pNetworkSchedulerEffectfulDisconnect,
349+
P2pNetworkSchedulerEffectfulError,
350+
P2pNetworkSchedulerEffectfulIncomingConnectionIsReady,
351+
P2pNetworkSchedulerEffectfulIncomingDataIsReady,
352+
P2pNetworkSchedulerEffectfulIncomingDidAccept,
353+
P2pNetworkSchedulerEffectfulInterfaceDetected,
354+
P2pNetworkSchedulerEffectfulOutgoingConnect,
355+
P2pNetworkSchedulerEffectfulOutgoingDidConnect,
356+
P2pNetworkSchedulerEffectfulSelectDone,
357+
P2pNetworkSchedulerEffectfulSelectError,
350358
P2pNetworkSelectIncomingData,
351359
P2pNetworkSelectIncomingDataAuth,
352360
P2pNetworkSelectIncomingDataMux,
@@ -548,7 +556,7 @@ pub enum ActionKind {
548556
}
549557

550558
impl ActionKind {
551-
pub const COUNT: u16 = 454;
559+
pub const COUNT: u16 = 461;
552560
}
553561

554562
impl std::fmt::Display for ActionKind {
@@ -946,6 +954,7 @@ impl ActionKindGet for P2pNetworkAction {
946954
fn kind(&self) -> ActionKind {
947955
match self {
948956
Self::Scheduler(a) => a.kind(),
957+
Self::SchedulerEffectful(a) => a.kind(),
949958
Self::Pnet(a) => a.kind(),
950959
Self::Select(a) => a.kind(),
951960
Self::Noise(a) => a.kind(),
@@ -1371,18 +1380,13 @@ impl ActionKindGet for P2pNetworkSchedulerAction {
13711380
Self::InterfaceExpired { .. } => ActionKind::P2pNetworkSchedulerInterfaceExpired,
13721381
Self::ListenerReady { .. } => ActionKind::P2pNetworkSchedulerListenerReady,
13731382
Self::ListenerError { .. } => ActionKind::P2pNetworkSchedulerListenerError,
1374-
Self::IncomingConnectionIsReady { .. } => {
1375-
ActionKind::P2pNetworkSchedulerIncomingConnectionIsReady
1376-
}
13771383
Self::IncomingDidAccept { .. } => ActionKind::P2pNetworkSchedulerIncomingDidAccept,
13781384
Self::OutgoingConnect { .. } => ActionKind::P2pNetworkSchedulerOutgoingConnect,
13791385
Self::OutgoingDidConnect { .. } => ActionKind::P2pNetworkSchedulerOutgoingDidConnect,
1380-
Self::IncomingDataIsReady { .. } => ActionKind::P2pNetworkSchedulerIncomingDataIsReady,
13811386
Self::IncomingDataDidReceive { .. } => {
13821387
ActionKind::P2pNetworkSchedulerIncomingDataDidReceive
13831388
}
13841389
Self::SelectDone { .. } => ActionKind::P2pNetworkSchedulerSelectDone,
1385-
Self::SelectError { .. } => ActionKind::P2pNetworkSchedulerSelectError,
13861390
Self::YamuxDidInit { .. } => ActionKind::P2pNetworkSchedulerYamuxDidInit,
13871391
Self::Disconnect { .. } => ActionKind::P2pNetworkSchedulerDisconnect,
13881392
Self::Error { .. } => ActionKind::P2pNetworkSchedulerError,
@@ -1394,6 +1398,33 @@ impl ActionKindGet for P2pNetworkSchedulerAction {
13941398
}
13951399
}
13961400

1401+
impl ActionKindGet for P2pNetworkSchedulerEffectfulAction {
1402+
fn kind(&self) -> ActionKind {
1403+
match self {
1404+
Self::InterfaceDetected { .. } => {
1405+
ActionKind::P2pNetworkSchedulerEffectfulInterfaceDetected
1406+
}
1407+
Self::IncomingConnectionIsReady { .. } => {
1408+
ActionKind::P2pNetworkSchedulerEffectfulIncomingConnectionIsReady
1409+
}
1410+
Self::IncomingDidAccept { .. } => {
1411+
ActionKind::P2pNetworkSchedulerEffectfulIncomingDidAccept
1412+
}
1413+
Self::OutgoingConnect { .. } => ActionKind::P2pNetworkSchedulerEffectfulOutgoingConnect,
1414+
Self::OutgoingDidConnect { .. } => {
1415+
ActionKind::P2pNetworkSchedulerEffectfulOutgoingDidConnect
1416+
}
1417+
Self::IncomingDataIsReady { .. } => {
1418+
ActionKind::P2pNetworkSchedulerEffectfulIncomingDataIsReady
1419+
}
1420+
Self::SelectDone { .. } => ActionKind::P2pNetworkSchedulerEffectfulSelectDone,
1421+
Self::SelectError { .. } => ActionKind::P2pNetworkSchedulerEffectfulSelectError,
1422+
Self::Disconnect { .. } => ActionKind::P2pNetworkSchedulerEffectfulDisconnect,
1423+
Self::Error { .. } => ActionKind::P2pNetworkSchedulerEffectfulError,
1424+
}
1425+
}
1426+
}
1427+
13971428
impl ActionKindGet for P2pNetworkPnetAction {
13981429
fn kind(&self) -> ActionKind {
13991430
match self {

node/src/event_source/event_source_effects.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use p2p::channels::snark::P2pChannelsSnarkAction;
22
use p2p::channels::streaming_rpc::P2pChannelsStreamingRpcAction;
33
use p2p::channels::transaction::P2pChannelsTransactionAction;
4+
use p2p::P2pNetworkSchedulerEffectfulAction;
45
use snark::user_command_verify::{SnarkUserCommandVerifyAction, SnarkUserCommandVerifyError};
56

67
use crate::action::CheckTimeoutsAction;
@@ -73,9 +74,11 @@ pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourc
7374
.dispatch(P2pNetworkSchedulerAction::ListenerError { listener, error });
7475
}
7576
MioEvent::IncomingConnectionIsReady { listener } => {
76-
store.dispatch(P2pNetworkSchedulerAction::IncomingConnectionIsReady {
77-
listener,
78-
});
77+
store.dispatch(
78+
P2pNetworkSchedulerEffectfulAction::IncomingConnectionIsReady {
79+
listener,
80+
},
81+
);
7982
}
8083
MioEvent::IncomingConnectionDidAccept(addr, result) => {
8184
store.dispatch(P2pNetworkSchedulerAction::IncomingDidAccept {
@@ -90,7 +93,9 @@ pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourc
9093
});
9194
}
9295
MioEvent::IncomingDataIsReady(addr) => {
93-
store.dispatch(P2pNetworkSchedulerAction::IncomingDataIsReady { addr });
96+
store.dispatch(P2pNetworkSchedulerEffectfulAction::IncomingDataIsReady {
97+
addr,
98+
});
9499
}
95100
MioEvent::IncomingDataDidReceive(addr, result) => {
96101
store.dispatch(P2pNetworkSchedulerAction::IncomingDataDidReceive {

node/src/logger/logger_effects.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ pub fn logger_effects<S: Service>(store: &Store<S>, action: ActionWithMetaRef<'_
6868
P2pAction::Peer(action) => action.action_event(&context),
6969
P2pAction::Network(action) => match action {
7070
P2pNetworkAction::Scheduler(action) => action.action_event(&context),
71+
P2pNetworkAction::SchedulerEffectful(action) => action.action_event(&context),
7172
P2pNetworkAction::Pnet(action) => action.action_event(&context),
7273
P2pNetworkAction::Select(action) => action.action_event(&context),
7374
P2pNetworkAction::Noise(action) => action.action_event(&context),

node/src/p2p/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,5 +112,6 @@ impl_into_global_action!(p2p::P2pNetworkSelectAction);
112112
impl_into_global_action!(p2p::P2pNetworkPnetAction);
113113
impl_into_global_action!(p2p::P2pNetworkNoiseAction);
114114
impl_into_global_action!(p2p::P2pNetworkRpcAction);
115+
impl_into_global_action!(p2p::P2pNetworkSchedulerEffectfulAction);
115116

116117
impl p2p::P2pActionTrait<crate::State> for crate::Action {}

node/src/p2p/network/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,9 @@ impl redux::EnablingCondition<crate::State> for P2pNetworkPubsubAction {
7777
state.p2p.is_enabled(self, time)
7878
}
7979
}
80+
81+
impl redux::EnablingCondition<crate::State> for P2pNetworkSchedulerEffectfulAction {
82+
fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool {
83+
state.p2p.is_enabled(self, time)
84+
}
85+
}

p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,6 @@ pub trait P2pActionTrait<State>:
115115
+ From<P2pNetworkRpcAction>
116116
+ From<P2pChannelsRpcAction>
117117
+ From<P2pDisconnectionAction>
118+
+ From<P2pNetworkSchedulerEffectfulAction>
118119
{
119120
}

p2p/src/network/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ mod p2p_network_effects;
2222
pub mod scheduler;
2323
pub use self::scheduler::*;
2424

25+
pub mod scheduler_effectful;
26+
pub use self::scheduler_effectful::*;
27+
2528
pub mod pnet;
2629
pub use self::pnet::*;
2730

p2p/src/network/p2p_network_actions.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ use serde::{Deserialize, Serialize};
33

44
use super::{
55
identify::*, kad::*, noise::*, pnet::*, pubsub::*, rpc::*, scheduler::*, select::*, yamux::*,
6+
P2pNetworkSchedulerEffectfulAction,
67
};
78

89
use crate::P2pState;
910

1011
#[derive(derive_more::From, Serialize, Deserialize, Debug, Clone, ActionEvent)]
1112
pub enum P2pNetworkAction {
1213
Scheduler(P2pNetworkSchedulerAction),
14+
SchedulerEffectful(P2pNetworkSchedulerEffectfulAction),
1315
Pnet(P2pNetworkPnetAction),
1416
Select(P2pNetworkSelectAction),
1517
Noise(P2pNetworkNoiseAction),
@@ -25,6 +27,7 @@ impl redux::EnablingCondition<P2pState> for P2pNetworkAction {
2527
fn is_enabled(&self, state: &P2pState, time: redux::Timestamp) -> bool {
2628
match self {
2729
Self::Scheduler(v) => v.is_enabled(state, time),
30+
Self::SchedulerEffectful(v) => v.is_enabled(state, time),
2831
Self::Pnet(v) => v.is_enabled(state, time),
2932
Self::Select(v) => v.is_enabled(state, time),
3033
Self::Noise(v) => v.is_enabled(state, time),

p2p/src/network/p2p_network_effects.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ impl P2pNetworkAction {
99
Store::Service: P2pMioService + P2pCryptoService + P2pNetworkService,
1010
{
1111
match self {
12-
P2pNetworkAction::Scheduler(v) => v.effects(meta, store),
1312
P2pNetworkAction::Pnet(v) => v.effects(meta, store),
1413
P2pNetworkAction::Identify(v) => match v.effects(meta, store) {
1514
Ok(_) => {}
@@ -20,9 +19,11 @@ impl P2pNetworkAction {
2019
| P2pNetworkAction::Yamux(_)
2120
| P2pNetworkAction::Kad(_)
2221
| P2pNetworkAction::Pubsub(_)
23-
| P2pNetworkAction::Rpc(_) => {
22+
| P2pNetworkAction::Rpc(_)
23+
| P2pNetworkAction::Scheduler(_) => {
2424
// handled by reducer
2525
}
26+
P2pNetworkAction::SchedulerEffectful(v) => v.effects(meta, store),
2627
P2pNetworkAction::PubsubEffectful(v) => v.effects(meta, store),
2728
}
2829
}

p2p/src/network/p2p_network_reducer.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@ impl P2pNetworkState {
1818

1919
let (action, meta) = action.split();
2020
match action {
21-
P2pNetworkAction::Scheduler(a) => {
22-
state.scheduler.reducer(meta.with_action(a));
23-
Ok(())
24-
}
2521
P2pNetworkAction::Pnet(a) => {
2622
if let Some(cn) = state.scheduler.connections.get_mut(a.addr()) {
2723
cn.pnet.reducer(meta.with_action(a))
2824
}
2925
Ok(())
3026
}
27+
P2pNetworkAction::Scheduler(a) => P2pNetworkSchedulerState::reducer(
28+
Substate::from_compatible_substate(state_context),
29+
meta.with_action(a),
30+
),
3131
P2pNetworkAction::Select(a) => P2pNetworkSelectState::reducer(
3232
Substate::from_compatible_substate(state_context),
3333
meta.with_action(a),
@@ -54,15 +54,15 @@ impl P2pNetworkState {
5454
Substate::from_compatible_substate(state_context),
5555
meta.with_action(a),
5656
),
57-
P2pNetworkAction::PubsubEffectful(_) => {
58-
// Effectful action; no reducer
59-
Ok(())
60-
}
6157
P2pNetworkAction::Rpc(a) => P2pNetworkRpcState::reducer(
6258
Substate::from_compatible_substate(state_context),
6359
meta.with_action(a),
6460
limits,
6561
),
62+
P2pNetworkAction::PubsubEffectful(_) | P2pNetworkAction::SchedulerEffectful(_) => {
63+
// Effectful action; no reducer
64+
Ok(())
65+
}
6666
}
6767
}
6868

0 commit comments

Comments
 (0)