Skip to content

Commit bd1d506

Browse files
committed
refactor(callbacks): .
1 parent 639a656 commit bd1d506

File tree

13 files changed

+335
-310
lines changed

13 files changed

+335
-310
lines changed

crates/rtc-manager/src/entities/publisher.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use tokio_util::sync::CancellationToken;
1717
use crate::{
1818
errors::RtcError,
1919
models::{
20+
callbacks::{IceCandidateHandler, JoinedHandler},
2021
connection_type::ConnectionType,
21-
params::{IceCandidateCallback, JoinedCallback},
2222
streaming_protocol::StreamingProtocol,
2323
},
2424
};
@@ -40,26 +40,30 @@ pub struct Publisher {
4040
pub streaming_protocol: StreamingProtocol,
4141
// pub track_event_sender: Option<mpsc::UnboundedSender<TrackSubscribedMessage>>,
4242
// pub track_event_receiver: Option<mpsc::UnboundedReceiver<TrackSubscribedMessage>>,
43-
pub ice_candidate_callback: Option<IceCandidateCallback>,
44-
pub joined_callback: Option<JoinedCallback>,
43+
// pub ice_handler: Option<I>,
44+
// pub joined_handler: Option<J>,
4545
// Map incoming mid -> kind, populated on MediaAdded
4646
pub incoming_mid_kind: Arc<DashMap<Mid, MediaKind>>,
4747
// Throttle keyframe requests per mid
4848
pub last_kf_req: Arc<DashMap<Mid, Instant>>,
4949
}
5050

