Skip to content

Commit 1c8a6d2

Browse files
committed
add MessageInner::SenderRtp/Rtcp and ReceiverRtcp
1 parent b0cc388 commit 1c8a6d2

File tree

6 files changed

+118
-24
lines changed

6 files changed

+118
-24
lines changed
Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,32 @@
1+
use crate::error::Result;
12
use crate::media_stream::Track;
3+
use crate::peer_connection::MessageInner;
4+
use crate::runtime::Sender;
5+
use rtc::rtp_transceiver::RTCRtpSenderId;
26
use rtc::{rtcp, rtp};
37

48
pub mod static_rtp;
59

10+
/// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used
11+
/// in Interceptors.
12+
#[derive(Clone)]
13+
pub struct TrackLocalContext {
14+
pub(crate) sender_id: RTCRtpSenderId,
15+
pub(crate) msg_tx: Sender<MessageInner>,
16+
}
17+
618
#[async_trait::async_trait]
719
pub trait TrackLocal: Track {
8-
async fn write_rtp(&self, packet: rtp::Packet) -> crate::error::Result<()>;
9-
async fn write_rtcp(&self, packets: Vec<Box<dyn rtcp::Packet>>) -> crate::error::Result<()>;
20+
/// bind should implement the way how the media data flows from the Track to the PeerConnection
21+
/// This will be called internally after signaling is complete and the list of available
22+
/// codecs has been determined
23+
async fn bind(&self, ctx: &TrackLocalContext) -> Result<()>;
24+
25+
/// unbind should implement the teardown logic when the track is no longer needed. This happens
26+
/// because a track has been stopped.
27+
async fn unbind(&self) -> Result<()>;
28+
29+
async fn write_rtp(&self, packet: rtp::Packet) -> Result<()>;
30+
31+
async fn write_rtcp(&self, packets: Vec<Box<dyn rtcp::Packet>>) -> Result<()>;
1032
}
Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
1-
use crate::error::Result;
1+
use crate::error::{Error, Result};
22
use crate::media_stream::Track;
3-
use crate::media_stream::track_local::TrackLocal;
3+
use crate::media_stream::track_local::{TrackLocal, TrackLocalContext};
4+
use crate::peer_connection::MessageInner;
5+
use crate::runtime::Mutex;
46
use rtc::media_stream::MediaStreamTrack;
5-
use rtc::rtp::Packet;
7+
use rtc::{rtcp, rtp};
68

79
/// TrackLocalStaticRTP is a TrackLocal that has a pre-set codec and accepts RTP Packets.
810
/// If you wish to send a media.Sample use TrackLocalStaticSample
9-
#[derive(Debug)]
11+
#[derive(Clone)]
1012
pub struct TrackLocalStaticRTP {
1113
track: MediaStreamTrack,
14+
ctx: Mutex<Option<TrackLocalContext>>,
1215
}
1316

1417
impl TrackLocalStaticRTP {
1518
pub fn new(track: MediaStreamTrack) -> Self {
16-
Self { track }
19+
Self {
20+
track,
21+
ctx: Mutex::new(None),
22+
}
1723
}
1824
}
1925

