Skip to content

Commit 29a736f

Browse files
committed
update .
1 parent 4cca774 commit 29a736f

File tree

8 files changed

+178
-64
lines changed

8 files changed

+178
-64
lines changed

crates/webrtc-manager/src/entities/str0m_room.rs

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ use std::sync::Arc;
33
use dashmap::DashMap;
44
use parking_lot::RwLock;
55
use str0m::{
6-
Candidate,
6+
Candidate, Input,
77
change::{SdpAnswer, SdpOffer},
8+
media::MediaData,
89
};
10+
use tracing::{debug, error, info};
911

1012
use crate::{
1113
entities::media::Media,
@@ -77,6 +79,22 @@ impl Str0mRoom {
7779
(callback)(is_migrate).await;
7880
});
7981
});
82+
83+
// Set media data callback to forward to subscribers
84+
let room_clone = self.clone();
85+
let participant_id_clone = participant_id.clone();
86+
session_write.set_media_data_callback(move |media_data| {
87+
let room_clone = room_clone.clone();
88+
let participant_id = participant_id_clone.clone();
89+
tokio::spawn(async move {
90+
if let Err(e) = room_clone
91+
.forward_media_data(&participant_id, media_data)
92+
.await
93+
{
94+
error!("Failed to forward media data: {:?}", e);
95+
}
96+
});
97+
});
8098
}
8199

