Skip to content

Commit 73448d9

Browse files
committed
Preparation for E2E latency tuning
In order to be able to tune receiver's latency relying on timestamp mapping that we get from RTCP feedback, and UDP::Receive_timestamp, adding these features: * gh-674: Use receive timestamp (RTS) as report time when processing RTCP report; * RTT dumping for debugging (csvplotter ts_offset branch); * SCHED_RR for network io thread (run with root privs).
1 parent d90cf31 commit 73448d9

File tree

11 files changed

+178
-142
lines changed

11 files changed

+178
-142
lines changed

src/internal_modules/roc_core/target_posix/roc_core/thread.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ bool Thread::enable_realtime() {
5757
errno_to_str(err).c_str());
5858
return false;
5959
}
60+
roc_log(LogDebug, "thread: set realtime priority");
6061

6162
return true;
6263
}

src/internal_modules/roc_netio/target_libuv/roc_netio/network_loop.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ NetworkLoop::NetworkLoop(core::IPool& packet_pool,
140140
task_sem_.data = this;
141141
task_sem_initialized_ = true;
142142

143+
enable_realtime();
143144
if (!(started_ = Thread::start())) {
144145
init_status_ = status::StatusErrThread;
145146
return;

src/internal_modules/roc_pipeline/receiver_session_group.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ ReceiverSessionGroup::create_control_pipeline(ReceiverEndpoint* control_endpoint
6969
// We pass this as implementation of rtcp::IParticipant.
7070
// rtcp::Communicator will call our methods right now (in constructor)
7171
// and later when we call generate_packets() or process_packets().
72-
rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator(
73-
source_config_.common.rtcp, *this, *control_endpoint->outbound_writer(),
74-
*control_endpoint->outbound_composer(), packet_factory_, arena_));
72+
rtcp_communicator_.reset(new(rtcp_communicator_) rtcp::Communicator(
73+
source_config_.common.rtcp, *this, *control_endpoint->outbound_writer(),
74+
*control_endpoint->outbound_composer(), packet_factory_, arena_, dumper_));
7575

7676
const status::StatusCode code = rtcp_communicator_->init_status();
7777
if (code != status::StatusOK) {

src/internal_modules/roc_pipeline/sender_session.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,9 @@ SenderSession::create_control_pipeline(SenderEndpoint* control_endpoint) {
233233

234234
rtcp_outbound_addr_ = control_endpoint->outbound_address();
235235

236-
rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator(
237-
sink_config_.rtcp, *this, control_endpoint->outbound_writer(),
238-
control_endpoint->outbound_composer(), packet_factory_, arena_));
236+
rtcp_communicator_.reset(new(rtcp_communicator_) rtcp::Communicator(
237+
sink_config_.rtcp, *this, control_endpoint->outbound_writer(),
238+
control_endpoint->outbound_composer(), packet_factory_, arena_, dumper_));
239239

240240
const status::StatusCode code = rtcp_communicator_->init_status();
241241
if (code != status::StatusOK) {

src/internal_modules/roc_rtcp/communicator.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "roc_core/log.h"
1111
#include "roc_core/panic.h"
1212
#include "roc_core/time.h"
13+
#include "roc_dbgio/csv_dumper.h"
1314
#include "roc_packet/ntp.h"
1415
#include "roc_packet/units.h"
1516
#include "roc_rtcp/headers.h"
@@ -25,17 +26,14 @@ const core::nanoseconds_t LogInterval = core::Second * 30;
2526

2627
} // namespace
2728

28-
Communicator::Communicator(const Config& config,
29-
IParticipant& participant,
30-
packet::IWriter& packet_writer,
31-
packet::IComposer& packet_composer,
32-
packet::PacketFactory& packet_factory,
33-
core::IArena& arena)
29+
Communicator::Communicator(const Config &config, IParticipant &participant, packet::IWriter &packet_writer,
30+
packet::IComposer &packet_composer, packet::PacketFactory &packet_factory,
31+
core::IArena &arena, dbgio::CsvDumper* dumper)
3432
: packet_factory_(packet_factory)
3533
, packet_writer_(packet_writer)
3634
, packet_composer_(packet_composer)
3735
, config_(config)
38-
, reporter_(config, participant, arena)
36+
, reporter_(config, participant, arena, dumper)
3937
, next_deadline_(0)
4038
, dest_addr_count_(0)
4139
, dest_addr_index_(0)
@@ -50,7 +48,8 @@ Communicator::Communicator(const Config& config,
5048
, processed_packet_count_(0)
5149
, generated_packet_count_(0)
5250
, log_limiter_(LogInterval)
53-
, init_status_(status::NoStatus) {
51+
, init_status_(status::NoStatus)
52+
, dumper_(dumper) {
5453
if ((init_status_ = reporter_.init_status()) != status::StatusOK) {
5554
return;
5655
}
@@ -76,7 +75,6 @@ status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet,
7675
roc_panic_if_msg(!packet, "rtcp communicator: null packet");
7776
roc_panic_if_msg(!packet->udp(), "rtcp communicator: non-udp packet");
7877
roc_panic_if_msg(!packet->rtcp(), "rtcp communicator: non-rtcp packet");
79-
roc_panic_if_msg(current_time <= 0, "rtcp communicator: invalid timestamp");
8078

8179
roc_log(LogTrace, "rtcp communicator: processing incoming packet");
8280

@@ -90,7 +88,7 @@ status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet,
9088
}
9189

9290
status::StatusCode status =
93-
reporter_.begin_processing(packet->udp()->src_addr, current_time);
91+
reporter_.begin_processing(packet->udp()->src_addr, packet->udp()->receive_timestamp);
9492
roc_log(LogTrace, "rtcp communicator: begin_processing(): status=%s",
9593
status::code_to_str(status));
9694

src/internal_modules/roc_rtcp/communicator.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "roc_core/rate_limiter.h"
1818
#include "roc_core/stddefs.h"
1919
#include "roc_core/time.h"
20+
#include "roc_dbgio/csv_dumper.h"
2021
#include "roc_packet/icomposer.h"
2122
#include "roc_packet/iwriter.h"
2223
#include "roc_packet/packet.h"
@@ -56,12 +57,9 @@ namespace rtcp {
5657
class Communicator : public core::NonCopyable<> {
5758
public:
5859
//! Initialize.
59-
Communicator(const Config& config,
60-
IParticipant& participant,
61-
packet::IWriter& packet_writer,
62-
packet::IComposer& packet_composer,
63-
packet::PacketFactory& packet_factory,
64-
core::IArena& arena);
60+
Communicator(const Config &config, IParticipant &participant, packet::IWriter &packet_writer,
61+
packet::IComposer &packet_composer, packet::PacketFactory &packet_factory,
62+
core::IArena &arena, dbgio::CsvDumper* dumper);
6563

6664
//! Check if the object was successfully constructed.
6765
status::StatusCode init_status() const;
@@ -169,6 +167,8 @@ class Communicator : public core::NonCopyable<> {
169167
core::RateLimiter log_limiter_;
170168

171169
status::StatusCode init_status_;
170+
171+
dbgio::CsvDumper* dumper_;
172172
};
173173

174174
} // namespace rtcp

src/internal_modules/roc_rtcp/reporter.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
namespace roc {
2222
namespace rtcp {
2323

24-
Reporter::Reporter(const Config& config, IParticipant& participant, core::IArena& arena)
24+
Reporter::Reporter(const Config &config, IParticipant &participant, core::IArena &arena, dbgio::CsvDumper *dumper)
2525
: arena_(arena)
2626
, participant_(participant)
2727
, local_source_id_(0)
@@ -43,7 +43,8 @@ Reporter::Reporter(const Config& config, IParticipant& participant, core::IArena
4343
, report_time_(0)
4444
, config_(config)
4545
, max_delay_(packet::ntp_2_nanoseconds(header::MaxDelay))
46-
, init_status_(status::NoStatus) {
46+
, init_status_(status::NoStatus)
47+
, dumper_(dumper) {
4748
memset(local_cname_, 0, sizeof(local_cname_));
4849

4950
const ParticipantInfo part_info = participant_.participant_info();
@@ -1400,7 +1401,7 @@ Reporter::find_stream_(packet::stream_source_t source_id, CreateMode mode) {
14001401
(unsigned long)source_id);
14011402

14021403
stream =
1403-
new (stream_pool_) Stream(stream_pool_, source_id, report_time_, config_.rtt);
1404+
new(stream_pool_) Stream(arena_, stream_pool_, source_id, report_time_, config_.rtt, dumper_);
14041405
if (!stream) {
14051406
report_error_ = status::StatusNoMem;
14061407
return NULL;

src/internal_modules/roc_rtcp/reporter.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "roc_rtcp/rtt_estimator.h"
3535
#include "roc_rtcp/sdes.h"
3636
#include "roc_status/status_code.h"
37+
#include "roc_dbgio/csv_dumper.h"
3738

3839
namespace roc {
3940
namespace rtcp {
@@ -90,7 +91,7 @@ namespace rtcp {
9091
class Reporter : public core::NonCopyable<> {
9192
public:
9293
//! Initialize.
93-
Reporter(const Config& config, IParticipant& participant, core::IArena& arena);
94+
Reporter(const Config &config, IParticipant &participant, core::IArena &arena, dbgio::CsvDumper *dumper);
9495
~Reporter();
9596

9697
//! Check if the object was successfully constructed.
@@ -261,16 +262,15 @@ class Reporter : public core::NonCopyable<> {
261262
struct Stream : core::RefCounted<Stream, core::PoolAllocation>,
262263
core::HashmapNode<>,
263264
core::ListNode<> {
264-
Stream(core::IPool& pool,
265-
packet::stream_source_t source_id,
265+
Stream(core::IArena &arena, core::IPool &pool, packet::stream_source_t source_id,
266266
core::nanoseconds_t report_time,
267-
const RttConfig& rtt_config)
267+
const RttConfig &rtt_config, dbgio::CsvDumper *dumper)
268268
: core::RefCounted<Stream, core::PoolAllocation>(pool)
269269
, source_id(source_id)
270270
, has_remote_recv_report(false)
271-
, remote_recv_rtt(rtt_config)
271+
, remote_recv_rtt(arena, rtt_config, dumper)
272272
, has_remote_send_report(false)
273-
, remote_send_rtt(rtt_config)
273+
, remote_send_rtt(arena, rtt_config, dumper)
274274
, local_recv_report(NULL)
275275
, last_update(report_time)
276276
, last_local_sr(0)
@@ -483,6 +483,8 @@ class Reporter : public core::NonCopyable<> {
483483
const core::nanoseconds_t max_delay_;
484484

485485
status::StatusCode init_status_;
486+
487+
dbgio::CsvDumper* dumper_;
486488
};
487489

488490
} // namespace rtcp

src/internal_modules/roc_rtcp/rtt_estimator.cpp

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212
namespace roc {
1313
namespace rtcp {
1414

15-
RttEstimator::RttEstimator(const RttConfig& config)
15+
RttEstimator::RttEstimator(core::IArena &arena, const RttConfig &config, dbgio::CsvDumper *dumper)
1616
: config_(config)
1717
, metrics_()
1818
, has_metrics_(false)
1919
, first_report_ts_(0)
20-
, last_report_ts_(0) {
20+
, last_report_ts_(0)
21+
, dumper_(dumper)
22+
, rtt_stats_(arena, 15, 0.5)
23+
, clock_offset_stats_(arena, 100, 0.5) {
2124
}
2225

2326
bool RttEstimator::has_metrics() const {
@@ -85,10 +88,31 @@ void RttEstimator::update(core::nanoseconds_t local_report_ts,
8588
}
8689
last_report_ts_ = local_report_ts;
8790

88-
metrics_.clock_offset = clock_offset;
89-
metrics_.rtt = rtt;
91+
rtt_stats_.add(rtt);
92+
clock_offset_stats_.add(clock_offset);
93+
metrics_.rtt = rtt_stats_.mov_quantile();
94+
metrics_.clock_offset = clock_offset_stats_.mov_quantile();
9095

9196
has_metrics_ = true;
97+
98+
if (dumper_) {
99+
dump_(local_report_ts, remote_report_ts, remote_reply_ts, local_reply_ts);
100+
}
101+
}
102+
103+
void RttEstimator::dump_(core::nanoseconds_t local_report_ts, core::nanoseconds_t remote_report_ts,
104+
core::nanoseconds_t remote_reply_ts, core::nanoseconds_t local_reply_ts) {
105+
dbgio::CsvEntry e;
106+
e.type = 'r';
107+
e.n_fields = 7;
108+
e.fields[0] = core::timestamp(core::ClockUnix);
109+
e.fields[1] = metrics_.rtt;
110+
e.fields[2] = metrics_.clock_offset;
111+
e.fields[3] = local_report_ts;
112+
e.fields[4] = remote_report_ts;
113+
e.fields[5] = remote_reply_ts;
114+
e.fields[6] = local_reply_ts;
115+
dumper_->write(e);
92116
}
93117

94118
} // namespace rtcp

src/internal_modules/roc_rtcp/rtt_estimator.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
#include "roc_core/time.h"
1616
#include "roc_packet/units.h"
17+
#include "roc_dbgio/csv_dumper.h"
18+
#include "roc_stat/mov_quantile.h"
1719

1820
namespace roc {
1921
namespace rtcp {
@@ -51,7 +53,7 @@ struct RttMetrics {
5153
class RttEstimator {
5254
public:
5355
//! Initialize.
54-
RttEstimator(const RttConfig& config);
56+
RttEstimator(core::IArena &arena, const RttConfig &config, dbgio::CsvDumper *dumper);
5557

5658
//! Check whether metrics are already available.
5759
bool has_metrics() const;
@@ -71,12 +73,19 @@ class RttEstimator {
7173
core::nanoseconds_t local_reply_ts);
7274

7375
private:
76+
void dump_(core::nanoseconds_t local_report_ts, core::nanoseconds_t remote_report_ts,
77+
core::nanoseconds_t remote_reply_ts, core::nanoseconds_t local_reply_ts);
78+
7479
const RttConfig config_;
7580
RttMetrics metrics_;
7681
bool has_metrics_;
7782

7883
core::nanoseconds_t first_report_ts_;
7984
core::nanoseconds_t last_report_ts_;
85+
86+
dbgio::CsvDumper *dumper_;
87+
stat::MovQuantile<core::nanoseconds_t> rtt_stats_;
88+
stat::MovQuantile<core::nanoseconds_t> clock_offset_stats_;
8089
};
8190

8291
} // namespace rtcp

0 commit comments

Comments
 (0)