diff --git a/src/internal_modules/roc_pipeline/config.cpp b/src/internal_modules/roc_pipeline/config.cpp index 8745f0321..71c73934a 100644 --- a/src/internal_modules/roc_pipeline/config.cpp +++ b/src/internal_modules/roc_pipeline/config.cpp @@ -66,7 +66,8 @@ bool ReceiverCommonConfig::deduce_defaults(audio::ProcessorMap& processor_map) { // ReceiverSessionConfig ReceiverSessionConfig::ReceiverSessionConfig() - : payload_type(0) { + : payload_type(0) + , prebuf_len(0) { } bool ReceiverSessionConfig::deduce_defaults(audio::ProcessorMap& processor_map) { @@ -95,12 +96,17 @@ bool ReceiverSessionConfig::deduce_defaults(audio::ProcessorMap& processor_map) return false; } + if (!prebuf_len) { + prebuf_len = latency.target_latency; + } + return true; } // ReceiverSourceConfig -ReceiverSourceConfig::ReceiverSourceConfig() { +ReceiverSourceConfig::ReceiverSourceConfig() + : max_session_packets(0) { } bool ReceiverSourceConfig::deduce_defaults(audio::ProcessorMap& processor_map) { diff --git a/src/internal_modules/roc_pipeline/config.h b/src/internal_modules/roc_pipeline/config.h index a485633f8..dd5c5770a 100644 --- a/src/internal_modules/roc_pipeline/config.h +++ b/src/internal_modules/roc_pipeline/config.h @@ -186,6 +186,9 @@ struct ReceiverSessionConfig { //! Watchdog parameters. audio::WatchdogConfig watchdog; + //! Packet prebuffer length, nanoseconds. + core::nanoseconds_t prebuf_len; + //! Initialize config. ReceiverSessionConfig(); @@ -205,6 +208,9 @@ struct ReceiverSourceConfig { //! Default parameters for a session. ReceiverSessionConfig session_defaults; + //! Maximum number of packets per session. + size_t max_session_packets; + //! Initialize config. ReceiverSourceConfig(); diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.cpp b/src/internal_modules/roc_pipeline/receiver_session_group.cpp index 9d3e9268d..b5b9df8d3 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session_group.cpp @@ -338,6 +338,9 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) { // Session not found, auto-create session if possible. if (can_create_session_(packet)) { return create_session_(packet); + } else { + // else, put packet in pre session buffer + enqueue_prebuf_packet_(packet); } return status::StatusNoRoute; @@ -434,6 +437,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) { sessions_.push_back(*sess); state_tracker_.register_session(); + dequeue_prebuf_packet_(*sess); + return status::StatusOK; } @@ -478,5 +483,47 @@ ReceiverSessionGroup::make_session_config_(const packet::PacketPtr& packet) cons return config; } +void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet) { + prebuf_packets_.push_back(*packet.get()); + + core::nanoseconds_t now = core::timestamp(core::ClockMonotonic); + + while (prebuf_packets_.size() > 0) { + core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp; + if (now - received > source_config_.session_defaults.prebuf_len) { + prebuf_packets_.remove(*prebuf_packets_.front()); + } else { + break; + } + } +} + +void ReceiverSessionGroup::dequeue_prebuf_packet_(ReceiverSession& sess) { + packet::PacketPtr curr, next; + + if (prebuf_packets_.size() == 0) { + return; + } + + core::nanoseconds_t now = core::timestamp(core::ClockMonotonic); + + for (curr = prebuf_packets_.front(); curr; curr = next) { + next = prebuf_packets_.nextof(*curr); + + // if packet is too old, remove it from the queue + core::nanoseconds_t received = curr->udp()->receive_timestamp; + if (now - received > source_config_.session_defaults.prebuf_len) { + prebuf_packets_.remove(*curr); + continue; + } + + // if session handles the packet, remove it from the queue + const status::StatusCode code = sess.route_packet(curr); + if (code == status::StatusOK) { + prebuf_packets_.remove(*curr); + } + } +} + } // namespace pipeline } // namespace roc diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.h b/src/internal_modules/roc_pipeline/receiver_session_group.h index b53e31c20..e48058015 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.h +++ b/src/internal_modules/roc_pipeline/receiver_session_group.h @@ -19,6 +19,7 @@ #include "roc_core/list.h" #include "roc_core/noncopyable.h" #include "roc_dbgio/csv_dumper.h" +#include "roc_packet/packet.h" #include "roc_packet/packet_factory.h" #include "roc_pipeline/metrics.h" #include "roc_pipeline/receiver_endpoint.h" @@ -131,6 +132,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip status::StatusCode route_transport_packet_(const packet::PacketPtr& packet); status::StatusCode route_control_packet_(const packet::PacketPtr& packet, core::nanoseconds_t current_time); + void enqueue_prebuf_packet_(const packet::PacketPtr& packet); + void dequeue_prebuf_packet_(ReceiverSession& sess); bool can_create_session_(const packet::PacketPtr& packet); @@ -165,6 +168,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip dbgio::CsvDumper* dumper_; + core::List prebuf_packets_; + status::StatusCode init_status_; }; diff --git a/src/public_api/include/roc/config.h b/src/public_api/include/roc/config.h index 1c27cb836..b202ff7d3 100644 --- a/src/public_api/include/roc/config.h +++ b/src/public_api/include/roc/config.h @@ -1090,6 +1090,14 @@ typedef struct roc_receiver_config { * If zero, default value is used. If negative, the check is disabled. */ long long choppy_playback_timeout; + + /** Packet prebuffer length, in nanoseconds. + * Packets received for sessions that have not yet been created + * will be buffered. Any packets older than the prebuf_len + * will be discarded. + * If zero, default value is used. + */ + unsigned long long prebuf_len; } roc_receiver_config; /** Interface configuration. diff --git a/src/tests/roc_pipeline/test_receiver_source.cpp b/src/tests/roc_pipeline/test_receiver_source.cpp index 909cce0b9..46608704d 100644 --- a/src/tests/roc_pipeline/test_receiver_source.cpp +++ b/src/tests/roc_pipeline/test_receiver_source.cpp @@ -6,6 +6,8 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +#include "roc_core/log.h" +#include "roc_fec/block_writer.h" #include "test_harness.h" #include "test_helpers/control_reader.h" #include "test_helpers/control_writer.h" @@ -20,6 +22,7 @@ #include "roc_pipeline/receiver_source.h" #include "roc_rtp/encoding_map.h" #include "roc_stat/mov_min_max.h" +#include // This file contains tests for ReceiverSource. ReceiverSource can be seen as a big // composite processor (consisting of chained smaller processors) that transforms @@ -3399,6 +3402,46 @@ TEST(receiver_source, timestamp_mapping_remixing) { CHECK(first_ts); } +TEST(receiver_source, packet_buffer) { + if (!fec_supported()) { + TEST_SKIP(); + } + + init_with_defaults(); + + ReceiverSource receiver(make_default_config(), processor_map, encoding_map, + packet_pool, packet_buffer_pool, frame_pool, + frame_buffer_pool, arena); + LONGS_EQUAL(status::StatusOK, receiver.init_status()); + + ReceiverSlot* slot = create_slot(receiver); + + packet::FifoQueue source_queue; + packet::FifoQueue repair_queue; + + packet::IWriter* source_endpoint_writer = create_transport_endpoint( + slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source, dst_addr1); + + packet::IWriter* repair_endpoint_writer = + create_control_endpoint(slot, address::Iface_AudioRepair, + address::Proto_RS8M_Repair, dst_addr2, repair_queue); + + test::PacketWriter packet_writer(arena, *source_endpoint_writer, + *repair_endpoint_writer, encoding_map, + packet_factory, src_id1, src_addr1, dst_addr1, + dst_addr2, PayloadType_Ch1, fec_scheme, fec_config); + + // setup reader + test::FrameReader frame_reader(receiver, frame_factory); + + packet_writer.write_packets(Latency / SamplesPerPacket, SamplesPerPacket, + packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec); + UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions()); +} + // Set high jitter, wait until latency increases and stabilizes. TEST(receiver_source, adaptive_latency_increase) { const size_t stabilization_window = JitterMeterWindow * 5; diff --git a/src/tools/roc_recv/cmdline.ggo b/src/tools/roc_recv/cmdline.ggo index ceb411e14..4c5e197ad 100644 --- a/src/tools/roc_recv/cmdline.ggo +++ b/src/tools/roc_recv/cmdline.ggo @@ -76,6 +76,8 @@ section "Timeout options" option "no-play-timeout" - "No-playback timeout, TIME units" typestr="TIME" string optional + option "prebuf-len" - "Length of packet prebuffer, TIME units" + string optional option "choppy-play-timeout" - "Choppy playback timeout, TIME units" typestr="TIME" string optional diff --git a/src/tools/roc_recv/main.cpp b/src/tools/roc_recv/main.cpp index 91cafc290..acf8a8bf9 100644 --- a/src/tools/roc_recv/main.cpp +++ b/src/tools/roc_recv/main.cpp @@ -8,7 +8,6 @@ #include "roc_address/io_uri.h" #include "roc_address/network_uri.h" -#include "roc_address/protocol_map.h" #include "roc_core/crash_handler.h" #include "roc_core/heap_arena.h" #include "roc_core/log.h" @@ -313,6 +312,18 @@ bool build_receiver_config(const gengetopt_args_info& args, } } + if (args.prebuf_len_given) { + if (!core::parse_duration(args.prebuf_len_arg, + receiver_config.session_defaults.prebuf_len)) { + roc_log(LogError, "invalid --prebuf-len: bad format"); + return false; + } + if (receiver_config.session_defaults.prebuf_len) { + roc_log(LogError, "invalid --prebuf-len: should be > 0"); + return false; + } + } + if (args.choppy_play_timeout_given) { if (!core::parse_duration( args.choppy_play_timeout_arg,