Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/muxers.texi
Original file line number Diff line number Diff line change
Expand Up @@ -3967,6 +3967,10 @@ Possible values:
The muxer will try to set dtls active role and send the first client hello.
@end table

@item rtp_history @var{integer}
Set the number of RTP history items to store.
Default value is 512.

@item authorization @var{string}
The optional Bearer token for WHIP Authorization.

Expand Down
160 changes: 158 additions & 2 deletions libavformat/whip.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@
#define WHIP_ICE_CONSENT_CHECK_INTERVAL 5000
#define WHIP_ICE_CONSENT_EXPIRED_TIMER 30000

/**
* RTP history packet size.
* Target is buffering 1000ms of RTP history packets.
*
* bandwidth_bps = (RTP payload bytes) * (RTP history size) * 8
* Assumes average RTP payload is 1184 bytes (MTU - SRTP_CHECKSUM_LEN).
*/
#define WHIP_RTP_HISTORY_MIN 64 /* around 0.61 Mbps */
#define WHIP_RTP_HISTORY_DEFAULT 512 /* around 4.85 Mbps */
#define WHIP_RTP_HISTORY_MAX 2048 /* around 19.40 Mbps */

/* Calculate the elapsed time from starttime to endtime in milliseconds. */
#define ELAPSED(starttime, endtime) ((float)(endtime - starttime) / 1000)

Expand Down Expand Up @@ -214,6 +225,12 @@ typedef enum WHIPFlags {
WHIP_DTLS_ACTIVE = (1 << 0),
} WHIPFlags;

typedef struct RtpHistoryItem {
uint16_t seq;
int size;
uint8_t *buf;
} RtpHistoryItem;

