diff --git a/src/internal_modules/roc_audio/latency_monitor.cpp b/src/internal_modules/roc_audio/latency_monitor.cpp index 1df7b8400..da5a1fb59 100644 --- a/src/internal_modules/roc_audio/latency_monitor.cpp +++ b/src/internal_modules/roc_audio/latency_monitor.cpp @@ -12,6 +12,8 @@ #include "roc_core/panic.h" #include "roc_core/stddefs.h" #include "roc_core/time.h" +#include "roc_packet/delayed_reader.h" +#include "roc_packet/ireader.h" #include "roc_rtp/link_meter.h" namespace roc { @@ -24,7 +26,8 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, - const SampleSpec& frame_sample_spec) + const SampleSpec& frame_sample_spec, + packet::DelayedReader& delayed_reader) : tuner_(config, frame_sample_spec) , frame_reader_(frame_reader) , incoming_queue_(incoming_queue) @@ -36,7 +39,8 @@ LatencyMonitor::LatencyMonitor(IFrameReader& frame_reader, , packet_sample_spec_(packet_sample_spec) , frame_sample_spec_(frame_sample_spec) , alive_(true) - , valid_(false) { + , valid_(false), + delayed_reader_(delayed_reader) { if (!tuner_.is_valid()) { return; } @@ -116,6 +120,12 @@ bool LatencyMonitor::pre_process_(const Frame& frame) { } } + if (!delayed_reader_.is_started()) { + if (tuner_.can_start()) { + delayed_reader_.start(); + } + } + return true; } diff --git a/src/internal_modules/roc_audio/latency_monitor.h b/src/internal_modules/roc_audio/latency_monitor.h index 606ff8aa2..8c577ccab 100644 --- a/src/internal_modules/roc_audio/latency_monitor.h +++ b/src/internal_modules/roc_audio/latency_monitor.h @@ -22,7 +22,9 @@ #include "roc_core/noncopyable.h" #include "roc_core/optional.h" #include "roc_core/time.h" +#include "roc_packet/delayed_reader.h" #include "roc_packet/ilink_meter.h" +#include "roc_packet/ireader.h" #include "roc_packet/sorted_queue.h" #include "roc_packet/units.h" @@ -67,7 +69,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { ResamplerReader* resampler, const LatencyConfig& config, const SampleSpec& packet_sample_spec, - const SampleSpec& frame_sample_spec); + const SampleSpec& frame_sample_spec, + packet::DelayedReader& delayed_reader); //! Check if the object was initialized successfully. bool is_valid() const; @@ -123,6 +126,8 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> { bool alive_; bool valid_; + + packet::DelayedReader& delayed_reader_; }; } // namespace audio diff --git a/src/internal_modules/roc_audio/latency_tuner.cpp b/src/internal_modules/roc_audio/latency_tuner.cpp index 761aed1e7..cba1909c3 100644 --- a/src/internal_modules/roc_audio/latency_tuner.cpp +++ b/src/internal_modules/roc_audio/latency_tuner.cpp @@ -309,7 +309,7 @@ bool LatencyTuner::update_stream() { default: break; } - + if (enable_bounds_) { if (!check_bounds_(latency)) { return false; @@ -463,5 +463,32 @@ const char* latency_tuner_profile_to_str(LatencyTunerProfile profile) { return ""; } +bool LatencyTuner::can_start() const { + roc_panic_if(!is_valid()); + + packet::stream_timestamp_diff_t latency = 0; + + switch (backend_) { + case audio::LatencyTunerBackend_Niq: + if (!has_niq_latency_) { + return true; + } + latency = niq_latency_; + break; + + case audio::LatencyTunerBackend_E2e: + if (!has_e2e_latency_) { + return true; + } + latency = e2e_latency_; + break; + + default: + break; + } + + return latency >= target_latency_; +} + } // namespace audio } // namespace roc diff --git a/src/internal_modules/roc_audio/latency_tuner.h b/src/internal_modules/roc_audio/latency_tuner.h index bcc8688ab..7aaa55221 100644 --- a/src/internal_modules/roc_audio/latency_tuner.h +++ b/src/internal_modules/roc_audio/latency_tuner.h @@ -199,6 +199,8 @@ class LatencyTuner : public core::NonCopyable<> { //! Returned value is close to 1.0. float fetch_scaling(); + bool can_start() const; + private: bool check_bounds_(packet::stream_timestamp_diff_t latency); void compute_scaling_(packet::stream_timestamp_diff_t latency); diff --git a/src/internal_modules/roc_packet/delayed_reader.cpp b/src/internal_modules/roc_packet/delayed_reader.cpp index 92c2f0f35..9049d8192 100644 --- a/src/internal_modules/roc_packet/delayed_reader.cpp +++ b/src/internal_modules/roc_packet/delayed_reader.cpp @@ -17,20 +17,15 @@ namespace roc { namespace packet { DelayedReader::DelayedReader(IReader& reader, - core::nanoseconds_t target_delay, const audio::SampleSpec& sample_spec) : reader_(reader) , queue_(0) - , delay_(0) , started_(false) , sample_spec_(sample_spec) , valid_(false) { - if (target_delay > 0) { - delay_ = sample_spec.ns_2_stream_timestamp(target_delay); - } - roc_log(LogDebug, "delayed reader: initializing: delay=%lu(%.3fms)", - (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_)); + roc_log(LogDebug, "delayed reader: initializing: started=%d", + (bool)started_); valid_ = true; } @@ -41,7 +36,6 @@ bool DelayedReader::is_valid() const { status::StatusCode DelayedReader::read(PacketPtr& ptr) { roc_panic_if(!valid_); - if (!started_) { const status::StatusCode code = fetch_packets_(); if (code != status::StatusOK) { @@ -75,14 +69,14 @@ status::StatusCode DelayedReader::fetch_packets_() { } const stream_timestamp_t qs = queue_size_(); - if (qs < delay_) { + if (!is_started()) { return status::StatusNoData; } roc_log(LogDebug, "delayed reader: initial queue:" - " delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu", - (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_), + " started=%d queue=%lu(%.3fms) packets=%lu", + (bool)started_, (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), (unsigned long)queue_.size()); @@ -99,7 +93,7 @@ status::StatusCode DelayedReader::read_queued_packet_(PacketPtr& pp) { } const stream_timestamp_t new_qs = queue_size_(); - if (new_qs < delay_) { + if (!is_started()) { break; } @@ -109,8 +103,8 @@ status::StatusCode DelayedReader::read_queued_packet_(PacketPtr& pp) { if (qs != 0) { roc_log(LogDebug, "delayed reader: trimmed queue:" - " delay=%lu(%.3fms) queue=%lu(%.3fms) packets=%lu", - (unsigned long)delay_, sample_spec_.stream_timestamp_2_ms(delay_), + " started=%dqueue=%lu(%.3fms) packets=%lu", + (bool)started_, (unsigned long)qs, sample_spec_.stream_timestamp_2_ms(qs), (unsigned long)(queue_.size() + 1)); } @@ -136,5 +130,13 @@ stream_timestamp_t DelayedReader::queue_size_() const { return (stream_timestamp_t)qs; } +void DelayedReader::start() { + started_ = true; +} + +bool DelayedReader::is_started() const { + return started_; +} + } // namespace packet } // namespace roc diff --git a/src/internal_modules/roc_packet/delayed_reader.h b/src/internal_modules/roc_packet/delayed_reader.h index afc5d3bf8..2e4d0a964 100644 --- a/src/internal_modules/roc_packet/delayed_reader.h +++ b/src/internal_modules/roc_packet/delayed_reader.h @@ -31,10 +31,8 @@ class DelayedReader : public IReader, public core::NonCopyable<> { //! //! @b Parameters //! - @p reader is used to read packets - //! - @p target_delay is the delay to insert before first packet //! - @p sample_spec is the specifications of incoming packets DelayedReader(IReader& reader, - core::nanoseconds_t target_delay, const audio::SampleSpec& sample_spec); //! Check if object was constructed successfully. @@ -43,6 +41,11 @@ class DelayedReader : public IReader, public core::NonCopyable<> { //! Read packet. virtual ROC_ATTR_NODISCARD status::StatusCode read(PacketPtr&); + void start(); + + //! Check if object was constructed successfully. + bool is_started() const; + private: status::StatusCode fetch_packets_(); status::StatusCode read_queued_packet_(PacketPtr&); @@ -52,7 +55,6 @@ class DelayedReader : public IReader, public core::NonCopyable<> { IReader& reader_; SortedQueue queue_; - stream_timestamp_t delay_; bool started_; const audio::SampleSpec sample_spec_; diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 10c88dbe3..8b37a0fd0 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -77,7 +77,8 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, pkt_reader = filter_.get(); delayed_reader_.reset(new (delayed_reader_) packet::DelayedReader( - *pkt_reader, session_config.latency.target_latency, pkt_encoding->sample_spec)); + *pkt_reader, pkt_encoding->sample_spec)); + if (!delayed_reader_ || !delayed_reader_->is_valid()) { return; } @@ -216,7 +217,7 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config, latency_monitor_.reset(new (latency_monitor_) audio::LatencyMonitor( *frm_reader, *source_queue_, *depacketizer_, *source_meter_, resampler_reader_.get(), session_config.latency, pkt_encoding->sample_spec, - common_config.output_sample_spec)); + common_config.output_sample_spec, *delayed_reader_.get())); if (!latency_monitor_ || !latency_monitor_->is_valid()) { return; } diff --git a/src/tests/roc_packet/test_delayed_reader.cpp b/src/tests/roc_packet/test_delayed_reader.cpp index c0d4fd603..a4fe17853 100644 --- a/src/tests/roc_packet/test_delayed_reader.cpp +++ b/src/tests/roc_packet/test_delayed_reader.cpp @@ -1,219 +1,219 @@ -/* - * Copyright (c) 2015 Roc Streaming authors - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -#include - -#include "roc_core/heap_arena.h" -#include "roc_core/macro_helpers.h" -#include "roc_packet/delayed_reader.h" -#include "roc_packet/packet_factory.h" -#include "roc_packet/queue.h" -#include "roc_status/status_code.h" - -namespace roc { -namespace packet { - -namespace { - -enum { SampleRate = 1000, NumSamples = 100, NumPackets = 30, MaxBufSize = 100 }; - -const core::nanoseconds_t NsPerSample = core::Second / SampleRate; - -const audio::SampleSpec sample_spec(SampleRate, - audio::Sample_RawFormat, - audio::ChanLayout_Surround, - audio::ChanOrder_Smpte, - audio::ChanMask_Surround_Stereo); - -core::HeapArena arena; -PacketFactory packet_factory(arena, MaxBufSize); - -PacketPtr new_packet(seqnum_t sn) { - PacketPtr packet = packet_factory.new_packet(); - CHECK(packet); - - packet->add_flags(Packet::FlagRTP); - packet->rtp()->seqnum = sn; - packet->rtp()->stream_timestamp = stream_timestamp_t(sn * NumSamples); - - return packet; -} - -class StatusReader : public IReader { -public: - explicit StatusReader(status::StatusCode code) - : code_(code) { - } - - virtual ROC_ATTR_NODISCARD status::StatusCode read(PacketPtr&) { - return code_; - } - -private: - status::StatusCode code_; -}; - -} // namespace - -TEST_GROUP(delayed_reader) {}; - -TEST(delayed_reader, failed_to_read_packet) { - const status::StatusCode codes[] = { - status::StatusUnknown, - status::StatusNoData, - }; - - for (size_t n = 0; n < ROC_ARRAY_SIZE(codes); ++n) { - StatusReader reader(codes[n]); - DelayedReader dr(reader, 0, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr pp; - LONGS_EQUAL(codes[n], dr.read(pp)); - CHECK(!pp); - } -} - -TEST(delayed_reader, no_delay) { - Queue queue; - DelayedReader dr(queue, 0, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr wp = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(wp)); - - PacketPtr rp; - LONGS_EQUAL(status::StatusOK, dr.read(rp)); - CHECK(wp == rp); - } -} - -TEST(delayed_reader, delay) { - Queue queue; - DelayedReader dr(queue, NumSamples * (NumPackets - 1) * NsPerSample, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr packets[NumPackets]; - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusNoData, dr.read(p)); - CHECK(!p); - - packets[n] = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); - } - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusOK, dr.read(p)); - CHECK(p == packets[n]); - } - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr wp = new_packet(NumPackets + n); - LONGS_EQUAL(status::StatusOK, queue.write(wp)); - - PacketPtr rp; - LONGS_EQUAL(status::StatusOK, dr.read(rp)); - CHECK(wp == rp); - } - - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); -} - -TEST(delayed_reader, instant) { - Queue queue; - DelayedReader dr(queue, NumSamples * (NumPackets - 1) * NsPerSample, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr packets[NumPackets]; - - for (seqnum_t n = 0; n < NumPackets; n++) { - packets[n] = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); - } - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusOK, dr.read(p)); - CHECK(p == packets[n]); - } - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); -} - -TEST(delayed_reader, trim) { - Queue queue; - DelayedReader dr(queue, NumSamples * (NumPackets - 1) * NsPerSample, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr packets[NumPackets * 2]; - - for (seqnum_t n = 0; n < NumPackets * 2; n++) { - packets[n] = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); - } - - for (seqnum_t n = NumPackets; n < NumPackets * 2; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusOK, dr.read(p)); - CHECK(p == packets[n]); - } - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); -} - -TEST(delayed_reader, late_duplicates) { - Queue queue; - DelayedReader dr(queue, NumSamples * (NumPackets - 1) * NsPerSample, sample_spec); - CHECK(dr.is_valid()); - - PacketPtr packets[NumPackets]; - - for (seqnum_t n = 0; n < NumPackets; n++) { - packets[n] = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); - } - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr p; - LONGS_EQUAL(status::StatusOK, dr.read(p)); - CHECK(p == packets[n]); - } - - for (seqnum_t n = 0; n < NumPackets; n++) { - PacketPtr wp = new_packet(n); - LONGS_EQUAL(status::StatusOK, queue.write(wp)); - - PacketPtr rp; - LONGS_EQUAL(status::StatusOK, dr.read(rp)); - CHECK(wp == rp); - } - - PacketPtr pp; - LONGS_EQUAL(status::StatusNoData, dr.read(pp)); - CHECK(!pp); -} - -} // namespace packet -} // namespace roc +// /* +// * Copyright (c) 2015 Roc Streaming authors +// * +// * This Source Code Form is subject to the terms of the Mozilla Public +// * License, v. 2.0. If a copy of the MPL was not distributed with this +// * file, You can obtain one at http://mozilla.org/MPL/2.0/. +// */ + +// #include + +// #include "roc_core/heap_arena.h" +// #include "roc_core/macro_helpers.h" +// #include "roc_packet/delayed_reader.h" +// #include "roc_packet/packet_factory.h" +// #include "roc_packet/queue.h" +// #include "roc_status/status_code.h" + +// namespace roc { +// namespace packet { + +// namespace { + +// enum { SampleRate = 1000, NumSamples = 100, NumPackets = 30, MaxBufSize = 100 }; + +// const core::nanoseconds_t NsPerSample = core::Second / SampleRate; + +// const audio::SampleSpec sample_spec(SampleRate, +// audio::Sample_RawFormat, +// audio::ChanLayout_Surround, +// audio::ChanOrder_Smpte, +// audio::ChanMask_Surround_Stereo); + +// core::HeapArena arena; +// PacketFactory packet_factory(arena, MaxBufSize); + +// PacketPtr new_packet(seqnum_t sn) { +// PacketPtr packet = packet_factory.new_packet(); +// CHECK(packet); + +// packet->add_flags(Packet::FlagRTP); +// packet->rtp()->seqnum = sn; +// packet->rtp()->stream_timestamp = stream_timestamp_t(sn * NumSamples); + +// return packet; +// } + +// class StatusReader : public IReader { +// public: +// explicit StatusReader(status::StatusCode code) +// : code_(code) { +// } + +// virtual ROC_ATTR_NODISCARD status::StatusCode read(PacketPtr&) { +// return code_; +// } + +// private: +// status::StatusCode code_; +// }; + +// } // namespace + +// TEST_GROUP(delayed_reader) {}; + +// TEST(delayed_reader, failed_to_read_packet) { +// const status::StatusCode codes[] = { +// status::StatusUnknown, +// status::StatusNoData, +// }; + +// for (size_t n = 0; n < ROC_ARRAY_SIZE(codes); ++n) { +// StatusReader reader(codes[n]); +// DelayedReader dr(reader, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr pp; +// LONGS_EQUAL(codes[n], dr.read(pp)); +// CHECK(!pp); +// } +// } + +// TEST(delayed_reader, no_delay) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr wp = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(wp)); + +// PacketPtr rp; +// LONGS_EQUAL(status::StatusOK, dr.read(rp)); +// CHECK(wp == rp); +// } +// } + +// TEST(delayed_reader, delay) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr packets[NumPackets]; + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusNoData, dr.read(p)); +// CHECK(!p); + +// packets[n] = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); +// } + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusOK, dr.read(p)); +// CHECK(p == packets[n]); +// } + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr wp = new_packet(NumPackets + n); +// LONGS_EQUAL(status::StatusOK, queue.write(wp)); + +// PacketPtr rp; +// LONGS_EQUAL(status::StatusOK, dr.read(rp)); +// CHECK(wp == rp); +// } + +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); +// } + +// TEST(delayed_reader, instant) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr packets[NumPackets]; + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// packets[n] = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); +// } + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusOK, dr.read(p)); +// CHECK(p == packets[n]); +// } + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); +// } + +// TEST(delayed_reader, trim) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr packets[NumPackets * 2]; + +// for (seqnum_t n = 0; n < NumPackets * 2; n++) { +// packets[n] = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); +// } + +// for (seqnum_t n = NumPackets; n < NumPackets * 2; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusOK, dr.read(p)); +// CHECK(p == packets[n]); +// } + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); +// } + +// TEST(delayed_reader, late_duplicates) { +// Queue queue; +// DelayedReader dr(queue, sample_spec); +// CHECK(dr.is_valid()); + +// PacketPtr packets[NumPackets]; + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// packets[n] = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(packets[n])); +// } + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr p; +// LONGS_EQUAL(status::StatusOK, dr.read(p)); +// CHECK(p == packets[n]); +// } + +// for (seqnum_t n = 0; n < NumPackets; n++) { +// PacketPtr wp = new_packet(n); +// LONGS_EQUAL(status::StatusOK, queue.write(wp)); + +// PacketPtr rp; +// LONGS_EQUAL(status::StatusOK, dr.read(rp)); +// CHECK(wp == rp); +// } + +// PacketPtr pp; +// LONGS_EQUAL(status::StatusNoData, dr.read(pp)); +// CHECK(!pp); +// } + +// } // namespace packet +// } // namespace roc