Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions rtmp/src/message/aggregate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use bytes::Buf;

use crate::{RtmpMessageParseError, message::RtmpMessage, protocol::RawMessage};

/// Parse an Aggregate message (type 22).
///
/// The payload is a sequence of FLV-style tagged sub-messages:
/// 1 byte – message type ID
/// 3 bytes – payload length (big-endian)
///
/// According to RTMP spec it should be in big endian order
/// FFmpeg is parsing this according to E.4.1 FLV spec
/// 3 bytes – timestamp low 24 bits (big-endian)
/// 1 byte – timestamp high 8 bits (extends to 32-bit timestamp)
///
/// 3 bytes – stream ID (little-endian, per FLV spec; typically ignored)
/// N bytes – sub-message payload
/// 4 bytes – back pointer (size of the entire preceding entry incl. header)
///
/// Sub-messages inherit the aggregate message's `stream_id`. Offset is calculated based on first
/// timestamp and added to each timestamp value
pub fn parse_aggregate_message(msg: RawMessage) -> Result<Vec<RtmpMessage>, RtmpMessageParseError> {
let mut messages = Vec::new();
let mut offset = None;

let mut payload = msg.payload;

while payload.len() >= 11 {
let header = payload.split_to(11);
let type_id = header[0];
// 3-byte big-endian payload length
let data_size = u32::from_be_bytes([0, header[1], header[2], header[3]]) as usize;
// 4-byte timestamp: 3 low bytes then 1 high byte
// TODO: verify update description
let timestamp = u32::from_be_bytes([header[7], header[4], header[5], header[6]]);
// 3-byte stream ID (little-endian) – we use the aggregate's stream_id instead.

let offset = *offset.get_or_insert_with(|| timestamp.saturating_sub(msg.timestamp));

if payload.len() < data_size + 4 {
return Err(RtmpMessageParseError::PayloadTooShort);
}
let msg_payload = payload.split_to(data_size);
let _back_pointer = payload.get_u32();

let raw = RawMessage {
msg_type: type_id,
stream_id: msg.stream_id,
timestamp: timestamp + offset,
payload: msg_payload,
chunk_stream_id: msg.chunk_stream_id,
};

messages.push(RtmpMessage::from_raw(raw)?)
}

Ok(messages)
}
7 changes: 6 additions & 1 deletion rtmp/src/message/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::{RtmpEvent, amf0::Amf0Value};
use crate::{RtmpEvent, amf0::Amf0Value, message::shared::SharedObject};

mod aggregate;
mod command;
mod event;
mod parse;
mod serialize;
mod shared;
mod user_control;

pub(crate) use command::{
Expand Down Expand Up @@ -65,4 +67,7 @@ pub(crate) enum RtmpMessage {
event: RtmpEvent,
stream_id: u32,
},

SharedObject(SharedObject),
AggregateMessage(Vec<RtmpMessage>),
}
4 changes: 4 additions & 0 deletions rtmp/src/message/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
amf0::decode_amf0_values,
message::{
RtmpMessage,
aggregate::parse_aggregate_message,
command::CommandMessage,
event::{audio_event_from_raw, video_event_from_raw},
user_control::UserControlMessage,
Expand Down Expand Up @@ -90,6 +91,9 @@ impl RtmpMessage {
)));
}
MessageType::UserControl => RtmpMessage::UserControl(UserControlMessage::from_raw(p)?),
MessageType::AggregateMessage => {
RtmpMessage::AggregateMessage(parse_aggregate_message(msg)?)
}
};
Ok(result)
}
Expand Down
81 changes: 81 additions & 0 deletions rtmp/src/message/shared.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use bytes::{Buf, Bytes};

use crate::{AmfDecodingError, RtmpMessageParseError, message::RtmpMessage};

pub(crate) struct SharedObject {
name: String,
version: u32,
persistent: bool,
events: Vec<SharedObjectEvent>,
}

/// A single event entry inside a Shared Object message.
/// `data` is raw bytes whose encoding (AMF0 or AMF3) depends on the parent message type.
#[derive(Debug, Clone)]
pub(crate) struct SharedObjectEvent {
/// Event type code (1 = Use, 2 = Release, 3 = RequestChange, 4 = Change,
/// 5 = Success, 6 = SendMessage, 7 = Status, 8 = ClearData,
/// 9 = DeleteData, 10 = RequestRemove, 11 = UseSuccess).
pub event_type: u8,
pub data: Bytes,
}

impl SharedObject {
/// B
/// 2 bytes – object name length
/// N bytes – object name (UTF-8)
/// 4 bytes – current version
/// 4 bytes – flags (bit 0 = persistent)
/// Repeated until end of payload:
/// 1 byte – event type
/// 4 bytes – event data length
/// N bytes – event data (AMF0 or AMF3 encoded)
fn parse(mut payload: Bytes) -> Result<RtmpMessage, RtmpMessageParseError> {
if payload.len() < 2 {
return Err(RtmpMessageParseError::PayloadTooShort);
}
let name_len = payload.get_u16() as usize;
_ = payload.split_to(2);

if payload.len() < name_len + 8 {
return Err(RtmpMessageParseError::PayloadTooShort);
}
let name_bytes = payload.split_to(name_len);
let name =
String::from_utf8(name_bytes.to_vec()).map_err(|_| AmfDecodingError::InvalidUtf8)?;

let version = payload.get_u32();
let flags = payload.get_u32();
let persistent = flags & 1 != 0;

let mut events = Vec::new();
while payload.len() >= 5 {
let event_type = payload.get_u8();
let event_data_len = payload.get_u32() as usize;
if payload.len() < event_data_len {
break;
}
let data = payload.copy_to_bytes(event_data_len);
events.push(SharedObjectEvent { event_type, data });
}

Ok(RtmpMessage::SetPeerBandwidth)

if amf3 {

Ok(RtmpMessage::SharedObjectAmf3 {
name,
version,
persistent,
events,
})
} else {
Ok(RtmpMessage::SharedObjectAmf0 {
name,
version,
persistent,
events,
})
}
}
}
4 changes: 4 additions & 0 deletions rtmp/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub enum MessageType {

CommandMessageAmf3,
CommandMessageAmf0,

AggregateMessage,
}

impl MessageType {
Expand All @@ -51,6 +53,7 @@ impl MessageType {
17 => Ok(MessageType::CommandMessageAmf3),
18 => Ok(MessageType::DataMessageAmf0),
20 => Ok(MessageType::CommandMessageAmf0),
22 => Ok(MessageType::AggregateMessage),
_ => Err(RtmpMessageParseError::InvalidMessageType(value)),
}
}
Expand All @@ -69,6 +72,7 @@ impl MessageType {
MessageType::CommandMessageAmf3 => 17,
MessageType::DataMessageAmf0 => 18,
MessageType::CommandMessageAmf0 => 20,
MessageType::AggregateMessage => 22,
}
}
}
Loading