Skip to content

Commit fde0905

Browse files
committed
Review fixes
1 parent 3a52808 commit fde0905

File tree

7 files changed

+68
-30
lines changed

7 files changed

+68
-30
lines changed

p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ impl P2pConnectionIncomingState {
6060
}),
6161
status: P2pPeerStatus::Connecting(P2pConnectionState::incoming_init(&opts)),
6262
identify: None,
63-
on_connect_success: None,
6463
});
6564

6665
state.status =
@@ -428,7 +427,6 @@ impl P2pConnectionIncomingState {
428427
)),
429428
status: P2pPeerStatus::Disconnected { time: meta.time() },
430429
identify: None,
431-
on_connect_success: None,
432430
});
433431

434432
Self::reduce_finalize_libp2p_pending(state, addr, time, my_id, peer_id);

p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,14 @@ impl P2pConnectionOutgoingState {
5858
&opts,
5959
)),
6060
identify: None,
61-
on_connect_success: None,
6261
});
6362

64-
if let Some(on_connect_success) = on_success {
65-
peer_state.on_connect_success = Some(on_connect_success);
66-
}
67-
6863
peer_state.status =
6964
P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(Self::Init {
7065
time,
7166
opts: opts.clone(),
7267
rpc_id,
68+
on_success,
7369
}));
7470

7571
let dispatcher = state_context.into_dispatcher();
@@ -109,6 +105,7 @@ impl P2pConnectionOutgoingState {
109105
time,
110106
opts: opts.clone(),
111107
rpc_id,
108+
on_success: None,
112109
}));
113110