typedef struct WHIPContext {
AVClass *av_class;

Expand Down Expand Up @@ -245,6 +262,8 @@ typedef struct WHIPContext {

uint16_t audio_first_seq;
uint16_t video_first_seq;

uint16_t video_rtx_seq;
/* The PT(Payload Type) of stream, generated by the muxer. */
uint8_t audio_payload_type;
uint8_t video_payload_type;
Expand Down Expand Up @@ -331,6 +350,11 @@ typedef struct WHIPContext {
/* The certificate and private key used for DTLS handshake. */
char* cert_file;
char* key_file;

int hist_sz;
RtpHistoryItem *hist;
uint8_t *hist_pool;
int hist_head;
} WHIPContext;

/**
Expand Down Expand Up @@ -450,6 +474,17 @@ static av_cold int initialize(AVFormatContext *s)
av_log(whip, AV_LOG_WARNING, "pkt_size=%d(<%d) is too small, may cause packet loss\n",
whip->pkt_size, ideal_pkt_size);

whip->hist = av_calloc(whip->hist_sz, sizeof(*whip->hist));
if (!whip->hist)
return AVERROR(ENOMEM);

whip->hist_pool = av_calloc(whip->hist_sz, whip->pkt_size - DTLS_SRTP_CHECKSUM_LEN);
if (!whip->hist_pool)
return AVERROR(ENOMEM);

for (int i = 0; i < whip->hist_sz; i++)
whip->hist[i].buf = whip->hist_pool + i * (whip->pkt_size - DTLS_SRTP_CHECKSUM_LEN);

if (whip->state < WHIP_STATE_INIT)
whip->state = WHIP_STATE_INIT;
whip->whip_init_time = av_gettime_relative();
Expand Down Expand Up @@ -707,7 +742,7 @@ static int generate_sdp_offer(AVFormatContext *s)
"a=rtcp-rsize\r\n"
"a=rtpmap:%u %s/90000\r\n"
"a=fmtp:%u level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=%02x%02x%02x\r\n"
"a=rtcp-fb%u nack\r\n"
"a=rtcp-fb:%u nack\r\n"
"a=rtpmap:%u rtx/90000\r\n"
"a=fmtp:%u apt=%u\r\n"
"a=ssrc-group:FID %u %u\r\n"
Expand Down Expand Up @@ -1475,6 +1510,28 @@ static int setup_srtp(AVFormatContext *s)
return ret;
}

static int rtp_history_store(WHIPContext *whip, const uint8_t *buf, int size)
{
uint16_t seq = AV_RB16(buf + 2);
uint32_t pos = ((uint32_t)seq - (uint32_t)whip->video_first_seq) % (uint32_t)whip->hist_sz;
RtpHistoryItem *it = &whip->hist[pos];
if (size > whip->pkt_size - DTLS_SRTP_CHECKSUM_LEN)
return AVERROR_INVALIDDATA;
memcpy(it->buf, buf, size);
it->size = size;
it->seq = seq;

whip->hist_head = ++pos;
return 0;
}

static const RtpHistoryItem *rtp_history_find(WHIPContext *whip, uint16_t seq)
{
uint32_t pos = ((uint32_t)seq - (uint32_t)whip->video_first_seq) % (uint32_t)whip->hist_sz;
const RtpHistoryItem *it = &whip->hist[pos];
return it->seq == seq ? it : NULL;
}

/**
* Callback triggered by the RTP muxer when it creates and sends out an RTP packet.
*
Expand Down Expand Up @@ -1511,6 +1568,12 @@ static int on_rtp_write_packet(void *opaque, const uint8_t *buf, int buf_size)
return 0;
}

if (is_video) {
ret = rtp_history_store(whip, buf, buf_size);
if (ret < 0)
return ret;
}

ret = ffurl_write(whip->udp, whip->buf, cipher_size);
if (ret < 0) {
av_log(whip, AV_LOG_ERROR, "Failed to write packet=%dB, ret=%d\n", cipher_size, ret);
Expand Down Expand Up @@ -1823,12 +1886,74 @@ static av_cold int whip_init(AVFormatContext *s)
return ret;
}

/**
* See https://datatracker.ietf.org/doc/html/rfc4588#section-4
* Create RTX packet and send it out.
*/
static void handle_rtx_packet(AVFormatContext *s, uint16_t seq)
{
int ret = -1;
WHIPContext *whip = s->priv_data;
uint8_t *ori_buf, rtx_buf[MAX_UDP_BUFFER_SIZE] = { 0 };
int ori_size, rtx_size, cipher_size;
uint16_t ori_seq;
const RtpHistoryItem *it = rtp_history_find(whip, seq);
uint16_t latest_seq = whip->hist[(whip->hist_head - 1 + whip->hist_sz) % whip->hist_sz].seq;

if (!it) {
av_log(whip, AV_LOG_DEBUG,
"RTP history packet seq=%"PRIu16" not found, latest seq=%"PRIu16"\n",
seq, latest_seq);
return;
}
av_log(whip, AV_LOG_DEBUG,
"Found RTP history packet for RTX, seq=%"PRIu16", latest seq=%"PRIu16"\n",
seq, latest_seq);

ori_buf = it->buf;
ori_size = it->size;

/* RTX packet format: header + original seq (2 bytes) + payload */
if (ori_size + 2 > sizeof(rtx_buf)) {
av_log(whip, AV_LOG_WARNING, "RTX packet is too large, size=%d\n", ori_size);
goto end;
}

memcpy(rtx_buf, ori_buf, ori_size);
ori_seq = AV_RB16(rtx_buf + 2);

/* rewrite RTX packet header */
rtx_buf[1] = (rtx_buf[1] & 0x80) | whip->video_rtx_payload_type; /* keep M bit */
AV_WB16(rtx_buf + 2, whip->video_rtx_seq++);
AV_WB32(rtx_buf + 8, whip->video_rtx_ssrc);

/* shift payload 2 bytes to write the original seq number */
memmove(rtx_buf + 12 + 2, rtx_buf + 12, ori_size - 12);
AV_WB16(rtx_buf + 12, ori_seq);

rtx_size = ori_size + 2;
cipher_size = ff_srtp_encrypt(&whip->srtp_video_rtx_send,
rtx_buf, rtx_size,
whip->buf, sizeof(whip->buf));
if (cipher_size <= 0) {
av_log(whip, AV_LOG_WARNING,
"Failed to encrypt RTX packet, size=%d, cipher_size=%d\n",
rtx_size, cipher_size);
goto end;
}
ret = ffurl_write(whip->udp, whip->buf, cipher_size);
end:
if (ret < 0)
av_log(whip, AV_LOG_WARNING, "Failed to send RTX packet, skip this one\n");
}

static void handle_nack_rtx(AVFormatContext *s, int size)
{
int ret;
int ret, i = 0;
WHIPContext *whip = s->priv_data;
uint8_t *buf = NULL;
int rtcp_len, srtcp_len, header_len = 12/*RFC 4585 6.1*/;
uint32_t ssrc;

/**
* Refer to RFC 3550 6.4.1
Expand All @@ -1853,6 +1978,34 @@ static void handle_nack_rtx(AVFormatContext *s, int size)
av_log(whip, AV_LOG_WARNING, "NACK packet decrypt failed: %d\n", ret);
goto error;
}
ssrc = AV_RB32(&buf[8]);
if (ssrc != whip->video_ssrc) {
av_log(whip, AV_LOG_DEBUG,
"NACK packet SSRC: %"PRIu32" not match with video track SSRC: %"PRIu32"\n",
ssrc, whip->video_ssrc);
goto end;
}
while (header_len + i + 4 <= rtcp_len) {
/**
* See https://datatracker.ietf.org/doc/html/rfc4585#section-6.1
* Handle multi NACKs in bundled packet.
*/
uint16_t pid = AV_RB16(&buf[12 + i]);
uint16_t blp = AV_RB16(&buf[14 + i]);

handle_rtx_packet(s, pid);
/* retransmit pid + any bit set in blp */
for (int bit = 0; bit < 16; bit++) {
uint16_t seq = pid + bit + 1;
if (!blp)
break;
if (!(blp & (1 << bit)))
continue;

handle_rtx_packet(s, seq);
}
i += 4;
}
goto end;
error:
av_log(whip, AV_LOG_WARNING, "Failed to handle NACK and RTX, Skip...\n");
Expand Down Expand Up @@ -1985,6 +2138,8 @@ static av_cold void whip_deinit(AVFormatContext *s)
s->streams[i]->priv_data = NULL;
}

av_freep(&whip->hist_pool);
av_freep(&whip->hist);
av_freep(&whip->sdp_offer);
av_freep(&whip->sdp_answer);
av_freep(&whip->whip_resource_url);
Expand Down Expand Up @@ -2032,6 +2187,7 @@ static const AVOption options[] = {
{ "ts_buffer_size", "The buffer size, in bytes, of underlying protocol", OFFSET(ts_buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, ENC },
{ "whip_flags", "Set flags affecting WHIP connection behavior", OFFSET(flags), AV_OPT_TYPE_FLAGS, { .i64 = 0}, 0, UINT_MAX, ENC, .unit = "flags" },
{ "dtls_active", "Set dtls role as active", 0, AV_OPT_TYPE_CONST, { .i64 = WHIP_DTLS_ACTIVE}, 0, UINT_MAX, ENC, .unit = "flags" },
{ "rtp_history", "The number of RTP history items to store", OFFSET(hist_sz), AV_OPT_TYPE_INT, { .i64 = WHIP_RTP_HISTORY_DEFAULT }, WHIP_RTP_HISTORY_MIN, WHIP_RTP_HISTORY_MAX, ENC },
{ "authorization", "The optional Bearer token for WHIP Authorization", OFFSET(authorization), AV_OPT_TYPE_STRING, { .str = NULL }, 0, 0, ENC },
{ "cert_file", "The optional certificate file path for DTLS", OFFSET(cert_file), AV_OPT_TYPE_STRING, { .str = NULL }, 0, 0, ENC },
{ "key_file", "The optional private key file path for DTLS", OFFSET(key_file), AV_OPT_TYPE_STRING, { .str = NULL }, 0, 0, ENC },
Expand Down
Loading