Skip to content

Commit 536ea96

Browse files
authored
Make use of the max-message-size SDP attribute in datachannels (#722)
* [PC] Extract datachannel retrieval logic from `have_data_channel` in order to be able to reuse the logic * [webrtc] Extact the max message size from the remote parsed SDP * [webrtc] Remove unused field `RTCSctpTransport`, as the max message size is fed to the underlying association during the call to `start_sctp` * [webrtc] Update `calc_message_size` to return `u32` instead of `usize`, as the only consumer of this method expects `u32` * [webrtc] Allow users to specify the max payload they can send over sctp * [datachannel] Add the `max_message_size` config field into the datachannel's config Given that we are no longer fixed to the 64KB max size for message sizes, the user needs a way to know the actual max message size a datachannel may send before returning an error. This information is particularly useful for applications where large messages are chunked and framed in the application itself. * Basic test suite for `max-message-size` handling * [webrtc] Introduce `get_application_section_max_message_size` * Remove if disagreed upon
1 parent 8764124 commit 536ea96

File tree

9 files changed

+288
-27
lines changed

9 files changed

+288
-27
lines changed

data/src/data_channel/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub struct Config {
3434
pub reliability_parameter: u32,
3535
pub label: String,
3636
pub protocol: String,
37+
pub max_message_size: u32,
3738
}
3839

3940
/// DataChannel represents a data channel

sctp/src/association/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,12 @@ pub struct Config {
202202
pub struct Association {
203203
name: String,
204204
state: Arc<AtomicU8>,
205+
// TODO: Convert into `u32`, as there is no reason why the `max_message_size` should need to be
206+
// changed after the Assocaition has been created. Note that even if there was a use case for
207+
// this, it is not used anywhere in the code base.
208+
//
209+
// Using atomics where not necessary -- especially in a hot path such as `prepare_write` -- may
210+
// negatively impact performance, and adds unneeded complexity to the code.
205211
max_message_size: Arc<AtomicU32>,
206212
inflight_queue_length: Arc<AtomicUsize>,
207213
will_send_shutdown: Arc<AtomicBool>,

sdp/src/description/session.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub const ATTR_KEY_SEND_ONLY: &str = "sendonly";
3131
pub const ATTR_KEY_SEND_RECV: &str = "sendrecv";
3232
pub const ATTR_KEY_EXT_MAP: &str = "extmap";
3333
pub const ATTR_KEY_EXTMAP_ALLOW_MIXED: &str = "extmap-allow-mixed";
34+
pub const ATTR_KEY_MAX_MESSAGE_SIZE: &str = "max-message-size";
3435

3536
/// Constants for semantic tokens used in JSEP
3637
pub const SEMANTIC_TOKEN_LIP_SYNCHRONIZATION: &str = "LS";

webrtc/src/api/setting_engine/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,30 @@ pub struct ReplayProtection {
5454
pub srtcp: usize,
5555
}
5656

57+
#[derive(Clone)]
58+
pub enum SctpMaxMessageSize {
59+
Bounded(u32),
60+
Unbounded,
61+
}
62+
63+
impl SctpMaxMessageSize {
64+
pub const DEFAULT_MESSAGE_SIZE: u32 = 65536;
65+
pub fn as_u32(&self) -> u32 {
66+
match self {
67+
Self::Bounded(result) => *result,
68+
Self::Unbounded => 0,
69+
}
70+
}
71+
}
72+
73+
impl Default for SctpMaxMessageSize {
74+
fn default() -> Self {
75+
// https://datatracker.ietf.org/doc/html/rfc8841#section-6.1-4
76+
// > If the SDP "max-message-size" attribute is not present, the default value is 64K.
77+
Self::Bounded(Self::DEFAULT_MESSAGE_SIZE)
78+
}
79+
}
80+
5781
/// SettingEngine allows influencing behavior in ways that are not
5882
/// supported by the WebRTC API. This allows us to support additional
5983
/// use-cases without deviating from the WebRTC API elsewhere.
@@ -79,6 +103,8 @@ pub struct SettingEngine {
79103
pub(crate) receive_mtu: usize,
80104
pub(crate) mid_generator: Option<Arc<dyn Fn(isize) -> String + Send + Sync>>,
81105
pub(crate) enable_sender_rtx: bool,
106+
/// Determines the max size of any message that may be sent through an SCTP transport.
107+
pub(crate) sctp_max_message_size_can_send: SctpMaxMessageSize,
82108
}
83109

84110
impl SettingEngine {
@@ -342,4 +368,11 @@ impl SettingEngine {
342368
pub fn enable_sender_rtx(&mut self, is_enabled: bool) {
343369
self.enable_sender_rtx = is_enabled;
344370
}
371+
372+
pub fn set_sctp_max_message_size_can_send(
373+
&mut self,
374+
max_message_size_can_send: SctpMaxMessageSize,
375+
) {
376+
self.sctp_max_message_size_can_send = max_message_size_can_send
377+
}
345378
}

webrtc/src/data_channel/data_channel_test.rs

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use waitgroup::WaitGroup;
88

99
use super::*;
1010
use crate::api::media_engine::MediaEngine;
11+
use crate::api::setting_engine::SctpMaxMessageSize;
1112
use crate::api::{APIBuilder, API};
1213
use crate::data_channel::data_channel_init::RTCDataChannelInit;
1314
//use log::LevelFilter;
@@ -1375,6 +1376,203 @@ async fn test_data_channel_non_standard_session_description() -> Result<()> {
13751376
Ok(())
13761377
}
13771378

1379+
async fn create_data_channel_with_max_message_size(
1380+
remote_max_message_size: Option<u32>,
1381+
can_send_max_message_size: Option<SctpMaxMessageSize>,
1382+
) -> Result<Arc<RTCDataChannel>> {
1383+
let mut m = MediaEngine::default();
1384+
let mut s: SettingEngine = SettingEngine::default();
1385+
s.detach_data_channels();
1386+
m.register_default_codecs()?;
1387+
let api_builder = APIBuilder::new().with_media_engine(m);
1388+
1389+
if let Some(can_send_max_message_size) = can_send_max_message_size {
1390+
s.set_sctp_max_message_size_can_send(can_send_max_message_size);
1391+
}
1392+
1393+
let api = api_builder.with_setting_engine(s).build();
1394+
1395+
let (offer_pc, answer_pc) = new_pair(&api).await?;
1396+
let (data_channel_tx, mut data_channel_rx) = mpsc::channel::<Arc<RTCDataChannel>>(1);
1397+
let data_channel_tx = Arc::new(data_channel_tx);
1398+
answer_pc.on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| {
1399+
let data_channel_tx2 = Arc::clone(&data_channel_tx);
1400+
Box::pin(async move {
1401+
data_channel_tx2.send(dc).await.unwrap();
1402+
})
1403+
}));
1404+
1405+
let _ = offer_pc.create_data_channel("foo", None).await?;
1406+
1407+
let offer = offer_pc.create_offer(None).await?;
1408+
let mut offer_gathering_complete = offer_pc.gathering_complete_promise().await;
1409+
offer_pc.set_local_description(offer).await?;
1410+
let _ = offer_gathering_complete.recv().await;
1411+
let mut offer = offer_pc.local_description().await.unwrap();
1412+
1413+
if let Some(remote_max_message_size) = remote_max_message_size {
1414+
offer
1415+
.sdp
1416+
.push_str(format!("a=max-message-size:{}\r\n", remote_max_message_size).as_str());
1417+
}
1418+
1419+
answer_pc.set_remote_description(offer).await?;
1420+
1421+
let answer = answer_pc.create_answer(None).await?;
1422+
1423+
let mut answer_gathering_complete = answer_pc.gathering_complete_promise().await;
1424+
answer_pc.set_local_description(answer).await?;
1425+
let _ = answer_gathering_complete.recv().await;
1426+
1427+
let answer = answer_pc.local_description().await.unwrap();
1428+
offer_pc.set_remote_description(answer).await?;
1429+
1430+
Ok(data_channel_rx.recv().await.unwrap())
1431+
}
1432+
1433+
// 128 KB
1434+
const EXPECTED_MAX_MESSAGE_SIZE: u32 = 131072;
1435+
1436+
#[tokio::test]
1437+
async fn test_data_channel_max_message_size_respected_on_send() -> Result<()> {
1438+
let data_channel = create_data_channel_with_max_message_size(
1439+
Some(EXPECTED_MAX_MESSAGE_SIZE),
1440+
Some(SctpMaxMessageSize::Unbounded),
1441+
)
1442+
.await?;
1443+
1444+
// A buffer with a size greater than the default size of 64KB.
1445+
let buffer = vec![0; 68000];
1446+
let bytes = bytes::Bytes::copy_from_slice(buffer.as_slice());
1447+
data_channel.send(&bytes).await.unwrap();
1448+
1449+
Ok(())
1450+
}
1451+
1452+
#[tokio::test]
1453+
async fn test_given_remote_max_message_size_is_none_when_data_channel_can_send_max_message_size_respected_on_send(
1454+
) -> Result<()> {
1455+
const EXPECTED_CAN_SEND_MAX_MESSAGE_SIZE: u32 = 1024;
1456+
let data_channel = create_data_channel_with_max_message_size(
1457+
None,
1458+
Some(SctpMaxMessageSize::Bounded(
1459+
EXPECTED_CAN_SEND_MAX_MESSAGE_SIZE,
1460+
)),
1461+
)
1462+
.await?;
1463+
1464+
let buffer = vec![0; 65536];
1465+
let bytes = bytes::Bytes::copy_from_slice(buffer.as_slice());
1466+
1467+
let actual = data_channel.send(&bytes).await;
1468+
1469+
assert!(matches!(
1470+
actual,
1471+
Err(Error::Data(data::Error::Sctp(
1472+
sctp::Error::ErrOutboundPacketTooLarge
1473+
)))
1474+
));
1475+
1476+
Ok(())
1477+
}
1478+
1479+
async fn run_data_channel_config_max_message_size(
1480+
remote_max_message_size: Option<u32>,
1481+
can_send_max_message_size: Option<SctpMaxMessageSize>,
1482+
) -> Result<u32> {
1483+
let data_channel = create_data_channel_with_max_message_size(
1484+
remote_max_message_size,
1485+
can_send_max_message_size,
1486+
)
1487+
.await?;
1488+
let data_channel = data_channel.detach().await?;
1489+
Ok(data_channel.config.max_message_size)
1490+
}
1491+
1492+
#[tokio::test]
1493+
async fn test_data_channel_max_message_size_reflected_on_data_channel_config() -> Result<()> {
1494+
assert_eq!(
1495+
run_data_channel_config_max_message_size(
1496+
Some(EXPECTED_MAX_MESSAGE_SIZE),
1497+
Some(SctpMaxMessageSize::Unbounded)
1498+
)
1499+
.await?,
1500+
EXPECTED_MAX_MESSAGE_SIZE
1501+
);
1502+
1503+
Ok(())
1504+
}
1505+
1506+
#[tokio::test]
1507+
async fn test_can_send_max_message_size_unspecified_then_remote_default_value_is_respected(
1508+
) -> Result<()> {
1509+
assert_eq!(
1510+
run_data_channel_config_max_message_size(Some(EXPECTED_MAX_MESSAGE_SIZE), None).await?,
1511+
SctpMaxMessageSize::DEFAULT_MESSAGE_SIZE
1512+
);
1513+
1514+
Ok(())
1515+
}
1516+
1517+
#[tokio::test]
1518+
async fn test_given_can_send_channel_max_message_size_less_than_remote_max_message_size_respect_send_channel_max_message_size(
1519+
) -> Result<()> {
1520+
let remote_max_message_size = 1024;
1521+
let can_send_channel_max_message_size = 256;
1522+
assert_eq!(
1523+
run_data_channel_config_max_message_size(
1524+
Some(remote_max_message_size),
1525+
Some(SctpMaxMessageSize::Bounded(
1526+
can_send_channel_max_message_size
1527+
))
1528+
)
1529+
.await?,
1530+
can_send_channel_max_message_size
1531+
);
1532+
1533+
Ok(())
1534+
}
1535+
1536+
#[tokio::test]
1537+
async fn test_can_send_max_message_size_respected_on_data_channel_config() -> Result<()> {
1538+
let can_send_channel_max_message_size = 1024;
1539+
assert_eq!(
1540+
run_data_channel_config_max_message_size(
1541+
None,
1542+
Some(SctpMaxMessageSize::Bounded(
1543+
can_send_channel_max_message_size
1544+
))
1545+
)
1546+
.await?,
1547+
can_send_channel_max_message_size
1548+
);
1549+
1550+
Ok(())
1551+
}
1552+
1553+
#[tokio::test]
1554+
async fn test_given_no_remote_message_size_or_can_send_max_message_size_max_size_is_65536(
1555+
) -> Result<()> {
1556+
assert_eq!(
1557+
run_data_channel_config_max_message_size(None, None).await?,
1558+
SctpMaxMessageSize::DEFAULT_MESSAGE_SIZE
1559+
);
1560+
1561+
Ok(())
1562+
}
1563+
1564+
#[tokio::test]
1565+
async fn test_respect_default_remote_max_message_size_when_can_send_max_message_size_is_greater_than_default(
1566+
) -> Result<()> {
1567+
assert_eq!(
1568+
run_data_channel_config_max_message_size(None, Some(SctpMaxMessageSize::Bounded(70000)))
1569+
.await?,
1570+
SctpMaxMessageSize::DEFAULT_MESSAGE_SIZE
1571+
);
1572+
1573+
Ok(())
1574+
}
1575+
13781576
struct TestOrtcStack {
13791577
//api *API
13801578
gatherer: Arc<RTCIceGatherer>,

webrtc/src/data_channel/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ impl RTCDataChannel {
173173
label: self.label.clone(),
174174
protocol: self.protocol.clone(),
175175
negotiated: self.negotiated,
176+
max_message_size: association.max_message_size(),
176177
};
177178

178179
if !self.negotiated {

webrtc/src/peer_connection/peer_connection_internal.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
22
use std::sync::Weak;
33

44
use super::*;
5+
use crate::api::setting_engine::SctpMaxMessageSize;
56
use crate::rtp_transceiver::{create_stream_info, PayloadType};
67
use crate::stats::stats_collector::StatsCollector;
78
use crate::stats::{
@@ -290,7 +291,16 @@ impl PeerConnectionInternal {
290291
if let Some(remote_port) = get_application_media_section_sctp_port(parsed_remote) {
291292
if let Some(local_port) = get_application_media_section_sctp_port(parsed_local)
292293
{
293-
self.start_sctp(local_port, remote_port).await;
294+
// TODO: Reuse the MediaDescription retrieved when looking for the message size.
295+
let max_message_size =
296+
get_application_media_section_max_message_size(parsed_remote)
297+
.unwrap_or(SctpMaxMessageSize::DEFAULT_MESSAGE_SIZE);
298+
self.start_sctp(
299+
local_port,
300+
remote_port,
301+
SCTPTransportCapabilities { max_message_size },
302+
)
303+
.await;
294304
}
295305
}
296306
}
@@ -460,17 +470,16 @@ impl PeerConnectionInternal {
460470
}
461471

462472
/// Start SCTP subsystem
463-
async fn start_sctp(&self, local_port: u16, remote_port: u16) {
473+
async fn start_sctp(
474+
&self,
475+
local_port: u16,
476+
remote_port: u16,
477+
sctp_transport_capabilities: SCTPTransportCapabilities,
478+
) {
464479
// Start sctp
465480
if let Err(err) = self
466481
.sctp_transport
467-
.start(
468-
SCTPTransportCapabilities {
469-
max_message_size: 0,
470-
},
471-
local_port,
472-
remote_port,
473-
)
482+
.start(sctp_transport_capabilities, local_port, remote_port)
474483
.await
475484
{
476485
log::warn!("Failed to start SCTP: {err}");

webrtc/src/peer_connection/sdp/mod.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,15 @@ pub(crate) fn get_application_media_section_sctp_port(desc: &SessionDescription)
10491049
None
10501050
}
10511051

1052+
pub(crate) fn get_application_media_section_max_message_size(
1053+
desc: &SessionDescription,
1054+
) -> Option<u32> {
1055+
get_application_media(desc)?
1056+
.attribute(ATTR_KEY_MAX_MESSAGE_SIZE)??
1057+
.parse()
1058+
.ok()
1059+
}
1060+
10521061
pub(crate) fn get_by_mid<'a>(
10531062
search_mid: &str,
10541063
desc: &'a session_description::RTCSessionDescription,
@@ -1065,18 +1074,17 @@ pub(crate) fn get_by_mid<'a>(
10651074
None
10661075
}
10671076

1077+
pub(crate) fn get_application_media(desc: &SessionDescription) -> Option<&MediaDescription> {
1078+
desc.media_descriptions
1079+
.iter()
1080+
.find(|media_description| media_description.media_name.media == MEDIA_SECTION_APPLICATION)
1081+
}
1082+
10681083
/// have_data_channel return MediaDescription with MediaName equal application
10691084
pub(crate) fn have_data_channel(
10701085
desc: &session_description::RTCSessionDescription,
10711086
) -> Option<&MediaDescription> {
1072-
if let Some(parsed) = &desc.parsed {
1073-
for d in &parsed.media_descriptions {
1074-
if d.media_name.media == MEDIA_SECTION_APPLICATION {
1075-
return Some(d);
1076-
}
1077-
}
1078-
}
1079-
None
1087+
get_application_media(desc.parsed.as_ref()?)
10801088
}
10811089

10821090
pub(crate) fn codecs_from_media_description(

0 commit comments

Comments
 (0)