5151
impl Publisher {
52-
pub async fn new(
52+
pub fn new<I, J>(
5353
participant_id: String,
5454
room_id: String,
5555
connection_type: ConnectionType,
5656
is_video_enabled: bool,
5757
is_audio_enabled: bool,
5858
is_e2ee_enabled: bool,
5959
streaming_protocol: StreamingProtocol,
60-
ice_candidate_callback: IceCandidateCallback,
61-
joined_callback: JoinedCallback,
62-
) -> Result<Arc<Self>, RtcError> {
60+
ice_handler: I,
61+
joined_handler: J,
62+
) -> Result<Arc<Self>, RtcError>
63+
where
64+
I: IceCandidateHandler,
65+
J: JoinedHandler + Clone,
66+
{
6367
// Create str0m RTC instance
6468
let rtc = Rtc::builder().build();
6569

@@ -82,8 +86,8 @@ impl Publisher {
8286
streaming_protocol,
8387
// track_event_sender: Some(track_event_sender),
8488
// track_event_receiver: Some(track_event_receiver),
85-
ice_candidate_callback: Some(ice_candidate_callback),
86-
joined_callback: Some(joined_callback),
89+
// ice_handler: Some(ice_handler),
90+
// joined_handler: Some(joined_handler),
8791
incoming_mid_kind: Arc::new(DashMap::new()),
8892
last_kf_req: Arc::new(DashMap::new()),
8993
});
@@ -99,22 +103,23 @@ impl Publisher {
99103
.ok();
100104

101105
// Announce host candidate to the client via callback
102-
if let (Some(cb), Some(host)) = (
103-
publisher.ice_candidate_callback.as_ref(),
104-
RtcUdpRuntime::global().host_candidate(),
105-
) {
106+
if let (Some(cb), Some(host)) =
107+
(Some(ice_handler), RtcUdpRuntime::global().host_candidate())
108+
{
106109
let ice = crate::utils::ice_utils::IceUtils::convert_from_str0m_candidate(
107110
&host,
108111
Some("0".to_string()),
109112
Some(0),
110113
);
111-
(cb)(ice).await;
114+
cb.handle_candidate(ice);
112115
}
113116

114117
// Start the RTC event loop (events from runtime)
115118
let publisher_clone = Arc::clone(&publisher);
116119
tokio::spawn(async move {
117-
publisher_clone.run_event_loop(event_rx).await;
120+
publisher_clone
121+
.run_event_loop(event_rx, joined_handler)
122+
.await;
118123
});
119124

120125
Ok(publisher)
@@ -206,22 +211,26 @@ impl Publisher {
206211
self.tracks.clear();
207212
}
208213

209-
async fn run_event_loop(self: Arc<Self>, mut rx: UnboundedReceiver<Event>) {
214+
async fn run_event_loop<J>(self: Arc<Self>, mut rx: UnboundedReceiver<Event>, joined_handler: J)
215+
where
216+
J: JoinedHandler + Clone,
217+
{
210218
while let Some(event) = rx.recv().await {
211-
self.handle_rtc_event(event).await;
219+
self.handle_rtc_event(event, joined_handler.clone()).await;
212220
if self.cancel_token.is_cancelled() {
213221
break;
214222
}
215223
}
216224
}
217225

218-
async fn handle_rtc_event(&self, event: Event) {
226+
async fn handle_rtc_event<J>(&self, event: Event, joined_handler: J)
227+
where
228+
J: JoinedHandler,
229+
{
219230
match event {
220231
Event::Connected => {
221232
tracing::info!("Publisher {} connected", self.participant_id);
222-
if let Some(callback) = &self.joined_callback {
223-
(callback)(true).await;
224-
}
233+
joined_handler.handle_joined(true);
225234
}
226235
Event::IceConnectionStateChange(state) => {
227236
tracing::debug!("ICE connection state changed: {:?}", state);

crates/rtc-manager/src/entities/room.rs

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use crate::{
88
entities::{publisher::Publisher, subscriber::Subscriber},
99
errors::RtcError,
1010
models::{
11+
callbacks::{IceCandidateHandler, JoinedHandler, RenegotiationHandler},
1112
connection_type::ConnectionType,
12-
params::{
13-
IceCandidate, JoinRoomParams, JoinRoomResponse, RtcManagerConfigs,
14-
SubscribeHlsLiveStreamParams, SubscribeHlsLiveStreamResponse, SubscribeParams,
15-
SubscribeResponse,
13+
rtc_dto::{
14+
IceCandidate, JoinRoomParameters, JoinRoomResponse, SubscribeHlsLiveStreamParams,
15+
SubscribeHlsLiveStreamResponse, SubscribeParameters, SubscribeResponse,
1616
},
1717
},
1818
};
@@ -22,24 +22,25 @@ pub struct Room {
2222
pub room_id: String,
2323
publishers: Arc<DashMap<String, Arc<Publisher>>>,
2424
subscribers: Arc<DashMap<String, Arc<Subscriber>>>,
25-
config: RtcManagerConfigs,
2625
}
2726

2827
impl Room {
29-
pub fn new(room_id: String, config: RtcManagerConfigs) -> Self {
28+
pub fn new(room_id: String) -> Self {
3029
Self {
3130
room_id,
3231
publishers: Arc::new(DashMap::new()),
3332
subscribers: Arc::new(DashMap::new()),
34-
config,
3533
}
3634
}
3735

38-
pub async fn join_room(
36+
pub fn join_room<I, J>(
3937
&mut self,
40-
params: JoinRoomParams,
41-
_room_id: &str,
42-
) -> Result<Option<JoinRoomResponse>, RtcError> {
38+
params: JoinRoomParameters<I, J>,
39+
) -> Result<Option<JoinRoomResponse>, RtcError>
40+
where
41+
I: IceCandidateHandler,
42+
J: JoinedHandler + Clone,
43+
{
4344
let participant_id = params.participant_id.clone();
4445

4546
info!("participant joining {}", participant_id.clone());
@@ -53,10 +54,9 @@ impl Room {
5354
params.is_audio_enabled,
5455
params.is_e2ee_enabled,
5556
params.streaming_protocol,
56-
params.on_candidate,
57-
params.callback,
58-
)
59-
.await?;
57+
params.ice_handler,
58+
params.joined_handler,
59+
)?;
6060

6161
info!("publisher created");
6262

@@ -90,10 +90,14 @@ impl Room {
9090
}
9191
}
9292

93-
pub async fn subscribe(
93+
pub fn subscribe<I, R>(
9494
&mut self,
95-
params: SubscribeParams,
96-
) -> Result<SubscribeResponse, RtcError> {
95+
params: SubscribeParameters<I, R>,
96+
) -> Result<SubscribeResponse, RtcError>
97+
where
98+
I: IceCandidateHandler,
99+
R: RenegotiationHandler,
100+
{
97101
let target_id = params.target_id.clone();
98102
let participant_id = params.participant_id.clone();
99103

@@ -108,10 +112,9 @@ impl Room {
108112
let subscriber = Subscriber::new(
109113
participant_id.clone(),
110114
target_id.clone(),
111-
params.on_candidate,
112-
params.on_negotiation_needed,
113-
)
114-
.await?;
115+
params.ice_handler,
116+
params.renegotiation_handler,
117+
)?;
115118

116119
// Add subscriber to publisher's subscriber list
117120
publisher.add_subscriber(participant_id.clone(), subscriber.clone());
@@ -132,7 +135,7 @@ impl Room {
132135
is_screen_sharing: false, // TODO: Get from publisher state
133136
is_hand_raising: false, // TODO: Get from publisher state
134137
is_e2ee_enabled: false, // TODO: Get from publisher state
135-
video_codec: "H264".to_string(), // TODO: Get from publisher
138+
video_codec: "H264".to_string(), // TODO: Get from publisher
136139
screen_track_id: "".to_string(), // TODO: Get from publisher if screen sharing
137140
})
138141
}

crates/rtc-manager/src/entities/subscriber.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio_util::sync::CancellationToken;
1616
use crate::{
1717
errors::RtcError,
1818
models::{
19-
params::{IceCandidateCallback, RenegotiationCallback},
19+
callbacks::{IceCandidateHandler, RenegotiationHandler},
2020
quality::TrackQuality,
2121
},
2222
};
@@ -46,18 +46,22 @@ pub struct Subscriber {
4646
pub client_requested_quality: Arc<RwLock<Option<TrackQuality>>>,
4747
pub send_video_mid: Arc<RwLock<Option<Mid>>>,
4848
pub send_audio_mid: Arc<RwLock<Option<Mid>>>,
49-
pub ice_candidate_callback: Option<IceCandidateCallback>,
50-
pub renegotiation_callback: Option<RenegotiationCallback>,
49+
// pub ice_handler: Option<I>,
50+
// pub renegotiation_handler: Option<R>,
5151
pending: Arc<RwLock<Option<SdpPendingOffer>>>,
5252
}
5353

5454
impl Subscriber {
55-
pub async fn new(
55+
pub fn new<I, R>(
5656
participant_id: String,
5757
target_id: String,
58-
ice_candidate_callback: IceCandidateCallback,
59-
renegotiation_callback: RenegotiationCallback,
60-
) -> Result<Arc<Self>, RtcError> {
58+
ice_handler: I,
59+
_renegotiation_handler: R,
60+
) -> Result<Arc<Self>, RtcError>
61+
where
62+
I: IceCandidateHandler,
63+
R: RenegotiationHandler,
64+
{
6165
// Create str0m RTC instance
6266
let rtc = Rtc::builder().build();
6367

@@ -75,8 +79,6 @@ impl Subscriber {
7579
client_requested_quality: Arc::new(RwLock::new(None)),
7680
send_video_mid: Arc::new(RwLock::new(None)),
7781
send_audio_mid: Arc::new(RwLock::new(None)),
78-
ice_candidate_callback: Some(ice_candidate_callback),
79-
renegotiation_callback: Some(renegotiation_callback),
8082
pending: Arc::new(RwLock::new(None)),
8183
});
8284

@@ -92,15 +94,15 @@ impl Subscriber {
9294

9395
// Announce host candidate to the client via callback
9496
if let (Some(cb), Some(host)) = (
95-
subscriber.ice_candidate_callback.as_ref(),
97+
Some(ice_handler),
9698
RtcUdpRuntime::global().host_candidate(),
9799
) {
98100
let ice = crate::utils::ice_utils::IceUtils::convert_from_str0m_candidate(
99101
&host,
100102
Some("0".to_string()),
101103
Some(0),
102104
);
103-
(cb)(ice).await;
105+
cb.handle_candidate(ice);
104106
}
105107

106108
// Start the RTC event loop
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use crate::models::rtc_dto::IceCandidate;
2+
3+
pub trait IceCandidateHandler: Send + Sync + 'static {
4+
fn handle_candidate(&self, candidate: IceCandidate);
5+
}
6+
7+
pub trait JoinedHandler: Send + Sync + 'static {
8+
fn handle_joined(&self, is_migrate: bool);
9+
}
10+
11+
pub trait RenegotiationHandler: Send + Sync + 'static {
12+
fn handle_renegotiation(&self, sdp: String);
13+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
pub mod callbacks;
12
pub mod connection_type;
2-
pub mod params;
33
pub mod quality;
4+
pub mod rtc_dto;
45
pub mod rtp_forward_info;
56
pub mod streaming_protocol;
Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
1-
use std::{pin::Pin, sync::Arc, future::Future};
21
use parking_lot::RwLock;
32
use serde::Serialize;
3+
use std::sync::Arc;
44

5-
use crate::models::streaming_protocol::StreamingProtocol;
65
use super::connection_type::ConnectionType;
7-
8-
pub type IceCandidateCallback =
9-
Arc<dyn Fn(IceCandidate) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
10-
pub type RenegotiationCallback =
11-
Arc<dyn Fn(String) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
12-
pub type JoinedCallback =
13-
Arc<dyn Fn(bool) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
6+
use crate::models::{
7+
callbacks::{IceCandidateHandler, JoinedHandler, RenegotiationHandler},
8+
streaming_protocol::StreamingProtocol,
9+
};
1410

1511
#[derive(Debug, Clone)]
16-
pub struct RtcManagerConfigs {
12+
pub struct RtcManagerConfig {
1713
pub public_ip: String,
1814
pub port_min: u16,
1915
pub port_max: u16,
@@ -25,17 +21,22 @@ pub struct WClient {
2521
pub room_id: String,
2622
}
2723

28-
#[derive(Clone)]
29-
pub struct JoinRoomParams {
30-
pub sdp: String,
24+
pub struct JoinRoomParameters<I, J>
25+
where
26+
I: IceCandidateHandler,
27+
J: JoinedHandler,
28+
{
29+
pub client_id: String,
3130
pub participant_id: String,
31+
pub room_id: String,
32+
pub sdp: String,
3233
pub is_video_enabled: bool,
3334
pub is_audio_enabled: bool,
3435
pub is_e2ee_enabled: bool,
3536
pub total_tracks: u8,
3637
pub connection_type: ConnectionType,
37-
pub callback: JoinedCallback,
38-
pub on_candidate: IceCandidateCallback,
38+
pub joined_handler: J,
39+
pub ice_handler: I,
3940
pub streaming_protocol: StreamingProtocol,
4041
pub is_ipv6_supported: bool,
4142
}
@@ -48,11 +49,17 @@ pub struct JoinRoomResponse {
4849
}
4950

5051
#[derive(Clone)]
51-
pub struct SubscribeParams {
52+
pub struct SubscribeParameters<I, R>
53+
where
54+
I: IceCandidateHandler,
55+
R: RenegotiationHandler,
56+
{
57+
pub client_id: String,
58+
pub room_id: String,
5259
pub target_id: String,
5360
pub participant_id: String,
54-
pub on_negotiation_needed: RenegotiationCallback,
55-
pub on_candidate: IceCandidateCallback,
61+
pub renegotiation_handler: R,
62+
pub ice_handler: I,
5663
pub is_ipv6_supported: bool,
5764
}
5865

0 commit comments

Comments
 (0)