82100
// Add the session to publishers
@@ -85,19 +103,22 @@ impl Str0mRoom {
85103

86104
// Handle SDP negotiation based on connection type
87105
if params.connection_type == ConnectionType::SFU {
106+
println!("[Str0mRoom] params.sdp: {:?}", params.sdp.len());
88107
// For SFU, we need to accept the offer and create an answer
89-
let offer: SdpOffer =
90-
serde_json::from_str(&params.sdp).map_err(|_| WebRTCError::FailedToCreateOffer)?;
91-
108+
let offer: SdpOffer = SdpOffer::from_sdp_string(&params.sdp)
109+
.map_err(|_| WebRTCError::FailedToCreateOffer)?;
110+
println!("===> parse offer");
92111
let mut session_write = session.write();
93112
let answer = session_write.accept_offer(offer)?;
94113

114+
println!("===> accept offer");
115+
95116
// Convert answer to string
96-
let answer_json =
97-
serde_json::to_string(&answer).map_err(|_| WebRTCError::FailedToCreateAnswer)?;
117+
// let answer_json =
118+
// serde_json::to_string(&answer).map_err(|_| WebRTCError::FailedToCreateAnswer)?;
98119

99120
return Ok(Some(JoinRoomResponse {
100-
sdp: answer_json,
121+
sdp: answer.to_sdp_string(),
101122
is_recording: false,
102123
}));
103124
} else {
@@ -147,10 +168,10 @@ impl Str0mRoom {
147168
let subscriber_session = self.session_manager.create_session(
148169
participant_id.clone(),
149170
ConnectionType::SFU,
150-
true, // is_video_enabled
151-
true, // is_audio_enabled
152-
false, // is_e2ee_enabled
153-
StreamingProtocol::HLS, // Use HLS as default for subscribers
171+
true, // is_video_enabled
172+
true, // is_audio_enabled
173+
false, // is_e2ee_enabled
174+
StreamingProtocol::SFU,
154175
);
155176

156177
// Set up callbacks for subscriber
@@ -192,6 +213,40 @@ impl Str0mRoom {
192213
}
193214
}
194215

216+
/// Forward media data from a publisher to all its subscribers
217+
pub async fn forward_media_data(
218+
&self,
219+
publisher_id: &str,
220+
media_data: MediaData,
221+
) -> Result<(), WebRTCError> {
222+
let prefix = format!("p_{publisher_id}_");
223+
224+
// Find all subscribers for this publisher
225+
for entry in self.subscribers.iter() {
226+
if entry.key().starts_with(&prefix) {
227+
let subscriber = entry.value();
228+
let mut session_write = subscriber.write();
229+
230+
// For now, we'll just log the media data forwarding
231+
// In a real implementation, you would need to properly handle
232+
// the media data according to str0m's API
233+
debug!(
234+
"Forwarding media data from publisher {} to subscriber {}: mid={:?}, data_len={}",
235+
publisher_id,
236+
entry.key(),
237+
media_data.mid,
238+
media_data.data.len()
239+
);
240+
241+
// TODO: Implement proper media data forwarding using str0m's API
242+
// This would typically involve creating the appropriate media tracks
243+
// and writing the data to them
244+
}
245+
}
246+
247+
Ok(())
248+
}
249+
195250
pub fn subscribe_hls_live_stream(
196251
&self,
197252
params: SubscribeHlsLiveStreamParams,

crates/webrtc-manager/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ pub mod models;
44
pub mod str0m_network_manager;
55
pub mod str0m_session;
66
pub mod str0m_webrtc_manager;
7-
pub mod utils;
87
pub mod webrtc_manager;
8+
9+
pub use webrtc_manager::{JoinRoomRequest, NetworkManager, WebRTCManager};

crates/webrtc-manager/src/models/params.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub struct JoinRoomParams {
4141
pub is_ipv6_supported: bool,
4242
}
4343

44-
#[derive(Serialize)]
44+
#[derive(Serialize, Debug)]
4545
#[serde(rename_all = "camelCase")]
4646
pub struct JoinRoomResponse {
4747
pub sdp: String,

crates/webrtc-manager/src/str0m_session.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub struct Str0mSession {
3232
pub on_candidate: Option<Box<dyn Fn(IceCandidate) + Send + Sync>>,
3333
pub on_negotiation_needed: Option<Box<dyn Fn(String) + Send + Sync>>,
3434
pub on_connected: Option<Box<dyn Fn() + Send + Sync>>,
35+
pub on_media_data: Option<Box<dyn Fn(MediaData) + Send + Sync>>,
3536
}
3637

3738
impl Str0mSession {
@@ -68,6 +69,7 @@ impl Str0mSession {
6869
on_candidate: None,
6970
on_negotiation_needed: None,
7071
on_connected: None,
72+
on_media_data: None,
7173
}
7274
}
7375

@@ -194,8 +196,11 @@ impl Str0mSession {
194196
data.data.len()
195197
);
196198

197-
// Here you would typically forward the media data to subscribers
198-
// For now, we'll just log it
199+
// Forward media data to subscribers if callback is set
200+
if let Some(callback) = &self.on_media_data {
201+
callback(data);
202+
}
203+
199204
Ok(())
200205
}
201206

@@ -223,6 +228,14 @@ impl Str0mSession {
223228
self.on_connected = Some(Box::new(callback));
224229
}
225230

231+
/// Set the media data callback for forwarding to subscribers
232+
pub fn set_media_data_callback<F>(&mut self, callback: F)
233+
where
234+
F: Fn(MediaData) + Send + Sync + 'static,
235+
{
236+
self.on_media_data = Some(Box::new(callback));
237+
}
238+
226239
/// Check if the session is connected
227240
pub fn is_connected(&self) -> bool {
228241
self.is_connected

crates/webrtc-manager/src/str0m_webrtc_manager.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ impl Str0mWebRTCManager {
8989
room.join_room(params, room_id).await?
9090
};
9191

92+
println!("[Str0mWebRTCManager] res: {:?}", res);
93+
9294
Ok(res)
9395
}
9496

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,48 @@
1-
use crate::str0m_webrtc_manager::{JoinRoomReq, Str0mWebRTCManager};
1+
use std::sync::Arc;
2+
use tokio::sync::Mutex;
3+
use tracing::info;
24

3-
// Re-export the str0m implementation as the main WebRTC manager
4-
pub type WebRTCManager = Str0mWebRTCManager;
5+
use crate::{
6+
str0m_network_manager::Str0mNetworkManager,
7+
str0m_webrtc_manager::{JoinRoomReq, Str0mWebRTCManager},
8+
};
59

6-
// Re-export the join room request type
10+
pub type WebRTCManager = Str0mWebRTCManager;
711
pub type JoinRoomRequest = JoinRoomReq;
12+
13+
pub struct NetworkManager {
14+
network_manager: Arc<Mutex<Str0mNetworkManager>>,
15+
webrtc_manager: Arc<Str0mWebRTCManager>,
16+
}
17+
18+
impl NetworkManager {
19+
pub fn new(bind_addr: std::net::SocketAddr) -> Result<Self, crate::errors::WebRTCError> {
20+
let network_manager = Str0mNetworkManager::new(bind_addr)?;
21+
let webrtc_manager = Str0mWebRTCManager::new();
22+
23+
Ok(Self {
24+
network_manager: Arc::new(Mutex::new(network_manager)),
25+
webrtc_manager: Arc::new(webrtc_manager),
26+
})
27+
}
28+
29+
pub fn get_webrtc_manager(&self) -> Arc<Str0mWebRTCManager> {
30+
self.webrtc_manager.clone()
31+
}
32+
33+
pub async fn start_network_loop(&self) {
34+
let network_manager = self.network_manager.clone();
35+
36+
tokio::spawn(async move {
37+
info!("Starting network loop...");
38+
loop {
39+
let mut manager = network_manager.lock().await;
40+
if let Err(e) = manager.run_network_loop() {
41+
tracing::error!("Network loop error: {:?}", e);
42+
}
43+
// Small delay to prevent busy waiting
44+
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
45+
}
46+
});
47+
}
48+
}

justfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
signalling:
22
cargo run --bin signalling
33
sfu:
4-
cargo run --bin sfu --release
4+
cargo run --bin sfu
55
build-proto:
66
cargo build -p waterbus-proto
77
build-signalling:

0 commit comments

Comments
 (0)