Skip to content

Commit b321a87

Browse files
committed
feat(p2p/webrtc): polish discovery/signaling state machines
re: #772
1 parent 510b1b1 commit b321a87

File tree

6 files changed

+131
-64
lines changed

6 files changed

+131
-64
lines changed

p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_actions.rs

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl P2pChannelsSignalingDiscoveryAction {
9999
}
100100

101101
impl redux::EnablingCondition<P2pState> for P2pChannelsSignalingDiscoveryAction {
102-
fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool {
102+
fn is_enabled(&self, state: &P2pState, now: redux::Timestamp) -> bool {
103103
match self {
104104
P2pChannelsSignalingDiscoveryAction::Init { peer_id } => {
105105
state.get_ready_peer(peer_id).map_or(false, |p| {
@@ -129,12 +129,21 @@ impl redux::EnablingCondition<P2pState> for P2pChannelsSignalingDiscoveryAction
129129
!state.already_has_max_peers()
130130
&& state.get_ready_peer(peer_id).map_or(false, |p| {
131131
match &p.channels.signaling.discovery {
132-
P2pChannelsSignalingDiscoveryState::Ready { local, .. } => matches!(
133-
local,
134-
SignalingDiscoveryState::WaitingForRequest { .. }
135-
| SignalingDiscoveryState::DiscoveredRejected { .. }
136-
| SignalingDiscoveryState::Answered { .. },
137-
),
132+
P2pChannelsSignalingDiscoveryState::Ready { local, .. } => {
133+
match local {
134+
SignalingDiscoveryState::WaitingForRequest { .. } => true,
135+
SignalingDiscoveryState::DiscoveredRejected {
136+
time, ..
137+
}
138+
| SignalingDiscoveryState::Answered { time, .. } => {
139+
// Allow one discovery request per minute.
140+
// TODO(binier): make configurable
141+
now.checked_sub(*time)
142+
.map_or(false, |dur| dur.as_secs() >= 60)
143+
}
144+
_ => false,
145+
}
146+
}
138147
_ => false,
139148
}
140149
})
@@ -147,14 +156,31 @@ impl redux::EnablingCondition<P2pState> for P2pChannelsSignalingDiscoveryAction
147156
}
148157
_ => false,
149158
}),
150-
P2pChannelsSignalingDiscoveryAction::DiscoveredSend { peer_id, .. } => state
151-
.get_ready_peer(peer_id)
152-
.map_or(false, |p| match &p.channels.signaling.discovery {
153-
P2pChannelsSignalingDiscoveryState::Ready { local, .. } => {
154-
matches!(local, SignalingDiscoveryState::DiscoveryRequested { .. })
155-
}
156-
_ => false,
157-
}),
159+
P2pChannelsSignalingDiscoveryAction::DiscoveredSend {
160+
peer_id,
161+
target_public_key,
162+
..
163+
} => {
164+
let target_peer_id = target_public_key.peer_id();
165+
let has_peer_requested_discovery =
166+
state.get_ready_peer(peer_id).map_or(false, |p| {
167+
match &p.channels.signaling.discovery {
168+
P2pChannelsSignalingDiscoveryState::Ready { local, .. } => {
169+
matches!(local, SignalingDiscoveryState::DiscoveryRequested { .. })
170+
}
171+
_ => false,
172+
}
173+
});
174+
let target_peer_already_discovering_them =
175+
state.get_ready_peer(&target_peer_id).map_or(false, |p| {
176+
p.channels.signaling.sent_discovered_peer_id() == Some(*peer_id)
177+
});
178+
has_peer_requested_discovery
179+
&& !target_peer_already_discovering_them
180+
&& state.ready_peers_iter().all(|(_, p)| {
181+
p.channels.signaling.sent_discovered_peer_id() != Some(target_peer_id)
182+
})
183+
}
158184
P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived { peer_id, .. } => state
159185
.get_ready_peer(peer_id)
160186
.map_or(false, |p| match &p.channels.signaling.discovery {
@@ -190,6 +216,8 @@ impl redux::EnablingCondition<P2pState> for P2pChannelsSignalingDiscoveryAction
190216
),
191217
_ => false,
192218
}),
219+
// TODO(binier): constrain interval between these requests
220+
// to handle malicious peers.
193221
P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend { peer_id, .. } => state
194222
.get_ready_peer(peer_id)
195223
.map_or(false, |p| match &p.channels.signaling.discovery {

p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs

Lines changed: 65 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl P2pChannelsSignalingDiscoveryState {
8383
P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived { .. } => {
8484
let Self::Ready { local, .. } = state else {
8585
bug_condition!(
86-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
86+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived`, state: {state:?}",
8787
);
8888
return Ok(());
8989
};
@@ -92,7 +92,7 @@ impl P2pChannelsSignalingDiscoveryState {
9292

9393
let (dispatcher, state) = state_context.into_dispatcher_and_state();
9494
let state: &P2pState = state.substate()?;
95-
state.webrtc_discovery_respond_with_availble_peers(dispatcher);
95+
state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time());
9696

9797
Ok(())
9898
}
@@ -101,7 +101,7 @@ impl P2pChannelsSignalingDiscoveryState {
101101
} => {
102102
let Self::Ready { local, .. } = state else {
103103
bug_condition!(
104-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
104+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredSend`, state: {state:?}",
105105
);
106106
return Ok(());
107107
};
@@ -123,7 +123,7 @@ impl P2pChannelsSignalingDiscoveryState {
123123
P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived { .. } => {
124124
let Self::Ready { local, .. } = state else {
125125
bug_condition!(
126-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
126+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived`, state: {state:?}",
127127
);
128128
return Ok(());
129129
};
@@ -134,7 +134,7 @@ impl P2pChannelsSignalingDiscoveryState {
134134
} => target_public_key.clone(),
135135
state => {
136136
bug_condition!(
137-
"Invalid local state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
137+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived`, state: {state:?}",
138138
);
139139
return Ok(());
140140
}
@@ -145,12 +145,16 @@ impl P2pChannelsSignalingDiscoveryState {
145145
target_public_key,
146146
};
147147

148+
let (dispatcher, state) = state_context.into_dispatcher_and_state();
149+
let state: &P2pState = state.substate()?;
150+
state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time());
151+
148152
Ok(())
149153
}
150154
P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived { offer, .. } => {
151155
let Self::Ready { local, .. } = state else {
152156
bug_condition!(
153-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
157+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived`, state: {state:?}",
154158
);
155159
return Ok(());
156160
};
@@ -161,7 +165,7 @@ impl P2pChannelsSignalingDiscoveryState {
161165
} => target_public_key.clone(),
162166
state => {
163167
bug_condition!(
164-
"Invalid local state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
168+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived`, state: {state:?}",
165169
);
166170
return Ok(());
167171
}
@@ -183,7 +187,7 @@ impl P2pChannelsSignalingDiscoveryState {
183187
P2pChannelsSignalingDiscoveryAction::AnswerSend { answer, .. } => {
184188
let Self::Ready { local, .. } = state else {
185189
bug_condition!(
186-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
190+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerSend`, state: {state:?}",
187191
);
188192
return Ok(());
189193
};
@@ -212,7 +216,7 @@ impl P2pChannelsSignalingDiscoveryState {
212216
P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend { .. } => {
213217
let Self::Ready { remote, .. } = state else {
214218
bug_condition!(
215-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferSend`, state: {state:?}",
219+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend`, state: {state:?}",
216220
);
217221
return Ok(());
218222
};
@@ -231,7 +235,7 @@ impl P2pChannelsSignalingDiscoveryState {
231235
} => {
232236
let Self::Ready { remote, .. } = state else {
233237
bug_condition!(
234-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferSend`, state: {state:?}",
238+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReceived`, state: {state:?}",
235239
);
236240
return Ok(());
237241
};
@@ -240,27 +244,62 @@ impl P2pChannelsSignalingDiscoveryState {
240244
time: meta.time(),
241245
target_public_key: target_public_key.clone(),
242246
};
243-
let dispatcher = state_context.into_dispatcher();
244-
// TODO(binier): this action might not be enabled, in
245-
// which case we sshould be rejecting discovered peer.
246-
dispatcher.push(P2pConnectionOutgoingAction::Init {
247+
let (dispatcher, state) = state_context.into_dispatcher_and_state();
248+
let state: &P2pState = state.substate()?;
249+
let action = P2pConnectionOutgoingAction::Init {
247250
opts: P2pConnectionOutgoingInitOpts::WebRTC {
248251
peer_id: target_public_key.peer_id(),
249252
signaling: SignalingMethod::P2p {
250253
relay_peer_id: peer_id,
251254
},
252255
},
253256
rpc_id: None,
254-
});
257+
};
258+
let accepted = redux::EnablingCondition::is_enabled(&action, state, meta.time());
259+
if accepted {
260+
dispatcher.push(action);
261+
} else {
262+
dispatcher
263+
.push(P2pChannelsSignalingDiscoveryAction::DiscoveredReject { peer_id });
264+
}
255265
Ok(())
256266
}
257267
P2pChannelsSignalingDiscoveryAction::DiscoveredReject { .. } => {
258-
todo!("handle peer rejection")
268+
let Self::Ready { remote, .. } = state else {
269+
bug_condition!(
270+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReject`, state: {state:?}",
271+
);
272+
return Ok(());
273+
};
274+
275+
let target_public_key = match remote {
276+
SignalingDiscoveryState::Discovered {
277+
target_public_key, ..
278+
} => target_public_key.clone(),
279+
state => {
280+
bug_condition!(
281+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReject`, state: {state:?}",
282+
);
283+
return Ok(());
284+
}
285+
};
286+
287+
*remote = SignalingDiscoveryState::DiscoveredRejected {
288+
time: meta.time(),
289+
target_public_key,
290+
};
291+
let dispatcher = state_context.into_dispatcher();
292+
let message = SignalingDiscoveryChannelMsg::DiscoveredReject;
293+
dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend {
294+
peer_id,
295+
message,
296+
});
297+
Ok(())
259298
}
260299
P2pChannelsSignalingDiscoveryAction::DiscoveredAccept { offer, .. } => {
261300
let Self::Ready { remote, .. } = state else {
262301
bug_condition!(
263-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferSend`, state: {state:?}",
302+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAccept`, state: {state:?}",
264303
);
265304
return Ok(());
266305
};
@@ -271,7 +310,7 @@ impl P2pChannelsSignalingDiscoveryState {
271310
} => target_public_key.clone(),
272311
state => {
273312
bug_condition!(
274-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferSend`, state: {state:?}",
313+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAccept`, state: {state:?}",
275314
);
276315
return Ok(());
277316
}
@@ -299,7 +338,7 @@ impl P2pChannelsSignalingDiscoveryState {
299338
P2pChannelsSignalingDiscoveryAction::AnswerReceived { answer, .. } => {
300339
let Self::Ready { remote, .. } = state else {
301340
bug_condition!(
302-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferSend`, state: {state:?}",
341+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerReceived`, state: {state:?}",
303342
);
304343
return Ok(());
305344
};
@@ -310,7 +349,7 @@ impl P2pChannelsSignalingDiscoveryState {
310349
} => target_public_key.clone(),
311350
state => {
312351
bug_condition!(
313-
"Invalid remote state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
352+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerReceived`, state: {state:?}",
314353
);
315354
return Ok(());
316355
}
@@ -336,7 +375,7 @@ impl P2pChannelsSignalingDiscoveryState {
336375
P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { answer, .. } => {
337376
let Self::Ready { remote, .. } = state else {
338377
bug_condition!(
339-
"Invalid state for `P2pChannelsSignalingDiscoveryAction::OfferSend`, state: {state:?}",
378+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerDecrypted`, state: {state:?}",
340379
);
341380
return Ok(());
342381
};
@@ -347,15 +386,15 @@ impl P2pChannelsSignalingDiscoveryState {
347386
} => target_public_key.clone(),
348387
state => {
349388
bug_condition!(
350-
"Invalid remote state for `P2pChannelsSignalingDiscoveryAction::OfferDecryptError`, state: {state:?}",
389+
"Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerDecrypted`, state: {state:?}",
351390
);
352391
return Ok(());
353392
}
354393
};
355394

356395
*remote = SignalingDiscoveryState::Answered { time: meta.time() };
357396

358-
let dispatcher = state_context.into_dispatcher();
397+
let (dispatcher, state) = state_context.into_dispatcher_and_state();
359398
match answer {
360399
P2pConnectionResponse::Accepted(answer) => {
361400
dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvSuccess {
@@ -382,6 +421,9 @@ impl P2pChannelsSignalingDiscoveryState {
382421
})
383422
}
384423
}
424+
425+
let state: &P2pState = state.substate()?;
426+
state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time());
385427
Ok(())
386428
}
387429
}

p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_effects.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ impl P2pChannelsSignalingDiscoveryEffectfulAction {
3232
pub_key,
3333
offer,
3434
} => match store.service().encrypt(&pub_key, &*offer) {
35-
Err(()) => todo!("Failed to encrypt webrtc offer. Handle it."),
35+
Err(()) => {
36+
// todo!("Failed to encrypt webrtc offer. Handle it.")
37+
}
3638
Ok(offer) => {
3739
let message = SignalingDiscoveryChannelMsg::DiscoveredAccept(offer);
3840
message_send(store.service(), peer_id, message);

p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_reducer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl P2pChannelsSignalingExchangeState {
8888
} => {
8989
let Self::Ready { local, .. } = state else {
9090
bug_condition!(
91-
"Invalid state for `P2pChannelsSignalingExchangeAction::OfferDecryptError`, state: {state:?}",
91+
"Invalid state for `P2pChannelsSignalingExchangeAction::OfferReceived`, state: {state:?}",
9292
);
9393
return Ok(());
9494
};
@@ -185,7 +185,7 @@ impl P2pChannelsSignalingExchangeState {
185185

186186
let (dispatcher, state) = state_context.into_dispatcher_and_state();
187187
let state: &P2pState = state.substate()?;
188-
state.webrtc_discovery_respond_with_availble_peers(dispatcher);
188+
state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time());
189189
Ok(())
190190
}
191191
P2pChannelsSignalingExchangeAction::OfferSend {

0 commit comments

Comments
 (0)