diff --git a/doc/muxers.texi b/doc/muxers.texi
index e1f737b1d9d16..643981fc20ff9 100644
--- a/doc/muxers.texi
+++ b/doc/muxers.texi
@@ -3967,6 +3967,10 @@ Possible values:
 The muxer will try to set dtls active role and send the first client hello.
 @end table
 
+@item rtp_history @var{integer}
+Set the number of RTP history items to store.
+Default value is 512. Valid range is 64 to 2048.
+
 @item authorization @var{string}
 The optional Bearer token for WHIP Authorization.
 
diff --git a/libavformat/whip.c b/libavformat/whip.c
index 8aed0c31e5493..d2cfbdb0ee0ed 100644
--- a/libavformat/whip.c
+++ b/libavformat/whip.c
@@ -167,6 +167,17 @@
 #define WHIP_ICE_CONSENT_CHECK_INTERVAL 5000
 #define WHIP_ICE_CONSENT_EXPIRED_TIMER 30000
 
+/**
+ * RTP history packet size.
+ * Target is buffering 1000ms of RTP history packets.
+ *
+ * bandwidth_bps = (RTP payload bytes) * (RTP history size) * 8
+ * Assumes average RTP payload is 1184 bytes (MTU - SRTP_CHECKSUM_LEN).
+ */
+#define WHIP_RTP_HISTORY_MIN 64 /* around 0.61 Mbps */
+#define WHIP_RTP_HISTORY_DEFAULT 512 /* around 4.85 Mbps */
+#define WHIP_RTP_HISTORY_MAX 2048 /* around 19.40 Mbps */
+
 /* Calculate the elapsed time from starttime to endtime in milliseconds. */
 #define ELAPSED(starttime, endtime) ((float)(endtime - starttime) / 1000)
 
@@ -214,6 +225,12 @@ typedef enum WHIPFlags {
     WHIP_DTLS_ACTIVE = (1 << 0),
 } WHIPFlags;
 
