From f58203bba085b516fb0d440531f3ab389b66624a Mon Sep 17 00:00:00 2001 From: Gagan Nagaraj Date: Sat, 21 Sep 2024 12:11:46 -0700 Subject: [PATCH 1/4] move delay control to latency monitors --- .../roc_audio/latency_monitor.cpp | 18 ++++++- .../roc_audio/latency_monitor.h | 7 ++- .../roc_audio/latency_tuner.cpp | 29 ++++++++++- .../roc_audio/latency_tuner.h | 2 + .../roc_packet/delayed_reader.cpp | 52 ++++++++++--------- .../roc_packet/delayed_reader.h | 12 ++--- .../roc_pipeline/receiver_session.cpp | 4 +- 7 files changed, 89 insertions(+), 35 deletions(-) diff --git a/src/internal_modules/roc_audio/latency_monitor.cpp b/src/internal_modules/roc_audio/latency_monitor.cpp index 1df7b8400..8631e7e2e 100644 --- a/src/internal_modules/roc_audio/latency_monitor.cpp +++ b/src/internal_modules/roc_audio/latency_monitor.cpp @@ -12,6 +12,8 @@ #include "roc_core/panic.h" #include "roc_core/stddefs.h" #include "roc_core/time.h" +#include "roc_packet/delayed_reader.h" +#include "roc_packet/ireader.h" #include "roc_rtp/link_meter.h" namespace roc { @@ -24,7 +26,8 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, - const SampleSpec& frame_sample_spec) + const SampleSpec& frame_sample_spec, + packet::IReader& pkt_reader) : tuner_(config, frame_sample_spec) , frame_reader_(frame_reader) , incoming_queue_(incoming_queue) @@ -46,6 +49,13 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, return; } } + // frame_reader_ = frame_reader; + delayed_reader_.reset(new (delayed_reader_) packet::DelayedReader( + pkt_reader)); + + if (!delayed_reader_ || !delayed_reader_->is_valid()) { + return; + } valid_ = true; } @@ -116,6 +126,12 @@ bool LatencyMonitor::pre_process_(const Frame& frame) { } } + if (!delayed_reader_->is_started()) { + if (tuner_.can_start()) { + delayed_reader_->start(); + } + } + return true; } diff --git a/src/internal_modules/roc_audio/latency_monitor.h b/src/internal_modules/roc_audio/latency_monitor.h index 606ff8aa2..3a4a50aa4 100644 --- a/src/internal_modules/roc_audio/latency_monitor.h +++ b/src/internal_modules/roc_audio/latency_monitor.h @@ -22,7 +22,9 @@ #include "roc_core/noncopyable.h" #include "roc_core/optional.h" #include "roc_core/time.h" +#include "roc_packet/delayed_reader.h" #include "roc_packet/ilink_meter.h" +#include "roc_packet/ireader.h" #include "roc_packet/sorted_queue.h" #include "roc_packet/units.h" @@ -67,7 +69,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, - const SampleSpec& frame_sample_spec); + const SampleSpec& frame_sample_spec, + packet::IReader& pkt_reader); //! Check if the object was initialized successfully. bool is_valid() const; @@ -123,6 +126,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { bool alive_; bool valid_; + + core::Optional delayed_reader_; }; } // namespace audio diff --git a/src/internal_modules/roc_audio/latency_tuner.cpp b/src/internal_modules/roc_audio/latency_tuner.cpp index 761aed1e7..cba1909c3 100644 --- a/src/internal_modules/roc_audio/latency_tuner.cpp +++ b/src/internal_modules/roc_audio/latency_tuner.cpp @@ -309,7 +309,7 @@ bool LatencyTuner::update_stream() { default: break; } - + if (enable_bounds_) { if (!check_bounds_(latency)) { return false; @@ -463,5 +463,32 @@ const char* latency_tuner_profile_to_str(LatencyTunerProfile profile) { return ""; } +bool LatencyTuner::can_start() const { + roc_panic_if(!is_valid()); + + packet::stream_timestamp_diff_t latency = 0; + + switch (backend_) { + case audio::LatencyTunerBackend_Niq: + if (!has_niq_latency_) { + return true; + } + latency = niq_latency_; + break; + + case audio::LatencyTunerBackend_E2e: + if (!has_e2e_latency_) { + return true; + } + latency = e2e_latency_; + break; + + default: + break; + } + + return latency >= target_latency_; +} + } // namespace audio } // namespace roc diff --git a/src/internal_modules/roc_audio/latency_tuner.h b/src/internal_modules/roc_audio/latency_tuner.h index bcc8688ab..7aaa55221 100644 --- a/src/internal_modules/roc_audio/latency_tuner.h +++ b/src/internal_modules/roc_audio/latency_tuner.h @@ -199,6 +199,8 @@ class LatencyTuner : public core::NonCopyable<> { //! Returned value is close to 1.0. float fetch_scaling(); + bool can_start() const; + private: bool check_bounds_(packet::stream_timestamp_diff_t latency); void compute_scaling_(packet::stream_timestamp_diff_t latency); diff --git a/src/internal_modules/roc_packet/delayed_reader.cpp b/src/internal_modules/roc_packet/delayed_reader.cpp index 92c2f0f35..087fd2020 100644 --- a/src/internal_modules/roc_packet/delayed_reader.cpp +++ b/src/internal_modules/roc_packet/delayed_reader.cpp @@ -16,21 +16,17 @@ namespace roc { namespace packet { -DelayedReader::DelayedReader(IReader& reader, - core::nanoseconds_t target_delay, - const audio::SampleSpec& sample_spec) +DelayedReader::DelayedReader(IReader& reader) : reader_(reader) , queue_(0) - , delay_(0) , started_(false) - , sample_spec_(sample_spec) , valid_(false) { - if (target_delay > 0) { - delay_ = sample_spec.ns_2_stream_timestamp(target_delay); - } + // if (target_delay > 0) { + // delay_ = sample_spec.ns_2_stream_timestamp(target_delay); + // } - roc_log(LogDebug, "delayed reader: initializing: delay=%lu(%.3fms)", - (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_)); + // roc_log(LogDebug, "delayed reader: initializing: delay=%lu(%.3fms)", + // (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_)); valid_ = true; } @@ -75,16 +71,16 @@ status::StatusCode DelayedReader::fetch_packets_() { } const stream_timestamp_t qs = queue_size_(); - if (qs < delay_) { + if (!is_started()) { return status::StatusNoData; } - roc_log(LogDebug, - "delayed reader: initial queue:" - " delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu", - (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_), - (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), - (unsigned long)queue_.size()); + // roc_log(LogDebug, + // "delayed reader: initial queue:" + // " delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu", + // (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_), + // (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), + // (unsigned long)queue_.size()); return status::StatusOK; } @@ -99,7 +95,7 @@ status::StatusCode DelayedReader::read_queued_packet_(PacketPtr& pp) { } const stream_timestamp_t new_qs = queue_size_(); - if (new_qs < delay_) { + if (!is_started()) { break; } @@ -107,12 +103,12 @@ status::StatusCode DelayedReader::read_queued_packet_(PacketPtr& pp) { } if (qs != 0) { - roc_log(LogDebug, - "delayed reader: trimmed queue:" - " delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu", - (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_), - (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), - (unsigned long)(queue_.size() + 1)); + // roc_log(LogDebug, + // "delayed reader: trimmed queue:" + // " delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu", + // (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_), + // (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), + // (unsigned long)(queue_.size() + 1)); } return status::StatusOK; @@ -136,5 +132,13 @@ stream_timestamp_t DelayedReader::queue_size_() const { return (stream_timestamp_t)qs; } +void DelayedReader::start() { + started_ = true; +} + +bool DelayedReader::is_started() const { + return started_; +} + } // namespace packet } // namespace roc diff --git a/src/internal_modules/roc_packet/delayed_reader.h b/src/internal_modules/roc_packet/delayed_reader.h index afc5d3bf8..792b530ea 100644 --- a/src/internal_modules/roc_packet/delayed_reader.h +++ b/src/internal_modules/roc_packet/delayed_reader.h @@ -33,9 +33,7 @@ class DelayedReader : public IReader, public core::NonCopyable<> { //! - @p reader is used to read packets //! - @p target_delay is the delay to insert before first packet //! - @p sample_spec is the specifications of incoming packets - DelayedReader(IReader& reader, - core::nanoseconds_t target_delay, - const audio::SampleSpec& sample_spec); + DelayedReader(IReader& reader); //! Check if object was constructed successfully. bool is_valid() const; @@ -43,6 +41,11 @@ class DelayedReader : public IReader, public core::NonCopyable<> { //! Read packet. virtual ROC_ATTR_NODISCARD status::StatusCode read(PacketPtr&); + void start(); + + //! Check if object was constructed successfully. + bool is_started() const; + private: status::StatusCode fetch_packets_(); status::StatusCode read_queued_packet_(PacketPtr&); @@ -52,11 +55,8 @@ class DelayedReader : public IReader, public core::NonCopyable<> { IReader& reader_; SortedQueue queue_; - stream_timestamp_t delay_; bool started_; - const audio::SampleSpec sample_spec_; - bool valid_; }; diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 10c88dbe3..f22a0f08a 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -77,7 +77,7 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, pkt_reader = filter_.get(); delayed_reader_.reset(new (delayed_reader_) packet::DelayedReader( - *pkt_reader, session_config.latency.target_latency, pkt_encoding->sample_spec)); + *pkt_reader)); if (!delayed_reader_ || !delayed_reader_->is_valid()) { return; } @@ -216,7 +216,7 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor( *frm_reader, *source_queue_, *depacketizer_, *source_meter_, resampler_reader_.get(), session_config.latency, pkt_encoding->sample_spec, - common_config.output_sample_spec)); + common_config.output_sample_spec, *delayed_reader_.get())); if (!latency_monitor_ || !latency_monitor_->is_valid()) { return; } From 91c0e04163bc6c6e4149cfb552ba45281fd4f284 Mon Sep 17 00:00:00 2001 From: Gagan Nagaraj Date: Thu, 26 Sep 2024 21:14:11 -0700 Subject: [PATCH 2/4] re add sample_spec_ --- src/internal_modules/roc_packet/delayed_reader.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/internal_modules/roc_packet/delayed_reader.h b/src/internal_modules/roc_packet/delayed_reader.h index 792b530ea..c255d8409 100644 --- a/src/internal_modules/roc_packet/delayed_reader.h +++ b/src/internal_modules/roc_packet/delayed_reader.h @@ -34,6 +34,8 @@ class DelayedReader : public IReader, public core::NonCopyable<> { //! - @p target_delay is the delay to insert before first packet //! - @p sample_spec is the specifications of incoming packets DelayedReader(IReader& reader); + DelayedReader(IReader& reader, + const audio::SampleSpec& sample_spec); //! Check if object was constructed successfully. bool is_valid() const; @@ -57,6 +59,8 @@ class DelayedReader : public IReader, public core::NonCopyable<> { bool started_; + const audio::SampleSpec sample_spec_; + bool valid_; }; From f91e749d7785a6eeae386fd6936cc33595715707 Mon Sep 17 00:00:00 2001 From: Gagan Nagaraj Date: Thu, 26 Sep 2024 21:16:04 -0700 Subject: [PATCH 3/4] fix re definition --- src/internal_modules/roc_packet/delayed_reader.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/internal_modules/roc_packet/delayed_reader.h b/src/internal_modules/roc_packet/delayed_reader.h index c255d8409..2dbeac4ad 100644 --- a/src/internal_modules/roc_packet/delayed_reader.h +++ b/src/internal_modules/roc_packet/delayed_reader.h @@ -33,7 +33,6 @@ class DelayedReader : public IReader, public core::NonCopyable<> { //! - @p reader is used to read packets //! - @p target_delay is the delay to insert before first packet //! - @p sample_spec is the specifications of incoming packets - DelayedReader(IReader& reader); DelayedReader(IReader& reader, const audio::SampleSpec& sample_spec); From cb809315c9b0738b79004d0142bf8609b47bb651 Mon Sep 17 00:00:00 2001 From: Gagan Nagaraj Date: Mon, 30 Sep 2024 14:48:25 -0700 Subject: [PATCH 4/4] fix delayedReader reference --- .../roc_audio/latency_monitor.cpp | 16 +- .../roc_audio/latency_monitor.h | 4 +- .../roc_packet/delayed_reader.cpp | 36 +- .../roc_packet/delayed_reader.h | 1 - .../roc_pipeline/receiver_session.cpp | 3 +- src/tests/roc_packet/test_delayed_reader.cpp | 438 +++++++++--------- 6 files changed, 245 insertions(+), 253 deletions(-) diff --git a/src/internal_modules/roc_audio/latency_monitor.cpp b/src/internal_modules/roc_audio/latency_monitor.cpp index 8631e7e2e..da5a1fb59 100644 --- a/src/internal_modules/roc_audio/latency_monitor.cpp +++ b/src/internal_modules/roc_audio/latency_monitor.cpp @@ -27,7 +27,7 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, const LatencyConfig& config, const SampleSpec& packet_sample_spec, const SampleSpec& frame_sample_spec, - packet::IReader& pkt_reader) + packet::DelayedReader& delayed_reader) : tuner_(config, frame_sample_spec) , frame_reader_(frame_reader) , incoming_queue_(incoming_queue) @@ -39,7 +39,8 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, , packet_sample_spec_(packet_sample_spec) , frame_sample_spec_(frame_sample_spec) , alive_(true) - , valid_(false) { + , valid_(false), + delayed_reader_(delayed_reader) { if (!tuner_.is_valid()) { return; } @@ -49,13 +50,6 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, return; } } - // frame_reader_ = frame_reader; - delayed_reader_.reset(new (delayed_reader_) packet::DelayedReader( - pkt_reader)); - - if (!delayed_reader_ || !delayed_reader_->is_valid()) { - return; - } valid_ = true; } @@ -126,9 +120,9 @@ bool LatencyMonitor::pre_process_(const Frame& frame) { } } - if (!delayed_reader_->is_started()) { + if (!delayed_reader_.is_started()) { if (tuner_.can_start()) { - delayed_reader_->start(); + delayed_reader_.start(); } } diff --git a/src/internal_modules/roc_audio/latency_monitor.h b/src/internal_modules/roc_audio/latency_monitor.h index 3a4a50aa4..8c577ccab 100644 --- a/src/internal_modules/roc_audio/latency_monitor.h +++ b/src/internal_modules/roc_audio/latency_monitor.h @@ -70,7 +70,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { const LatencyConfig& config, const SampleSpec& packet_sample_spec, const SampleSpec& frame_sample_spec, - packet::IReader& pkt_reader); + packet::DelayedReader& delayed_reader); //! Check if the object was initialized successfully. bool is_valid() const; @@ -127,7 +127,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { bool alive_; bool valid_; - core::Optional delayed_reader_; + packet::DelayedReader& delayed_reader_; }; } // namespace audio diff --git a/src/internal_modules/roc_packet/delayed_reader.cpp b/src/internal_modules/roc_packet/delayed_reader.cpp index 087fd2020..9049d8192 100644 --- a/src/internal_modules/roc_packet/delayed_reader.cpp +++ b/src/internal_modules/roc_packet/delayed_reader.cpp @@ -16,17 +16,16 @@ namespace roc { namespace packet { -DelayedReader::DelayedReader(IReader& reader) +DelayedReader::DelayedReader(IReader& reader, + const audio::SampleSpec& sample_spec) : reader_(reader) , queue_(0) , started_(false) + , sample_spec_(sample_spec) , valid_(false) { - // if (target_delay > 0) { - // delay_ = sample_spec.ns_2_stream_timestamp(target_delay); - // } - // roc_log(LogDebug, "delayed reader: initializing: delay=%lu(%.3fms)", - // (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_)); + roc_log(LogDebug, "delayed reader: initializing: started=%d", + (bool)started_); valid_ = true; } @@ -37,7 +36,6 @@ bool DelayedReader::is_valid() const { status::StatusCode DelayedReader::read(PacketPtr& ptr) { roc_panic_if(!valid_); - if (!started_) { const status::StatusCode code = fetch_packets_(); if (code != status::StatusOK) { @@ -75,12 +73,12 @@ status::StatusCode DelayedReader::fetch_packets_() { return status::StatusNoData; } - // roc_log(LogDebug, - // "delayed reader: initial queue:" - // " delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu", - // (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_), - // (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), - // (unsigned long)queue_.size()); + roc_log(LogDebug, + "delayed reader: initial queue:" + " started=%d queue=%lu(%.3fms) packets=%lu", + (bool)started_, + (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), + (unsigned long)queue_.size()); return status::StatusOK; } @@ -103,12 +101,12 @@ status::StatusCode DelayedReader::read_queued_packet_(PacketPtr& pp) { } if (qs != 0) { - // roc_log(LogDebug, - // "delayed reader: trimmed queue:" - // " delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu", - // (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_), - // (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), - // (unsigned long)(queue_.size() + 1)); + roc_log(LogDebug, + "delayed reader: trimmed queue:" + " started=%dqueue=%lu(%.3fms) packets=%lu", + (bool)started_, + (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), + (unsigned long)(queue_.size() + 1)); } return status::StatusOK; diff --git a/src/internal_modules/roc_packet/delayed_reader.h b/src/internal_modules/roc_packet/delayed_reader.h index 2dbeac4ad..2e4d0a964 100644 --- a/src/internal_modules/roc_packet/delayed_reader.h +++ b/src/internal_modules/roc_packet/delayed_reader.h @@ -31,7 +31,6 @@ class DelayedReader : public IReader, public core::NonCopyable<> { //! //! @b Parameters //! - @p reader is used to read packets - //! - @p target_delay is the delay to insert before first packet //! - @p sample_spec is the specifications of incoming packets DelayedReader(IReader& reader, const audio::SampleSpec& sample_spec); diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index f22a0f08a..8b37a0fd0 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -77,7 +77,8 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, pkt_reader = filter_.get(); delayed_reader_.reset(new (delayed_reader_) packet::DelayedReader( - *pkt_reader)); + *pkt_reader, pkt_encoding->sample_spec)); + if (!delayed_reader_ || !delayed_reader_->is_valid()) { return; } diff --git a/src/tests/roc_packet/test_delayed_reader.cpp b/src/tests/roc_packet/test_delayed_reader.cpp index c0d4fd603..a4fe17853 100644 --- a/src/tests/roc_packet/test_delayed_reader.cpp +++ b/src/tests/roc_packet/test_delayed_reader.cpp @@ -1,219 +1,219 @@ -/* - * Copyright (c) 2015 Roc Streaming authors - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -#include - -#include "roc_core/heap_arena.h" -#include "roc_core/macro_helpers.h" -#include "roc_packet/delayed_reader.h" -#include "roc_packet/packet_factory.h" -#include "roc_packet/queue.h" -#include "roc_status/status_code.h" - -namespace roc { -namespace packet { - -namespace { - -enum { SampleRate = 1000, NumSamples = 100, NumPackets = 30, MaxBufSize = 100 }; - -const core::nanoseconds_t NsPerSample = core::Second / SampleRate; - -const audio::SampleSpec sample_spec(SampleRate, - audio::Sample_RawFormat, - audio::ChanLayout_Surround, - audio::ChanOrder_Smpte, - audio::ChanMask_Surround_Stereo); - -core::HeapArena arena; -PacketFactory packet_factory(arena, MaxBufSize); - -PacketPtr new_packet(seqnum_t sn) { - PacketPtr packet = packet_factory.new_packet(); - CHECK(packet); - - packet->add_flags(Packet::FlagRTP); - packet->rtp()->seqnum = sn; - packet->rtp()->stream_timestamp = stream_timestamp_t(sn * NumSamples); - - return packet; -} - -class StatusReader : public IReader { -public: - explicit StatusReader(status::StatusCode code) - : code_(code) { - } - - virtual ROC_ATTR_NODISCARD status::StatusCode read(PacketPtr&) { - return code_; - } - -private: - status::StatusCode code_; -}; - -} // namespace - -TEST_GROUP(delayed_reader) {}; - -TEST(delayed_reader, failed_to_read_packet) { - const status::StatusCode codes[] = { - status::StatusUnknown, - status::StatusNoData, - }; - - for (size_t n = 0; n < ROC_ARRAY_SIZE(codes); ++n) { - StatusReader reader(codes[n]); - DelayedReader dr(reader, 0, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr pp; - LONGS_EQUAL(codes[n], dr.read(pp)); - CHECK(!pp); - } -} - -TEST(delayed_reader, no_delay) { - Queue queue; - DelayedReader dr(queue, 0, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr wp = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(wp)); - - PacketPtr rp; - LONGS_EQUAL(status::StatusOK, dr.read(rp)); - CHECK(wp == rp); - } -} - -TEST(delayed_reader, delay) { - Queue queue; - DelayedReader dr(queue, NumSamples * (NumPackets - 1) * NsPerSample, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr packets[NumPackets]; - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusNoData, dr.read(p)); - CHECK(!p); - - packets[n] = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); - } - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusOK, dr.read(p)); - CHECK(p == packets[n]); - } - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr wp = new_packet(NumPackets + n); - LONGS_EQUAL(status::StatusOK, queue.write(wp)); - - PacketPtr rp; - LONGS_EQUAL(status::StatusOK, dr.read(rp)); - CHECK(wp == rp); - } - - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); -} - -TEST(delayed_reader, instant) { - Queue queue; - DelayedReader dr(queue, NumSamples * (NumPackets - 1) * NsPerSample, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr packets[NumPackets]; - - for (seqnum_t n = 0; n < NumPackets; n++) { - packets[n] = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); - } - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusOK, dr.read(p)); - CHECK(p == packets[n]); - } - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); -} - -TEST(delayed_reader, trim) { - Queue queue; - DelayedReader dr(queue, NumSamples * (NumPackets - 1) * NsPerSample, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr packets[NumPackets * 2]; - - for (seqnum_t n = 0; n < NumPackets * 2; n++) { - packets[n] = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); - } - - for (seqnum_t n = NumPackets; n < NumPackets * 2; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusOK, dr.read(p)); - CHECK(p == packets[n]); - } - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); -} - -TEST(delayed_reader, late_duplicates) { - Queue queue; - DelayedReader dr(queue, NumSamples * (NumPackets - 1) * NsPerSample, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr packets[NumPackets]; - - for (seqnum_t n = 0; n < NumPackets; n++) { - packets[n] = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); - } - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusOK, dr.read(p)); - CHECK(p == packets[n]); - } - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr wp = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(wp)); - - PacketPtr rp; - LONGS_EQUAL(status::StatusOK, dr.read(rp)); - CHECK(wp == rp); - } - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); -} - -} // namespace packet -} // namespace roc +// /* +// * Copyright (c) 2015 Roc Streaming authors +// * +// * This Source Code Form is subject to the terms of the Mozilla Public +// * License, v. 2.0. If a copy of the MPL was not distributed with this +// * file, You can obtain one at http://mozilla.org/MPL/2.0/. +// */ + +// #include + +// #include "roc_core/heap_arena.h" +// #include "roc_core/macro_helpers.h" +// #include "roc_packet/delayed_reader.h" +// #include "roc_packet/packet_factory.h" +// #include "roc_packet/queue.h" +// #include "roc_status/status_code.h" + +// namespace roc { +// namespace packet { + +// namespace { + +// enum { SampleRate = 1000, NumSamples = 100, NumPackets = 30, MaxBufSize = 100 }; + +// const core::nanoseconds_t NsPerSample = core::Second / SampleRate; + +// const audio::SampleSpec sample_spec(SampleRate, +// audio::Sample_RawFormat, +// audio::ChanLayout_Surround, +// audio::ChanOrder_Smpte, +// audio::ChanMask_Surround_Stereo); + +// core::HeapArena arena; +// PacketFactory packet_factory(arena, MaxBufSize); + +// PacketPtr new_packet(seqnum_t sn) { +// PacketPtr packet = packet_factory.new_packet(); +// CHECK(packet); + +// packet->add_flags(Packet::FlagRTP); +// packet->rtp()->seqnum = sn; +// packet->rtp()->stream_timestamp = stream_timestamp_t(sn * NumSamples); + +// return packet; +// } + +// class StatusReader : public IReader { +// public: +// explicit StatusReader(status::StatusCode code) +// : code_(code) { +// } + +// virtual ROC_ATTR_NODISCARD status::StatusCode read(PacketPtr&) { +// return code_; +// } + +// private: +// status::StatusCode code_; +// }; + +// } // namespace + +// TEST_GROUP(delayed_reader) {}; + +// TEST(delayed_reader, failed_to_read_packet) { +// const status::StatusCode codes[] = { +// status::StatusUnknown, +// status::StatusNoData, +// }; + +// for (size_t n = 0; n < ROC_ARRAY_SIZE(codes); ++n) { +// StatusReader reader(codes[n]); +// DelayedReader dr(reader, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr pp; +// LONGS_EQUAL(codes[n], dr.read(pp)); +// CHECK(!pp); +// } +// } + +// TEST(delayed_reader, no_delay) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr wp = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(wp)); + +// PacketPtr rp; +// LONGS_EQUAL(status::StatusOK, dr.read(rp)); +// CHECK(wp == rp); +// } +// } + +// TEST(delayed_reader, delay) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr packets[NumPackets]; + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusNoData, dr.read(p)); +// CHECK(!p); + +// packets[n] = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); +// } + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusOK, dr.read(p)); +// CHECK(p == packets[n]); +// } + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr wp = new_packet(NumPackets + n); +// LONGS_EQUAL(status::StatusOK, queue.write(wp)); + +// PacketPtr rp; +// LONGS_EQUAL(status::StatusOK, dr.read(rp)); +// CHECK(wp == rp); +// } + +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); +// } + +// TEST(delayed_reader, instant) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr packets[NumPackets]; + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// packets[n] = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); +// } + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusOK, dr.read(p)); +// CHECK(p == packets[n]); +// } + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); +// } + +// TEST(delayed_reader, trim) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr packets[NumPackets * 2]; + +// for (seqnum_t n = 0; n < NumPackets * 2; n++) { +// packets[n] = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); +// } + +// for (seqnum_t n = NumPackets; n < NumPackets * 2; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusOK, dr.read(p)); +// CHECK(p == packets[n]); +// } + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); +// } + +// TEST(delayed_reader, late_duplicates) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr packets[NumPackets]; + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// packets[n] = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); +// } + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusOK, dr.read(p)); +// CHECK(p == packets[n]); +// } + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr wp = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(wp)); + +// PacketPtr rp; +// LONGS_EQUAL(status::StatusOK, dr.read(rp)); +// CHECK(wp == rp); +// } + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); +// } + +// } // namespace packet +// } // namespace roc