Skip to content

Commit 554a7f8

Browse files
committed
refactor(rtc-manager): remove async
1 parent bd1d506 commit 554a7f8

File tree

5 files changed

+56
-59
lines changed

5 files changed

+56
-59
lines changed

crates/rtc-manager/src/entities/publisher.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
1-
use std::sync::{
2-
atomic::{AtomicU8, Ordering},
3-
Arc,
4-
};
51
use std::time::{Duration, Instant};
2+
use std::{
3+
sync::{
4+
atomic::{AtomicU8, Ordering},
5+
Arc,
6+
},
7+
thread,
8+
};
69

710
use dashmap::DashMap;
811
use parking_lot::RwLock;
12+
use std::sync::mpsc::{self, Receiver};
913
use str0m::{
1014
change::SdpOffer,
11-
media::{Direction, KeyframeRequestKind, MediaData, MediaKind, Mid},
15+
media::{Direction, KeyframeRequestKind, MediaData, MediaKind, Mid, Rid},
1216
Event, IceConnectionState, Rtc,
1317
};
14-
use tokio::sync::mpsc::{self, UnboundedReceiver};
1518
use tokio_util::sync::CancellationToken;
1619

1720
use crate::{
@@ -67,10 +70,8 @@ impl Publisher {
6770
// Create str0m RTC instance
6871
let rtc = Rtc::builder().build();
6972

70-
// let (track_event_sender, track_event_receiver) = mpsc::unbounded_channel();
71-
7273
// Create event channel for runtime -> publisher
73-
let (event_tx, event_rx) = mpsc::unbounded_channel();
74+
let (event_tx, event_rx) = mpsc::sync_channel(1);
7475

7576
let publisher = Arc::new(Self {
7677
participant_id,
@@ -116,10 +117,8 @@ impl Publisher {
116117

117118
// Start the RTC event loop (events from runtime)
118119
let publisher_clone = Arc::clone(&publisher);
119-
tokio::spawn(async move {
120-
publisher_clone
121-
.run_event_loop(event_rx, joined_handler)
122-
.await;
120+
thread::spawn(move || {
121+
publisher_clone.run_event_loop(event_rx, joined_handler);
123122
});
124123

125124
Ok(publisher)
@@ -211,19 +210,19 @@ impl Publisher {
211210
self.tracks.clear();
212211
}
213212

214-
async fn run_event_loop<J>(self: Arc<Self>, mut rx: UnboundedReceiver<Event>, joined_handler: J)
213+
fn run_event_loop<J>(self: Arc<Self>, rx: Receiver<Event>, joined_handler: J)
215214
where
216215
J: JoinedHandler + Clone,
217216
{
218-
while let Some(event) = rx.recv().await {
219-
self.handle_rtc_event(event, joined_handler.clone()).await;
217+
while let Ok(event) = rx.recv() {
218+
self.handle_rtc_event(event, joined_handler.clone());
220219
if self.cancel_token.is_cancelled() {
221220
break;
222221
}
223222
}
224223
}
225224

226-
async fn handle_rtc_event<J>(&self, event: Event, joined_handler: J)
225+
fn handle_rtc_event<J>(&self, event: Event, joined_handler: J)
227226
where
228227
J: JoinedHandler,
229228
{
@@ -250,7 +249,7 @@ impl Publisher {
250249
self.request_keyframe_throttled(&data);
251250

252251
// Forward media data to subscribers
253-
self.forward_media_to_subscribers(data).await;
252+
self.forward_media_to_subscribers(data);
254253
}
255254
Event::RtpPacket(packet) => {
256255
// Forward RTP packet to subscribers
@@ -262,7 +261,11 @@ impl Publisher {
262261
}
263262
}
264263

265-
async fn forward_media_to_subscribers(&self, media_data: MediaData) {
264+
fn forward_media_to_subscribers(&self, media_data: MediaData) {
265+
if media_data.rid.is_some() && media_data.rid != Some(Rid::from("h")) {
266+
return;
267+
}
268+
266269
// For each subscriber, write the incoming media into its corresponding send-only mid
267270
for entry in self.subscribers.iter() {
268271
let subscriber = entry.value();

crates/rtc-manager/src/entities/room.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use dashmap::DashMap;
44
use str0m::{net::Protocol, Candidate};
5-
use tracing::info;
65

76
use crate::{
87
entities::{publisher::Publisher, subscriber::Subscriber},
@@ -43,8 +42,6 @@ impl Room {
4342
{
4443
let participant_id = params.participant_id.clone();
4544

46-
info!("participant joining {}", participant_id.clone());
47-
4845
// Create publisher
4946
let publisher = Publisher::new(
5047
participant_id.clone(),
@@ -58,25 +55,16 @@ impl Room {
5855
params.joined_handler,
5956
)?;
6057

61-
info!("publisher created");
62-
6358
// Add publisher to room
6459
self.publishers
6560
.insert(participant_id.clone(), publisher.clone());
6661

67-
info!("publisher added to room");
68-
69-
info!("connection type: {:?}", params.connection_type);
70-
7162
// Handle SDP based on connection type
7263
match params.connection_type {
7364
ConnectionType::SFU => {
74-
info!("handle_offer: {}", params.sdp.len());
7565
// For SFU mode, handle the offer and create an answer
7666
let answer_sdp = publisher.handle_offer(params.sdp)?;
7767

78-
info!("answer_sdp: {}", answer_sdp.len());
79-
8068
Ok(Some(JoinRoomResponse {
8169
sdp: answer_sdp,
8270
is_recording: false,
@@ -160,13 +148,9 @@ impl Room {
160148
.get(participant_id)
161149
.ok_or(RtcError::PublisherNotFound)?;
162150

163-
info!("add_publisher_candidate: {}", participant_id);
164-
165151
// Convert IceCandidate to str0m Candidate
166152
let str0m_candidate = self.convert_ice_candidate_to_str0m(candidate)?;
167153

168-
info!("str0m_candidate: {:?}", str0m_candidate);
169-
170154
// Add remote (peer) candidate to publisher's RTC instance
171155
{
172156
let mut rtc = publisher.rtc.write();

crates/rtc-manager/src/entities/subscriber.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1-
use std::sync::{
2-
atomic::{AtomicU8, Ordering},
3-
Arc,
1+
use std::{
2+
sync::{
3+
atomic::{AtomicU8, Ordering},
4+
Arc,
5+
},
6+
thread,
47
};
58

69
use dashmap::DashMap;
710
use parking_lot::RwLock;
11+
use std::sync::mpsc::{self, Receiver};
812
use str0m::{
913
change::{SdpAnswer, SdpPendingOffer},
1014
media::{Direction, MediaKind, Mid},
1115
Event, IceConnectionState, Rtc,
1216
};
13-
use tokio::sync::mpsc::{self, UnboundedReceiver};
1417
use tokio_util::sync::CancellationToken;
1518

1619
use crate::{
@@ -66,7 +69,7 @@ impl Subscriber {
6669
let rtc = Rtc::builder().build();
6770

6871
// Create event channel for runtime -> subscriber
69-
let (event_tx, event_rx) = mpsc::unbounded_channel();
72+
let (event_tx, event_rx) = mpsc::sync_channel(1);
7073

7174
let subscriber = Arc::new(Self {
7275
participant_id,
@@ -93,10 +96,9 @@ impl Subscriber {
9396
.ok();
9497

9598
// Announce host candidate to the client via callback
96-
if let (Some(cb), Some(host)) = (
97-
Some(ice_handler),
98-
RtcUdpRuntime::global().host_candidate(),
99-
) {
99+
if let (Some(cb), Some(host)) =
100+
(Some(ice_handler), RtcUdpRuntime::global().host_candidate())
101+
{
100102
let ice = crate::utils::ice_utils::IceUtils::convert_from_str0m_candidate(
101103
&host,
102104
Some("0".to_string()),
@@ -107,8 +109,8 @@ impl Subscriber {
107109

108110
// Start the RTC event loop
109111
let subscriber_clone = Arc::clone(&subscriber);
110-
tokio::spawn(async move {
111-
subscriber_clone.run_event_loop(event_rx).await;
112+
thread::spawn(move || {
113+
subscriber_clone.run_event_loop(event_rx);
112114
});
113115

114116
Ok(subscriber)
@@ -220,16 +222,16 @@ impl Subscriber {
220222
self.tracks.clear();
221223
}
222224

223-
async fn run_event_loop(self: Arc<Self>, mut rx: UnboundedReceiver<Event>) {
224-
while let Some(event) = rx.recv().await {
225-
self.handle_rtc_event(event).await;
225+
fn run_event_loop(self: Arc<Self>, rx: Receiver<Event>) {
226+
while let Ok(event) = rx.recv() {
227+
self.handle_rtc_event(event);
226228
if self.cancel_token.is_cancelled() {
227229
break;
228230
}
229231
}
230232
}
231233

232-
async fn handle_rtc_event(&self, event: Event) {
234+
fn handle_rtc_event(&self, event: Event) {
233235
match event {
234236
Event::Connected => {
235237
tracing::info!(
@@ -268,7 +270,7 @@ impl Subscriber {
268270
}
269271
}
270272

271-
pub async fn receive_media_data(&self, _data: &[u8]) -> Result<(), RtcError> {
273+
pub fn receive_media_data(&self, _data: &[u8]) -> Result<(), RtcError> {
272274
// TODO: Process incoming media data from publisher
273275
// This is where the subscriber receives data controlled by the publisher
274276
Ok(())

crates/rtc-manager/src/utils/udp_runtime.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use std::time::{Duration, Instant};
66
use dashmap::DashMap;
77
use once_cell::sync::OnceCell;
88
use parking_lot::RwLock;
9+
use std::sync::mpsc::{self};
910
use systemstat::{Platform, System};
10-
use tokio::sync::mpsc;
1111
use tokio_util::sync::CancellationToken;
1212

1313
use str0m::net::Receive;
@@ -20,7 +20,7 @@ use crate::models::rtc_dto::RtcManagerConfig;
2020
pub struct RtcRegistration {
2121
pub id: String,
2222
pub rtc: Arc<RwLock<Rtc>>,
23-
pub event_tx: mpsc::UnboundedSender<Event>,
23+
pub event_tx: mpsc::SyncSender<Event>,
2424
}
2525

2626
pub struct RtcUdpRuntime {
@@ -89,7 +89,7 @@ impl RtcUdpRuntime {
8989
&self,
9090
id: String,
9191
rtc: Arc<RwLock<Rtc>>,
92-
event_tx: mpsc::UnboundedSender<Event>,
92+
event_tx: mpsc::SyncSender<Event>,
9393
) -> Result<(), RtcError> {
9494
// Add a host candidate that matches the runtime socket
9595
let addr = self

sfu/src/application/sfu_grpc_service.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl SfuService for SfuGrpcService {
5757
let ice_handler = GrpcPublisherIceHandler {
5858
dispatcher: Arc::clone(&self.dispatcher_grpc_client),
5959
client_id: req.client_id.clone(),
60+
tokio_handle: tokio::runtime::Handle::current(),
6061
};
6162

6263
let joined_handler = GrpcJoinedHandler {
@@ -65,6 +66,7 @@ impl SfuService for SfuGrpcService {
6566
room_id: req.room_id.clone(),
6667
client_id: req.client_id.clone(),
6768
node_id: self.node_id.clone(),
69+
tokio_handle: tokio::runtime::Handle::current(),
6870
};
6971

7072
let rtc_manager = self.rtc_manager.clone();
@@ -115,12 +117,14 @@ impl SfuService for SfuGrpcService {
115117
dispatcher: Arc::clone(&self.dispatcher_grpc_client),
116118
client_id: req.client_id.clone(),
117119
target_id: req.target_id.clone(),
120+
tokio_handle: tokio::runtime::Handle::current(),
118121
};
119122

120123
let renegotiation_handler = GrpcRenegotiationHandler {
121124
dispatcher: Arc::clone(&self.dispatcher_grpc_client),
122125
client_id: req.client_id.clone(),
123126
target_id: req.target_id.clone(),
127+
tokio_handle: tokio::runtime::Handle::current(),
124128
};
125129

126130
let rtc_manager = self.rtc_manager.clone();
@@ -434,14 +438,15 @@ impl SfuService for SfuGrpcService {
434438
pub struct GrpcPublisherIceHandler {
435439
pub dispatcher: Arc<Mutex<DispatcherGrpcClient>>,
436440
pub client_id: String,
441+
pub tokio_handle: tokio::runtime::Handle,
437442
}
438443

439444
impl IceCandidateHandler for GrpcPublisherIceHandler {
440445
fn handle_candidate(&self, candidate: IceCandidate) {
441446
let dispatcher = Arc::clone(&self.dispatcher);
442447
let client_id = self.client_id.clone();
443448

444-
tokio::spawn(async move {
449+
self.tokio_handle.spawn(async move {
445450
let dispatcher = dispatcher.lock().await;
446451

447452
let _ = dispatcher
@@ -463,14 +468,16 @@ pub struct GrpcSubscriberIceHandler {
463468
pub dispatcher: Arc<Mutex<DispatcherGrpcClient>>,
464469
pub client_id: String,
465470
pub target_id: String,
471+
pub tokio_handle: tokio::runtime::Handle,
466472
}
467473

468474
impl IceCandidateHandler for GrpcSubscriberIceHandler {
469475
fn handle_candidate(&self, candidate: IceCandidate) {
470476
let dispatcher = Arc::clone(&self.dispatcher);
471477
let client_id = self.client_id.clone();
472478
let target_id = self.target_id.clone();
473-
tokio::spawn(async move {
479+
480+
self.tokio_handle.spawn(async move {
474481
let dispatcher = dispatcher.lock().await;
475482

476483
let _ = dispatcher
@@ -495,6 +502,7 @@ pub struct GrpcJoinedHandler {
495502
pub room_id: String,
496503
pub client_id: String,
497504
pub node_id: String,
505+
pub tokio_handle: tokio::runtime::Handle,
498506
}
499507

500508
impl JoinedHandler for GrpcJoinedHandler {
@@ -505,9 +513,8 @@ impl JoinedHandler for GrpcJoinedHandler {
505513
let client_id = self.client_id.clone();
506514
let node_id = self.node_id.clone();
507515

508-
tokio::spawn(async move {
516+
self.tokio_handle.spawn(async move {
509517
let dispatcher = dispatcher.lock().await;
510-
511518
let _ = dispatcher
512519
.new_user_joined(NewUserJoinedRequest {
513520
participant_id,
@@ -526,6 +533,7 @@ pub struct GrpcRenegotiationHandler {
526533
pub dispatcher: Arc<Mutex<DispatcherGrpcClient>>,
527534
pub client_id: String,
528535
pub target_id: String,
536+
pub tokio_handle: tokio::runtime::Handle,
529537
}
530538

531539
impl RenegotiationHandler for GrpcRenegotiationHandler {
@@ -534,7 +542,7 @@ impl RenegotiationHandler for GrpcRenegotiationHandler {
534542
let client_id = self.client_id.clone();
535543
let target_id = self.target_id.clone();
536544

537-
tokio::spawn(async move {
545+
self.tokio_handle.spawn(async move {
538546
let dispatcher = dispatcher.lock().await;
539547
let _ = dispatcher
540548
.subscriber_renegotiate(SubscriberRenegotiateRequest {

0 commit comments

Comments
 (0)