114111
let dispatcher = state_context.into_dispatcher();
@@ -142,11 +139,18 @@ impl P2pConnectionOutgoingState {
142139
.outgoing_peer_connection_mut(&peer_id)
143140
.ok_or("Missing connection state for: `P2pConnectionOutgoingAction::OfferSdpCreatePending`")?;
144141

145-
if let Self::Init { opts, rpc_id, .. } = state {
142+
if let Self::Init {
143+
opts,
144+
rpc_id,
145+
on_success,
146+
..
147+
} = state
148+
{
146149
*state = Self::OfferSdpCreatePending {
147150
time,
148151
opts: opts.clone(),
149152
rpc_id: rpc_id.take(),
153+
on_success: on_success.take(),
150154
};
151155
} else {
152156
bug_condition!("Invalid state for `P2pConnectionOutgoingAction::OfferSdpCreatePending`: {:?}", state);
@@ -168,12 +172,19 @@ impl P2pConnectionOutgoingState {
168172
.outgoing_peer_connection_mut(&peer_id)
169173
.ok_or("Missing peer connection for `P2pConnectionOutgoingAction::OfferSdpCreateSuccess`")?;
170174

171-
if let Self::OfferSdpCreatePending { opts, rpc_id, .. } = state {
175+
if let Self::OfferSdpCreatePending {
176+
opts,
177+
rpc_id,
178+
on_success,
179+
..
180+
} = state
181+
{
172182
*state = Self::OfferSdpCreateSuccess {
173183
time,
174184
opts: opts.clone(),
175185
sdp: sdp.clone(),
176186
rpc_id: rpc_id.take(),
187+
on_success: on_success.take(),
177188
};
178189
} else {
179190
bug_condition!("Invalid state for `P2pConnectionOutgoingAction::OfferSdpCreateSuccess`: {:?}", state);
@@ -198,7 +209,13 @@ impl P2pConnectionOutgoingState {
198209
.outgoing_peer_connection_mut(&peer_id)
199210
.ok_or("Invalid state for `P2pConnectionOutgoingAction::OfferReady`")?;
200211

201-
let Self::OfferSdpCreateSuccess { opts, rpc_id, .. } = state else {
212+
let Self::OfferSdpCreateSuccess {
213+
opts,
214+
rpc_id,
215+
on_success,
216+
..
217+
} = state
218+
else {
202219
bug_condition!(
203220
"Invalid state for `P2pConnectionOutgoingAction::OfferReady`: {:?}",
204221
state
@@ -211,6 +228,7 @@ impl P2pConnectionOutgoingState {
211228
opts: opts.clone(),
212229
offer: offer.clone(),
213230
rpc_id: rpc_id.take(),
231+
on_success: on_success.take(),
214232
};
215233

216234
let dispatcher = state_context.into_dispatcher();
@@ -243,6 +261,7 @@ impl P2pConnectionOutgoingState {
243261
opts,
244262
offer,
245263
rpc_id,
264+
on_success,
246265
..
247266
} = state
248267
{
@@ -251,6 +270,7 @@ impl P2pConnectionOutgoingState {
251270
opts: opts.clone(),
252271
offer: offer.clone(),
253272
rpc_id: rpc_id.take(),
273+
on_success: on_success.take(),
254274
};
255275
} else {
256276
bug_condition!(
@@ -272,6 +292,7 @@ impl P2pConnectionOutgoingState {
272292
opts,
273293
offer,
274294
rpc_id,
295+
on_success,
275296
..
276297
} = state
277298
{
@@ -280,6 +301,7 @@ impl P2pConnectionOutgoingState {
280301
opts: opts.clone(),
281302
offer: offer.clone(),
282303
rpc_id: rpc_id.take(),
304+
on_success: on_success.take(),
283305
};
284306
} else {
285307
bug_condition!(
@@ -317,6 +339,7 @@ impl P2pConnectionOutgoingState {
317339
opts,
318340
offer,
319341
rpc_id,
342+
on_success,
320343
..
321344
} = state
322345
{
@@ -326,6 +349,7 @@ impl P2pConnectionOutgoingState {
326349
offer: offer.clone(),
327350
answer: answer.clone(),
328351
rpc_id: rpc_id.take(),
352+
on_success: on_success.take(),
329353
};
330354
} else {
331355
bug_condition!(
@@ -345,13 +369,19 @@ impl P2pConnectionOutgoingState {
345369
.ok_or_else(|| format!("Invalid state: {:?}", action))?;
346370

347371
let (auth, other_pub_key) = match state {
348-
Self::Init { opts, rpc_id, .. } => {
372+
Self::Init {
373+
opts,
374+
rpc_id,
375+
on_success,
376+
..
377+
} => {
349378
*state = Self::FinalizePending {
350379
time,
351380
opts: opts.clone(),
352381
offer: None,
353382
answer: None,
354383
rpc_id: rpc_id.take(),
384+
on_success: on_success.take(),
355385
};
356386
return Ok(());
357387
}
@@ -360,6 +390,7 @@ impl P2pConnectionOutgoingState {
360390
offer,
361391
answer,
362392
rpc_id,
393+
on_success,
363394
..
364395
} => {
365396
let auth = offer.conn_auth(answer);
@@ -371,6 +402,7 @@ impl P2pConnectionOutgoingState {
371402
offer: Some(offer.clone()),
372403
answer: Some(answer.clone()),
373404
rpc_id: rpc_id.take(),
405+
on_success: on_success.take(),
374406
};
375407

376408
(auth, other_pub_key)
@@ -414,6 +446,7 @@ impl P2pConnectionOutgoingState {
414446
offer,
415447
answer,
416448
rpc_id,
449+
on_success,
417450
..
418451
} = state
419452
{
@@ -431,6 +464,7 @@ impl P2pConnectionOutgoingState {
431464
offer: offer.clone(),
432465
answer: answer.clone(),
433466
rpc_id: rpc_id.take(),
467+
on_success: on_success.take(),
434468
};
435469
values
436470
} else {
@@ -508,32 +542,30 @@ impl P2pConnectionOutgoingState {
508542
.outgoing_peer_connection_mut(&peer_id)
509543
.ok_or_else(|| format!("Invalid state: {:?}", action))?;
510544

511-
if let Self::FinalizeSuccess {
545+
let Self::FinalizeSuccess {
512546
offer,
513547
answer,
514548
rpc_id,
549+
on_success,
515550
..
516551
} = state
517-
{
518-
*state = Self::Success {
519-
time,
520-
offer: offer.clone(),
521-
answer: answer.clone(),
522-
rpc_id: rpc_id.take(),
523-
};
524-
} else {
552+
else {
525553
bug_condition!(
526554
"Invalid state for `P2pConnectionOutgoingAction::Success`: {:?}",
527555
state
528556
);
529557
return Ok(());
530-
}
558+
};
531559

532-
let Some(peer_state) = p2p_state.peers.get_mut(&peer_id) else {
533-
bug_condition!("Outgoing peer state not found for: {}", peer_id);
534-
return Ok(());
560+
let callback = on_success.take();
561+
562+
*state = Self::Success {
563+
time,
564+
offer: offer.clone(),
565+
answer: answer.clone(),
566+
rpc_id: rpc_id.take(),
535567
};
536-
let callback = peer_state.on_connect_success.take();
568+
537569
let (dispatcher, state) = state_context.into_dispatcher_and_state();
538570
let p2p_state: &P2pState = state.substate()?;
539571

p2p/src/connection/outgoing/p2p_connection_outgoing_state.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use redux::Timestamp;
1+
use redux::{Callback, Timestamp};
22
use serde::{Deserialize, Serialize};
33

44
use openmina_core::requests::RpcId;
55

6-
use crate::{connection::RejectionReason, webrtc, P2pTimeouts};
6+
use crate::{connection::RejectionReason, webrtc, P2pTimeouts, PeerId};
77

88
use super::P2pConnectionOutgoingInitOpts;
99

@@ -13,56 +13,65 @@ pub enum P2pConnectionOutgoingState {
1313
time: Timestamp,
1414
opts: P2pConnectionOutgoingInitOpts,
1515
rpc_id: Option<RpcId>,
16+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
1617
},
1718
OfferSdpCreatePending {
1819
time: Timestamp,
1920
opts: P2pConnectionOutgoingInitOpts,
2021
rpc_id: Option<RpcId>,
22+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
2123
},
2224
OfferSdpCreateSuccess {
2325
time: Timestamp,
2426
opts: P2pConnectionOutgoingInitOpts,
2527
sdp: String,
2628
rpc_id: Option<RpcId>,
29+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
2730
},
2831
OfferReady {
2932
time: Timestamp,
3033
opts: P2pConnectionOutgoingInitOpts,
3134
offer: Box<webrtc::Offer>,
3235
rpc_id: Option<RpcId>,
36+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
3337
},
3438
OfferSendSuccess {
3539
time: Timestamp,
3640
opts: P2pConnectionOutgoingInitOpts,
3741
offer: Box<webrtc::Offer>,
3842
rpc_id: Option<RpcId>,
43+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
3944
},
4045
AnswerRecvPending {
4146
time: Timestamp,
4247
opts: P2pConnectionOutgoingInitOpts,
4348
offer: Box<webrtc::Offer>,
4449
rpc_id: Option<RpcId>,
50+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
4551
},
4652
AnswerRecvSuccess {
4753
time: Timestamp,
4854
opts: P2pConnectionOutgoingInitOpts,
4955
offer: Box<webrtc::Offer>,
5056
answer: Box<webrtc::Answer>,
5157
rpc_id: Option<RpcId>,
58+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
5259
},
5360
FinalizePending {
5461
time: Timestamp,
5562
opts: P2pConnectionOutgoingInitOpts,
5663
offer: Option<Box<webrtc::Offer>>,
5764
answer: Option<Box<webrtc::Answer>>,
5865
rpc_id: Option<RpcId>,
66+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
5967
},
6068
FinalizeSuccess {
6169
time: Timestamp,
6270
opts: P2pConnectionOutgoingInitOpts,
6371
offer: Option<Box<webrtc::Offer>>,
6472
answer: Option<Box<webrtc::Answer>>,
6573
rpc_id: Option<RpcId>,
74+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
6675
},
6776
Error {
6877
time: Timestamp,

p2p/src/connection/p2p_connection_state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ impl P2pConnectionState {
2020
time: Timestamp::ZERO,
2121
opts: opts.clone(),
2222
rpc_id: None,
23+
on_success: None,
2324
})
2425
}
2526

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ impl P2pNetworkPubsubState {
137137
return Ok(());
138138
};
139139

140+
// TODO: try to reuse data instead of cloning all message
140141
let messages = state.incoming_messages.clone();
141142
dispatcher.push(P2pNetworkPubsubEffectfulAction::IncomingData {
142143
peer_id,

p2p/src/p2p_state.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ impl P2pState {
102102
time: Timestamp::ZERO,
103103
},
104104
identify: None,
105-
on_connect_success: None,
106105
},
107106
)
108107
})
@@ -362,7 +361,6 @@ pub struct P2pPeerState {
362361
pub dial_opts: Option<P2pConnectionOutgoingInitOpts>,
363362
pub status: P2pPeerStatus,
364363
pub identify: Option<P2pNetworkIdentify>,
365-
pub on_connect_success: Option<Callback<(PeerId, Option<RpcId>)>>,
366364
}
367365

368366
impl P2pPeerState {

p2p/src/peer/p2p_peer_reducer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ impl P2pPeerState {
3131
status: P2pPeerStatus::Disconnected {
3232
time: Timestamp::ZERO,
3333
},
34-
on_connect_success: None,
3534
});
3635

3736
if let Some(dial_opts) = dial_opts {

0 commit comments

Comments
 (0)