+typedef struct RtpHistoryItem {
+    uint16_t seq;   /* RTP sequence number of the stored packet */
+    int size;       /* stored packet size in bytes; 0 for a never-filled slot */
+    uint8_t *buf;   /* slot storage, points into WHIPContext.hist_pool */
+} RtpHistoryItem;
+
 typedef struct WHIPContext {
     AVClass *av_class;
 
@@ -245,6 +262,8 @@ typedef struct WHIPContext {
 
     uint16_t audio_first_seq;
     uint16_t video_first_seq;
+
+    uint16_t video_rtx_seq; /* sequence number written into the next RTX packet */
     /* The PT(Payload Type) of stream, generated by the muxer. */
     uint8_t audio_payload_type;
     uint8_t video_payload_type;
@@ -331,6 +350,11 @@ typedef struct WHIPContext {
     /* The certificate and private key used for DTLS handshake. */
     char* cert_file;
     char* key_file;
+
+    int hist_sz;            /* number of history slots ("rtp_history" option) */
+    RtpHistoryItem *hist;   /* slot table, hist_sz entries */
+    uint8_t *hist_pool;     /* backing storage shared by all slot buffers */
+    int hist_head;          /* index just past the most recently stored slot */
 } WHIPContext;
 
 /**
@@ -450,6 +474,17 @@ static av_cold int initialize(AVFormatContext *s)
         av_log(whip, AV_LOG_WARNING, "pkt_size=%d(<%d) is too small, may cause packet loss\n",
                whip->pkt_size, ideal_pkt_size);
 
+    whip->hist = av_calloc(whip->hist_sz, sizeof(*whip->hist));
+    if (!whip->hist)
+        return AVERROR(ENOMEM);
+
+    whip->hist_pool = av_calloc(whip->hist_sz, whip->pkt_size - DTLS_SRTP_CHECKSUM_LEN);
+    if (!whip->hist_pool)
+        return AVERROR(ENOMEM);
+
+    for (int i = 0; i < whip->hist_sz; i++)
+        whip->hist[i].buf = whip->hist_pool + i * (whip->pkt_size - DTLS_SRTP_CHECKSUM_LEN);
+
     if (whip->state < WHIP_STATE_INIT)
         whip->state = WHIP_STATE_INIT;
     whip->whip_init_time = av_gettime_relative();
@@ -707,7 +742,7 @@ static int generate_sdp_offer(AVFormatContext *s)
             "a=rtcp-rsize\r\n"
             "a=rtpmap:%u %s/90000\r\n"
             "a=fmtp:%u level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=%02x%02x%02x\r\n"
-            "a=rtcp-fb%u nack\r\n"
+            "a=rtcp-fb:%u nack\r\n"
             "a=rtpmap:%u rtx/90000\r\n"
             "a=fmtp:%u apt=%u\r\n"
             "a=ssrc-group:FID %u %u\r\n"
@@ -1475,6 +1510,50 @@ static int setup_srtp(AVFormatContext *s)
     return ret;
 }
 
+/**
+ * Store a copy of an RTP packet in the history ring.
+ *
+ * The slot index is the distance between the packet's sequence number and
+ * video_first_seq, modulo hist_sz; whatever the slot held before is
+ * overwritten.
+ *
+ * @param whip  muxer context owning the history buffers
+ * @param buf   RTP packet; its sequence number is read from bytes 2..3
+ * @param size  size of the packet in bytes
+ * @return 0 on success, AVERROR_INVALIDDATA if the packet does not fit a slot
+ */
+static int rtp_history_store(WHIPContext *whip, const uint8_t *buf, int size)
+{
+    uint16_t seq = AV_RB16(buf + 2);
+    uint32_t pos = ((uint32_t)seq - (uint32_t)whip->video_first_seq) % (uint32_t)whip->hist_sz;
+    RtpHistoryItem *it = &whip->hist[pos];
+    if (size > whip->pkt_size - DTLS_SRTP_CHECKSUM_LEN)
+        return AVERROR_INVALIDDATA;
+    memcpy(it->buf, buf, size);
+    it->size = size;
+    it->seq = seq;
+
+    whip->hist_head = ++pos;
+    return 0;
+}
+
+/**
+ * Look up a stored RTP packet by sequence number.
+ *
+ * @param whip  muxer context owning the history buffers
+ * @param seq   RTP sequence number to look for
+ * @return the matching item, or NULL when the slot for seq was never filled
+ *         or holds a different sequence number
+ */
+static const RtpHistoryItem *rtp_history_find(WHIPContext *whip, uint16_t seq)
+{
+    uint32_t pos = ((uint32_t)seq - (uint32_t)whip->video_first_seq) % (uint32_t)whip->hist_sz;
+    const RtpHistoryItem *it = &whip->hist[pos];
+    /* Slots start zeroed (seq 0, size 0), so a bare seq compare would report an
+     * unwritten slot as a hit for seq 0; require a full 12-byte RTP header. */
+    return (it->size >= 12 && it->seq == seq) ? it : NULL;
+}
+
 /**
  * Callback triggered by the RTP muxer when it creates and sends out an RTP packet.
  *
@@ -1511,6 +1590,12 @@ static int on_rtp_write_packet(void *opaque, const uint8_t *buf, int buf_size)
         return 0;
     }
 
+    if (is_video) {
+        ret = rtp_history_store(whip, buf, buf_size);
+        if (ret < 0)
+            return ret;
+    }
+
     ret = ffurl_write(whip->udp, whip->buf, cipher_size);
     if (ret < 0) {
         av_log(whip, AV_LOG_ERROR, "Failed to write packet=%dB, ret=%d\n", cipher_size, ret);
@@ -1823,12 +1908,74 @@ static av_cold int whip_init(AVFormatContext *s)
     return ret;
 }
 
+/**
+ * See https://datatracker.ietf.org/doc/html/rfc4588#section-4
+ * Create RTX packet and send it out.
+ */
+static void handle_rtx_packet(AVFormatContext *s, uint16_t seq)
+{
+    int ret = -1;
+    WHIPContext *whip = s->priv_data;
+    uint8_t *ori_buf, rtx_buf[MAX_UDP_BUFFER_SIZE] = { 0 };
+    int ori_size, rtx_size, cipher_size;
+    uint16_t ori_seq;
+    const RtpHistoryItem *it = rtp_history_find(whip, seq);
+    uint16_t latest_seq = whip->hist[(whip->hist_head - 1 + whip->hist_sz) % whip->hist_sz].seq;
+
+    if (!it) {
+        av_log(whip, AV_LOG_DEBUG,
+            "RTP history packet seq=%"PRIu16" not found, latest seq=%"PRIu16"\n",
+            seq, latest_seq);
+        return;
+    }
+    av_log(whip, AV_LOG_DEBUG,
+        "Found RTP history packet for RTX, seq=%"PRIu16", latest seq=%"PRIu16"\n",
+        seq, latest_seq);
+
+    ori_buf = it->buf;
+    ori_size = it->size;
+
+    /* RTX packet format: header + original seq (2 bytes) + payload */
+    if (ori_size + 2 > sizeof(rtx_buf)) {
+        av_log(whip, AV_LOG_WARNING, "RTX packet is too large, size=%d\n", ori_size);
+        goto end;
+    }
+
+    memcpy(rtx_buf, ori_buf, ori_size);
+    ori_seq = AV_RB16(rtx_buf + 2);
+
+    /* rewrite RTX packet header */
+    rtx_buf[1] = (rtx_buf[1] & 0x80) | whip->video_rtx_payload_type; /* keep M bit */
+    AV_WB16(rtx_buf + 2, whip->video_rtx_seq++);
+    AV_WB32(rtx_buf + 8, whip->video_rtx_ssrc);
+
+    /* shift payload 2 bytes to write the original seq number */
+    memmove(rtx_buf + 12 + 2, rtx_buf + 12, ori_size - 12);
+    AV_WB16(rtx_buf + 12, ori_seq);
+
+    rtx_size = ori_size + 2;
+    cipher_size = ff_srtp_encrypt(&whip->srtp_video_rtx_send,
+                                  rtx_buf, rtx_size,
+                                  whip->buf, sizeof(whip->buf));
+    if (cipher_size <= 0) {
+        av_log(whip, AV_LOG_WARNING,
+            "Failed to encrypt RTX packet, size=%d, cipher_size=%d\n",
+            rtx_size, cipher_size);
+        goto end;
+    }
+    ret = ffurl_write(whip->udp, whip->buf, cipher_size);
+end:
+    if (ret < 0)
+        av_log(whip, AV_LOG_WARNING, "Failed to send RTX packet, skip this one\n");
+}
+
 static void handle_nack_rtx(AVFormatContext *s, int size)
 {
-    int ret;
+    int ret, i = 0;
     WHIPContext *whip = s->priv_data;
     uint8_t *buf = NULL;
     int rtcp_len, srtcp_len, header_len = 12/*RFC 4585 6.1*/;
+    uint32_t ssrc;
 
     /**
      * Refer to RFC 3550 6.4.1
@@ -1853,6 +2000,34 @@ static void handle_nack_rtx(AVFormatContext *s, int size)
         av_log(whip, AV_LOG_WARNING, "NACK packet decrypt failed: %d\n", ret);
         goto error;
     }
+    ssrc = AV_RB32(&buf[8]);
+    if (ssrc != whip->video_ssrc) {
+        av_log(whip, AV_LOG_DEBUG,
+            "NACK packet SSRC: %"PRIu32" not match with video track SSRC: %"PRIu32"\n",
+            ssrc, whip->video_ssrc);
+        goto end;
+    }
+    while (header_len + i + 4 <= rtcp_len) {
+        /**
+         * See https://datatracker.ietf.org/doc/html/rfc4585#section-6.1
+         * Handle multi NACKs in bundled packet.
+         */
+        uint16_t pid = AV_RB16(&buf[12 + i]);
+        uint16_t blp = AV_RB16(&buf[14 + i]);
+
+        handle_rtx_packet(s, pid);
+        /* retransmit pid + any bit set in blp */
+        for (int bit = 0; bit < 16; bit++) {
+            uint16_t seq = pid + bit + 1;
+            if (!blp)
+                break;
+            if (!(blp & (1 << bit)))
+                continue;
+
+            handle_rtx_packet(s, seq);
+        }
+        i += 4;
+    }
     goto end;
 error:
     av_log(whip, AV_LOG_WARNING, "Failed to handle NACK and RTX, Skip...\n");
@@ -1985,6 +2160,8 @@ static av_cold void whip_deinit(AVFormatContext *s)
         s->streams[i]->priv_data = NULL;
     }
 
+    av_freep(&whip->hist_pool);
+    av_freep(&whip->hist);
     av_freep(&whip->sdp_offer);
     av_freep(&whip->sdp_answer);
     av_freep(&whip->whip_resource_url);
@@ -2032,6 +2209,7 @@ static const AVOption options[] = {
     { "ts_buffer_size", "The buffer size, in bytes, of underlying protocol", OFFSET(ts_buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, ENC },
     { "whip_flags", "Set flags affecting WHIP connection behavior", OFFSET(flags), AV_OPT_TYPE_FLAGS, { .i64 = 0}, 0, UINT_MAX, ENC, .unit = "flags" },
     { "dtls_active", "Set dtls role as active", 0, AV_OPT_TYPE_CONST, { .i64 = WHIP_DTLS_ACTIVE}, 0, UINT_MAX, ENC, .unit = "flags" },
+    { "rtp_history", "The number of RTP history items to store", OFFSET(hist_sz), AV_OPT_TYPE_INT, { .i64 = WHIP_RTP_HISTORY_DEFAULT }, WHIP_RTP_HISTORY_MIN, WHIP_RTP_HISTORY_MAX, ENC },
     { "authorization", "The optional Bearer token for WHIP Authorization", OFFSET(authorization), AV_OPT_TYPE_STRING, { .str = NULL }, 0, 0, ENC },
     { "cert_file", "The optional certificate file path for DTLS", OFFSET(cert_file), AV_OPT_TYPE_STRING, { .str = NULL }, 0, 0, ENC },
     { "key_file", "The optional private key file path for DTLS", OFFSET(key_file), AV_OPT_TYPE_STRING, { .str = NULL }, 0, 0, ENC },