Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 27 additions & 175 deletions rtmp/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::HashMap;

use tracing::{debug, warn};

use crate::{
RtmpConnectionError, RtmpEvent, RtmpMessageParseError,
amf0::AmfValue,
RtmpConnectionError, RtmpEvent, RtmpMessageSerializeError,
client::negotiation::{
NegotiationProgress, connect_response_supports_enhanced, send_connect, send_create_stream,
send_publish,
},
error::RtmpStreamError,
message::{
AudioMessage, CONTROL_MESSAGE_STREAM_ID, CommandMessage, CommandMessageConnectSuccess,
CommandMessageCreateStreamSuccess, CommandMessageResultExt, DataMessage, RtmpMessage,
AudioMessage, CONTROL_MESSAGE_STREAM_ID, CommandMessage, DataMessage, RtmpMessage,
UserControlMessage, VideoMessage,
},
protocol::{
Expand All @@ -18,8 +18,7 @@ use crate::{
utils::ShutdownCondition,
};

const CONNECT_TRANSACTION_ID: u32 = 1;
const CREATE_STREAM_TRANSACTION_ID: u32 = 2;
mod negotiation;

pub struct RtmpClientConfig {
pub host: String,
Expand All @@ -42,6 +41,7 @@ struct RtmpClientState {
window_size: Option<u64>,
/// last ack sent to client
last_ack: u64,
peer_supports_enhanced: bool,
}

impl RtmpClient {
Expand All @@ -62,6 +62,7 @@ impl RtmpClient {
stream: RtmpMessageStream::new(socket),
window_size: None,
last_ack: 0,
peer_supports_enhanced: false,
};

let stream_id = state.negotiate_connection(&config.app, &config.stream_key)?;
Expand Down Expand Up @@ -99,10 +100,24 @@ impl RtmpClient {
audio: AudioMessage::Unknown(audio),
stream_id: self.stream_id,
},
RtmpEvent::UnknownVideoData(video) => RtmpMessage::Video {
video: VideoMessage::Unknown(video),
RtmpEvent::LegacyVideoData(video) => RtmpMessage::Video {
video: VideoMessage::Legacy(video),
stream_id: self.stream_id,
},
RtmpEvent::EnhancedVideoData(video) => {
// Enhanced RTMP video requires explicit connect capability signaling.
if !self.state.peer_supports_enhanced {
return Err(RtmpMessageSerializeError::InternalError(
"Peer did not negotiate Enhanced RTMP video support".into(),
)
.into());
}

RtmpMessage::Video {
video: VideoMessage::Enhanced(video),
stream_id: self.stream_id,
}
}
RtmpEvent::Metadata(metadata) => RtmpMessage::DataMessage {
data: DataMessage::OnMetaData(metadata),
stream_id: self.stream_id,
Expand Down Expand Up @@ -150,7 +165,8 @@ impl RtmpClientState {
Err(err) => return Err(err.into()),
};

if let Some(_response) = state.try_match_connect_response(&msg)? {
if let Some(response) = state.try_match_connect_response(&msg)? {
self.peer_supports_enhanced = connect_response_supports_enhanced(&response);
state = NegotiationProgress::WaitingForCreateStreamResult;
send_create_stream(&mut self.stream)?;
continue;
Expand Down Expand Up @@ -226,167 +242,3 @@ impl RtmpClientState {
Ok(())
}
}

/// -> - from client to server
/// <- - from server to client
///
/// indented steps are not reliable, assume that they can happen at different point or
/// not at all
enum NegotiationProgress {
/// -> connect
/// <- Window Ack size
/// <- Set Peer Bandwidth
/// -> Window Ack Size
/// <- StreamBegin (with stream id 0)
/// <- connect _result
WaitingForConnectResult,

/// -> createStream
/// <- createStream _result
WaitingForCreateStreamResult,

/// -> publish
/// <- StreamBegin (with real stream id)
/// -> DataMessage (metadata) TODO
/// -> SetChunkSize TODO
/// <- onStatus
WaitingForOnStatus { stream_id: u32 },
}

impl NegotiationProgress {
fn try_match_connect_response(
&self,
msg: &RtmpMessage,
) -> Result<Option<CommandMessageConnectSuccess>, RtmpConnectionError> {
let NegotiationProgress::WaitingForConnectResult = self else {
return Ok(None);
};

let RtmpMessage::CommandMessage { msg, .. } = msg else {
return Ok(None);
};
let CommandMessage::Result(result) = msg else {
return Ok(None);
};

if result.transaction_id() != CONNECT_TRANSACTION_ID {
return Ok(None);
}

match result {
Ok(result) => {
let connect_success = result
.to_connect_success()
.map_err(RtmpMessageParseError::CommandMessage)
.map_err(RtmpStreamError::ParseMessage)?;
Ok(Some(connect_success))
}
Err(err) => Err(RtmpConnectionError::ErrorOnConnect(format!("{err:?}"))),
}
}

fn try_match_create_stream_response(
&self,
msg: &RtmpMessage,
) -> Result<Option<CommandMessageCreateStreamSuccess>, RtmpConnectionError> {
let NegotiationProgress::WaitingForCreateStreamResult = self else {
return Ok(None);
};

let RtmpMessage::CommandMessage { msg, .. } = msg else {
return Ok(None);
};
let CommandMessage::Result(result) = msg else {
return Ok(None);
};

if result.transaction_id() != CREATE_STREAM_TRANSACTION_ID {
return Ok(None);
}

match result {
Ok(result) => {
let create_stream_success = result
.to_create_stream_success()
.map_err(RtmpMessageParseError::CommandMessage)
.map_err(RtmpStreamError::ParseMessage)?;
Ok(Some(create_stream_success))
}
Err(err) => Err(RtmpConnectionError::ErrorOnCreateStream(format!("{err:?}"))),
}
}

fn try_match_on_status(&self, msg: &RtmpMessage) -> Option<(AmfValue, u32)> {
let NegotiationProgress::WaitingForOnStatus { stream_id } = self else {
return None;
};

let RtmpMessage::CommandMessage {
msg: CommandMessage::OnStatus(status),
stream_id: on_status_stream_id,
} = msg
else {
return None;
};

if on_status_stream_id != stream_id {
return None;
}
Some((status.clone(), *stream_id))
}
}

fn send_connect(stream: &mut RtmpMessageStream, app: &str) -> Result<(), RtmpConnectionError> {
let props = HashMap::from_iter(
[
("app", app.into()),
("flashVer", "FMS/3,0,1,123".into()),
// True if proxy is being used
("fpad", AmfValue::Boolean(false)),
// TODO: add config option
("audioCodecs", AmfValue::Number(0x0FFF as f64)), // all RTMP supported
// TODO: add config option
("videoCodecs", AmfValue::Number(0x00FF as f64)), // all RTMP supported
("videoFunction", AmfValue::Number(0.0)),
("objectEncoding", AmfValue::Number(0.0)),
]
.into_iter()
.map(|(k, v)| (k.into(), v)),
);

stream.write_msg(RtmpMessage::CommandMessage {
msg: CommandMessage::Connect {
transaction_id: CONNECT_TRANSACTION_ID,
command_object: props,
optional_args: None,
},
stream_id: CONTROL_MESSAGE_STREAM_ID,
})?;
Ok(())
}

fn send_create_stream(stream: &mut RtmpMessageStream) -> Result<(), RtmpConnectionError> {
stream.write_msg(RtmpMessage::CommandMessage {
msg: CommandMessage::CreateStream {
transaction_id: CREATE_STREAM_TRANSACTION_ID,
command_object: AmfValue::Null,
},
stream_id: CONTROL_MESSAGE_STREAM_ID,
})?;
Ok(())
}

fn send_publish(
stream: &mut RtmpMessageStream,
stream_key: &str,
stream_id: u32,
) -> Result<(), RtmpConnectionError> {
stream.write_msg(RtmpMessage::CommandMessage {
msg: CommandMessage::Publish {
stream_key: stream_key.to_string(),
publishing_type: "live".to_string(),
},
stream_id,
})?;
Ok(())
}
Loading
Loading