@@ -25,11 +31,37 @@ impl Track for TrackLocalStaticRTP {
2531

2632
#[async_trait::async_trait]
2733
impl TrackLocal for TrackLocalStaticRTP {
28-
async fn write_rtp(&self, _packet: Packet) -> Result<()> {
29-
todo!()
34+
async fn bind(&self, ctx: &TrackLocalContext) -> Result<()> {
35+
let mut ctx_opt = self.ctx.lock().await;
36+
*ctx_opt = Some(ctx.clone());
37+
Ok(())
3038
}
3139

32-
async fn write_rtcp(&self, _packets: Vec<Box<dyn rtc::rtcp::Packet>>) -> Result<()> {
33-
todo!()
40+
async fn unbind(&self) -> Result<()> {
41+
let mut ctx_opt = self.ctx.lock().await;
42+
*ctx_opt = None;
43+
Ok(())
44+
}
45+
46+
async fn write_rtp(&self, packet: rtp::Packet) -> Result<()> {
47+
let ctx_opt = self.ctx.lock().await;
48+
if let Some(ctx) = &*ctx_opt {
49+
ctx.msg_tx
50+
.try_send(MessageInner::SenderRtp(ctx.sender_id, packet))
51+
.map_err(|e| Error::Other(format!("{:?}", e)))
52+
} else {
53+
Err(Error::Other("track is not binding yet".to_string()))
54+
}
55+
}
56+
57+
async fn write_rtcp(&self, packets: Vec<Box<dyn rtcp::Packet>>) -> Result<()> {
58+
let ctx_opt = self.ctx.lock().await;
59+
if let Some(ctx) = &*ctx_opt {
60+
ctx.msg_tx
61+
.try_send(MessageInner::SenderRtcp(ctx.sender_id, packets))
62+
.map_err(|e| Error::Other(format!("{:?}", e)))
63+
} else {
64+
Err(Error::Other("track is not binding yet".to_string()))
65+
}
3466
}
3567
}

src/media_stream/track_remote/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub(crate) mod static_rtp;
22

3+
use crate::error::Result;
34
use crate::media_stream::Track;
45
use rtc::{rtcp, rtp};
56

@@ -19,6 +20,7 @@ pub enum TrackRemoteEvent {
1920

2021
#[async_trait::async_trait]
2122
pub trait TrackRemote: Track {
22-
async fn write_rtcp(&self, packets: Vec<Box<dyn rtcp::Packet>>) -> crate::error::Result<()>;
23+
async fn write_rtcp(&self, packets: Vec<Box<dyn rtcp::Packet>>) -> Result<()>;
24+
2325
async fn poll(&self) -> Option<TrackRemoteEvent>;
2426
}

src/media_stream/track_remote/static_rtp.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,32 @@
1+
use crate::error::{Error, Result};
12
use crate::media_stream::Track;
23
use crate::media_stream::track_remote::{TrackRemote, TrackRemoteEvent};
3-
use crate::runtime::{Mutex, Receiver};
4+
use crate::peer_connection::MessageInner;
5+
use crate::runtime::{Mutex, Receiver, Sender};
46
use rtc::media_stream::MediaStreamTrack;
7+
use rtc::rtp_transceiver::RTCRtpReceiverId;
58

69
/// TrackLocalStaticRTP is a TrackLocal that has a pre-set codec and accepts RTP Packets.
710
/// If you wish to send a media.Sample use TrackLocalStaticSample
11+
#[derive(Clone)]
812
pub(crate) struct TrackRemoteStaticRTP {
913
track: MediaStreamTrack,
14+
receiver_id: RTCRtpReceiverId,
15+
msg_tx: Sender<MessageInner>,
1016
evt_rx: Mutex<Receiver<TrackRemoteEvent>>,
1117
}
1218

1319
impl TrackRemoteStaticRTP {
14-
pub fn new(track: MediaStreamTrack, evt_rx: Receiver<TrackRemoteEvent>) -> Self {
20+
pub fn new(
21+
track: MediaStreamTrack,
22+
receiver_id: RTCRtpReceiverId,
23+
msg_tx: Sender<MessageInner>,
24+
evt_rx: Receiver<TrackRemoteEvent>,
25+
) -> Self {
1526
Self {
1627
track,
28+
receiver_id,
29+
msg_tx,
1730
evt_rx: Mutex::new(evt_rx),
1831
}
1932
}
@@ -27,11 +40,10 @@ impl Track for TrackRemoteStaticRTP {
2740

2841
#[async_trait::async_trait]
2942
impl TrackRemote for TrackRemoteStaticRTP {
30-
async fn write_rtcp(
31-
&self,
32-
_packets: Vec<Box<dyn rtc::rtcp::Packet>>,
33-
) -> crate::error::Result<()> {
34-
todo!()
43+
async fn write_rtcp(&self, packets: Vec<Box<dyn rtc::rtcp::Packet>>) -> Result<()> {
44+
self.msg_tx
45+
.try_send(MessageInner::ReceiverRtcp(self.receiver_id, packets))
46+
.map_err(|e| Error::Other(format!("{:?}", e)))
3547
}
3648

3749
async fn poll(&self) -> Option<TrackRemoteEvent> {

src/peer_connection/driver.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use std::sync::Arc;
3232
use std::time::{Duration, Instant};
3333

3434
/// Capacity of the internal driver message channel (WriteNotify, IceGathering, Close, …).
35-
pub(crate) const MESSAGE_INNER_CHANNEL_CAPACITY: usize = 64;
35+
pub(crate) const MESSAGE_INNER_CHANNEL_CAPACITY: usize = 256;
3636

3737
/// Capacity of each data-channel event channel (OnOpen, OnMessage, OnClose, …).
3838
pub(crate) const DATA_CHANNEL_EVENT_CHANNEL_CAPACITY: usize = 256;
@@ -410,7 +410,12 @@ where
410410
if let Some(track) = track {
411411
let (evt_tx, evt_rx) = channel(TRACK_REMOTE_EVENT_CHANNEL_CAPACITY);
412412
let track_remote: Arc<dyn TrackRemote> =
413-
Arc::new(TrackRemoteStaticRTP::new(track, evt_rx));
413+
Arc::new(TrackRemoteStaticRTP::new(
414+
track,
415+
init.receiver_id,
416+
self.inner.msg_tx.clone(),
417+
evt_rx,
418+
));
414419

415420
{
416421
let mut rtp_transceivers = self.inner.rtp_transceivers.lock().await;
@@ -512,12 +517,17 @@ where
512517

513518
async fn handle_inner_message(&mut self, msg: MessageInner) -> bool {
514519
match msg {
515-
/*MessageInner::SenderRtp(sender_id, packet) => {
520+
MessageInner::SenderRtp(sender_id, packet) => {
516521
let mut core = self.inner.core.lock().await;
517522
if let Some(mut sender) = core.rtp_sender(sender_id) {
518523
if let Err(err) = sender.write_rtp(packet) {
519524
error!("Failed to send RTP: {}", err);
520525
}
526+
} else {
527+
error!(
528+
"Failed to send RTCP feedback due to unknown sender id {:?}",
529+
sender_id
530+
);
521531
}
522532
}
523533
MessageInner::SenderRtcp(sender_id, rtcp_packets) => {
@@ -526,6 +536,11 @@ where
526536
if let Err(err) = sender.write_rtcp(rtcp_packets) {
527537
error!("Failed to send RTCP: {}", err);
528538
}
539+
} else {
540+
error!(
541+
"Failed to send RTCP feedback due to unknown sender id {:?}",
542+
sender_id
543+
);
529544
}
530545
}
531546
MessageInner::ReceiverRtcp(receiver_id, rtcp_packets) => {
@@ -534,8 +549,13 @@ where
534549
if let Err(err) = receiver.write_rtcp(rtcp_packets) {
535550
error!("Failed to send RTCP feedback: {}", err);
536551
}
552+
} else {
553+
error!(
554+
"Failed to send RTCP feedback due to unknown receiver id {:?}",
555+
receiver_id
556+
);
537557
}
538-
}*/
558+
}
539559
MessageInner::WriteNotify => {
540560
//Do nothing, just want to wake up from futures::select! in order to poll_write
541561
}

src/peer_connection/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ use rtc::data_channel::{RTCDataChannelId, RTCDataChannelInit};
2525
use rtc::peer_connection::RTCPeerConnectionBuilder;
2626
use rtc::peer_connection::configuration::{RTCAnswerOptions, RTCOfferOptions};
2727
use rtc::rtp_transceiver::rtp_sender::RtpCodecKind;
28-
use rtc::rtp_transceiver::{RTCRtpTransceiverId, RTCRtpTransceiverInit};
28+
use rtc::rtp_transceiver::{
29+
RTCRtpReceiverId, RTCRtpSenderId, RTCRtpTransceiverId, RTCRtpTransceiverInit,
30+
};
2931
use rtc::sansio::Protocol;
3032
use rtc::shared::error::{Error, Result};
3133
use rtc::statistics::StatsSelector;
3234
use rtc::statistics::report::RTCStatsReport;
35+
use rtc::{rtcp, rtp};
3336

3437
use crate::media_stream::track_local::static_rtp::TrackLocalStaticRTP;
3538
use crate::media_stream::track_remote::TrackRemoteEvent;
@@ -109,6 +112,9 @@ pub trait PeerConnectionEventHandler: Send + Sync + 'static {
109112
/// Unified inner message type for the peer connection driver
110113
#[derive(Debug)]
111114
pub(crate) enum MessageInner {
115+
SenderRtp(RTCRtpSenderId, rtp::Packet),
116+
SenderRtcp(RTCRtpSenderId, Vec<Box<dyn rtcp::Packet>>),
117+
ReceiverRtcp(RTCRtpReceiverId, Vec<Box<dyn rtcp::Packet>>),
112118
WriteNotify,
113119
IceGathering,
114120
Close,

0 commit comments

Comments
 (0)