Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ uint64_t Thread::get_tid() {
#endif
}

bool Thread::enable_realtime() {
bool Thread::enable_realtime(const int sched_prio) {
sched_param param;
memset(&param, 0, sizeof(param));
param.sched_priority = sched_get_priority_max(SCHED_RR);
param.sched_priority = sched_prio;

roc_log(LogInfo, "thread: set realtime priority");
if (int err = pthread_setschedparam(pthread_self(), SCHED_RR, &param)) {
roc_log(LogDebug,
roc_log(LogError,
"thread: can't set realtime priority: pthread_setschedparam(): %s",
errno_to_str(err).c_str());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Thread : public NonCopyable<Thread> {
static uint64_t get_tid();

//! Raise current thread priority to realtime.
ROC_NODISCARD static bool enable_realtime();
ROC_NODISCARD static bool enable_realtime(const int sched_prio);

//! Check if thread was started and can be joined.
//! @returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,13 @@ NetworkLoop::Tasks::ResolveEndpointAddress::get_address() const {

NetworkLoop::NetworkLoop(core::IPool& packet_pool,
core::IPool& buffer_pool,
const int realtime_prio,
core::IArena& arena)
: packet_factory_(packet_pool, buffer_pool)
, realtime_prio_(realtime_prio)
, arena_(arena)
, started_(false)
, thr_init_cond_(thr_init_mutex_)
, loop_initialized_(false)
, stop_sem_initialized_(false)
, task_sem_initialized_(false)
Expand Down Expand Up @@ -145,7 +148,12 @@ NetworkLoop::NetworkLoop(core::IPool& packet_pool,
return;
}

init_status_ = status::StatusOK;
{
core::Mutex::Lock lock(thr_init_mutex_);
while (init_status_ == status::NoStatus) {
thr_init_cond_.wait();
}
}
}

NetworkLoop::~NetworkLoop() {
Expand Down Expand Up @@ -280,6 +288,21 @@ void NetworkLoop::handle_resolved(ResolverRequest& req) {

void NetworkLoop::run() {
roc_log(LogDebug, "network loop: starting event loop");
if (realtime_prio_ > 0 && !enable_realtime(realtime_prio_)) {
core::Mutex::Lock lock(thr_init_mutex_);

roc_log(LogError,
"network loop: can't set realtime priority of network thread. May need "
"to be root");
init_status_ = status::StatusFailedRealtime;
thr_init_cond_.signal();
} else {
core::Mutex::Lock lock(thr_init_mutex_);

roc_log(LogDebug, "network loop: elevated realtime priority");
init_status_ = status::StatusOK;
thr_init_cond_.signal();
}

int err = uv_run(&loop_, UV_RUN_DEFAULT);
if (err != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "roc_address/socket_addr.h"
#include "roc_core/atomic_int.h"
#include "roc_core/attributes.h"
#include "roc_core/cond.h"
#include "roc_core/iarena.h"
#include "roc_core/ipool.h"
#include "roc_core/list.h"
Expand Down Expand Up @@ -54,6 +55,8 @@ class NetworkLoop : private ITerminateHandler,
//! Opaque port handle.
typedef struct PortHandle* PortHandle;

enum { DEFAULT_PRIORITY = 0 };

//! Subclasses for specific tasks.
class Tasks {
public:
Expand Down Expand Up @@ -189,7 +192,10 @@ class NetworkLoop : private ITerminateHandler,
//! Initialize.
//! @remarks
//! Start background thread if the object was successfully constructed.
NetworkLoop(core::IPool& packet_pool, core::IPool& buffer_pool, core::IArena& arena);
NetworkLoop(core::IPool& packet_pool,
core::IPool& buffer_pool,
const int realtime_prio,
core::IArena& arena);

//! Destroy. Stop all receivers and senders.
//! @remarks
Expand Down Expand Up @@ -248,10 +254,14 @@ class NetworkLoop : private ITerminateHandler,
void task_resolve_endpoint_address_(NetworkTask&);

packet::PacketFactory packet_factory_;
const uint8_t realtime_prio_;
core::IArena& arena_;

bool started_;

core::Mutex thr_init_mutex_;
core::Cond thr_init_cond_;

uv_loop_t loop_;
bool loop_initialized_;

Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_node/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Context::Context(const ContextConfig& config, core::IArena& arena)
"frame_buffer_pool", arena_, sizeof(core::Buffer) + config.max_frame_size)
, processor_map_(arena_)
, encoding_map_(arena_)
, network_loop_(packet_pool_, packet_buffer_pool_, arena_)
, network_loop_(packet_pool_, packet_buffer_pool_, (int)config.realtime_prio, arena_)
, control_loop_(network_loop_, arena_)
, init_status_(status::NoStatus) {
roc_log(LogDebug, "context: initializing");
Expand Down
6 changes: 5 additions & 1 deletion src/internal_modules/roc_node/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ struct ContextConfig {
//! Maximum size in bytes of an audio frame.
size_t max_frame_size;

//! Set real-time priority. Requires root priviligies.
int realtime_prio;

ContextConfig()
: max_packet_size(2048)
, max_frame_size(4096) {
, max_frame_size(4096)
, realtime_prio(0) {
}
};

Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_node/receiver_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ status::StatusCode ReceiverDecoder::write_packet(address::Interface iface,
roc_panic_if(!bytes);
roc_panic_if(n_bytes == 0);

const core::nanoseconds_t capture_ts = core::timestamp(core::ClockUnix);

if (n_bytes > packet_factory_.packet_buffer_size()) {
roc_log(LogError,
"receiver decoder node:"
Expand All @@ -195,6 +197,7 @@ status::StatusCode ReceiverDecoder::write_packet(address::Interface iface,
}

packet->add_flags(packet::Packet::FlagUDP);
packet->udp()->receive_timestamp = capture_ts;
packet->set_buffer(buffer);

packet::IWriter* writer = endpoint_writers_[iface];
Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_node/sender_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ SenderEncoder::write_packet(address::Interface iface, const void* bytes, size_t
roc_panic_if(!bytes);
roc_panic_if(n_bytes == 0);

const core::nanoseconds_t capture_ts = core::timestamp(core::ClockUnix);

if (n_bytes > packet_factory_.packet_buffer_size()) {
roc_log(LogError,
"sender encoder node:"
Expand All @@ -252,6 +254,7 @@ SenderEncoder::write_packet(address::Interface iface, const void* bytes, size_t
}

packet->add_flags(packet::Packet::FlagUDP);
packet->udp()->receive_timestamp = capture_ts;
packet->set_buffer(buffer);

packet::IWriter* writer = endpoint_writers_[iface];
Expand Down
7 changes: 6 additions & 1 deletion src/internal_modules/roc_pipeline/receiver_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,19 @@ status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& pac
core::nanoseconds_t current_time) {
status::StatusCode code = status::NoStatus;

// Apparently the packet is not from network, set its TS manually.
if (packet->udp() && packet->udp()->receive_timestamp == 0 && current_time != 0) {
packet->udp()->receive_timestamp = current_time;
}

if ((code = parser_->parse(*packet, packet->buffer())) != status::StatusOK) {
roc_log(LogDebug,
"receiver endpoint: dropping bad packet: can't parse: status=%s",
status::code_to_str(code));
return status::StatusOK;
}

code = session_group_.route_packet(packet, current_time);
code = session_group_.route_packet(packet);

if (code == status::StatusNoRoute) {
roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't route");
Expand Down
12 changes: 5 additions & 7 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ReceiverSessionGroup::create_control_pipeline(ReceiverEndpoint* control_endpoint
// and later when we call generate_packets() or process_packets().
rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator(
source_config_.common.rtcp, *this, *control_endpoint->outbound_writer(),
*control_endpoint->outbound_composer(), packet_factory_, arena_));
*control_endpoint->outbound_composer(), packet_factory_, arena_, dumper_));

const status::StatusCode code = rtcp_communicator_->init_status();
if (code != status::StatusOK) {
Expand Down Expand Up @@ -140,12 +140,11 @@ void ReceiverSessionGroup::reclock_sessions(core::nanoseconds_t playback_time) {
}
}

status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet) {
roc_panic_if(init_status_ != status::StatusOK);

if (packet->has_flags(packet::Packet::FlagControl)) {
return route_control_packet_(packet, current_time);
return route_control_packet_(packet);
}

return route_transport_packet_(packet);
Expand Down Expand Up @@ -344,15 +343,14 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
}

status::StatusCode
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet) {
if (!rtcp_communicator_) {
roc_panic("session group: rtcp communicator is null");
}

// This will invoke IParticipant methods implemented by us,
// in particular notify_recv_stream() and maybe halt_recv_stream().
return rtcp_communicator_->process_packet(packet, current_time);
return rtcp_communicator_->process_packet(packet);
}

bool ReceiverSessionGroup::can_create_session_(const packet::PacketPtr& packet) {
Expand Down
6 changes: 2 additions & 4 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
void reclock_sessions(core::nanoseconds_t playback_time);

//! Route packet to session.
ROC_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
ROC_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet);

//! Get number of sessions in group.
size_t num_sessions() const;
Expand Down Expand Up @@ -129,8 +128,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
virtual void halt_recv_stream(packet::stream_source_t send_source_id);

status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);

bool can_create_session_(const packet::PacketPtr& packet);

Expand Down
4 changes: 2 additions & 2 deletions src/internal_modules/roc_pipeline/receiver_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ status::StatusCode ReceiverSlot::refresh(core::nanoseconds_t current_time,
status::StatusCode code = status::NoStatus;

if (source_endpoint_) {
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = source_endpoint_->pull_packets(0)) != status::StatusOK) {
return code;
}
}

if (repair_endpoint_) {
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = repair_endpoint_->pull_packets(0)) != status::StatusOK) {
return code;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since current_time is used only as a fallback when RTS isn't set, we can safely pass it to all calls I guess? Because why not to guarantee RTS for source and repair packets as well.

Expand Down
8 changes: 7 additions & 1 deletion src/internal_modules/roc_pipeline/sender_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,19 @@ status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packe
core::nanoseconds_t current_time) {
status::StatusCode code = status::NoStatus;

// Apparently the packet is not from network, set it's TS manually.
if (packet->udp() && packet->udp()->receive_timestamp == 0 && current_time != 0) {
packet->udp()->receive_timestamp = current_time;
}

if ((code = parser_->parse(*packet, packet->buffer())) != status::StatusOK) {
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't parse: status=%s",
status::code_to_str(code));

return status::StatusOK;
}

code = sender_session_.route_packet(packet, current_time);
code = sender_session_.route_packet(packet);

if (code == status::StatusNoRoute) {
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't route");
Expand Down
13 changes: 5 additions & 8 deletions src/internal_modules/roc_pipeline/sender_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ SenderSession::create_control_pipeline(SenderEndpoint* control_endpoint) {

rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator(
sink_config_.rtcp, *this, control_endpoint->outbound_writer(),
control_endpoint->outbound_composer(), packet_factory_, arena_));
control_endpoint->outbound_composer(), packet_factory_, arena_, dumper_));

const status::StatusCode code = rtcp_communicator_->init_status();
if (code != status::StatusOK) {
Expand Down Expand Up @@ -281,8 +281,7 @@ status::StatusCode SenderSession::refresh(core::nanoseconds_t current_time,
return status::StatusOK;
}

status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet) {
roc_panic_if(init_status_ != status::StatusOK);

if (fail_status_ != status::NoStatus) {
Expand All @@ -294,7 +293,7 @@ status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet,
roc_panic("sender session: unexpected non-control packet");
}

return route_control_packet_(packet, current_time);
return route_control_packet_(packet);
}

status::StatusCode SenderSession::write(audio::Frame& frame) {
Expand Down Expand Up @@ -439,15 +438,13 @@ void SenderSession::start_feedback_monitor_() {
feedback_monitor_->start();
}

status::StatusCode
SenderSession::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode SenderSession::route_control_packet_(const packet::PacketPtr& packet) {
if (!rtcp_communicator_) {
roc_panic("sender session: rtcp communicator is null");
}

// This will invoke IParticipant methods implemented by us.
return rtcp_communicator_->process_packet(packet, current_time);
return rtcp_communicator_->process_packet(packet);
}

} // namespace pipeline
Expand Down
6 changes: 2 additions & 4 deletions src/internal_modules/roc_pipeline/sender_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ class SenderSession : public core::NonCopyable<>,
//! This way feedback packets from receiver reach sender pipeline.
//! Packets are stored inside internal pipeline queues, and then fetched
//! when frame are passed from frame_writer().
ROC_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
ROC_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet);

//! Get slot metrics.
//! @remarks
Expand Down Expand Up @@ -133,8 +132,7 @@ class SenderSession : public core::NonCopyable<>,

void start_feedback_monitor_();

status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);

core::IArena& arena_;

Expand Down
4 changes: 2 additions & 2 deletions src/internal_modules/roc_pipeline/sender_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ status::StatusCode SenderSlot::refresh(core::nanoseconds_t current_time,
status::StatusCode code = status::NoStatus;

if (source_endpoint_) {
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = source_endpoint_->pull_packets(0)) != status::StatusOK) {
return code;
}
}

if (repair_endpoint_) {
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = repair_endpoint_->pull_packets(0)) != status::StatusOK) {
return code;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as for receiver slot.

Copy link
Member Author

@baranovmv baranovmv Feb 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I support your thought, but it will make pipeline tests dependent on real-time speed.

So far, LatencyMonitor calls core::timestamp(core::ClockUnix) when it computes latency_metrics_.niq_stalling. If we will set receive timestamp on all passing packets (which is your proposition here), we could not tell the difference between test and real use scenario. In order to pass these tests, I've set base_cts to current timestamp:

diff --git a/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp b/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp
index 4c47930e..2d7e188c 100644
--- a/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp
+++ b/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp
@@ -569,7 +569,7 @@ void send_receive(int flags,
     core::nanoseconds_t virtual_e2e_latency = 0;
 
     if (flags & FlagCTS) {
-        send_base_cts = 1000000000000000;
+        send_base_cts = core::timestamp(core::ClockUnix);
         virtual_e2e_latency = core::Millisecond * 100;
     }

In receiver_source test latency_lower_bound I had to do the same.

Please, take a look onto 6bd629e

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I propose to hold this change, until (if) we decide to introduce mock for core::timestamp(...) in pipeline tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. That's not good but I agree that it's outside of scope of this PR. I'll try to take a look at it later, will add it to my todo. Let's follow your proposal then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reverted these changes for the time being

Expand Down
Loading
Loading