Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions src/tests/public_api/test_helpers/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Proxy : public core::Thread, private packet::IWriter {
public:
Proxy(const roc_endpoint* receiver_source_endp,
const roc_endpoint* receiver_repair_endp,
const roc_endpoint* receiver_control_endp,
size_t n_source_packets,
size_t n_repair_packets,
unsigned flags)
Expand All @@ -45,6 +46,7 @@ class Proxy : public core::Thread, private packet::IWriter {
, n_repair_packets_(n_repair_packets)
, flags_(flags)
, pos_(0) {
input_control_endp_ = NULL;
LONGS_EQUAL(status::StatusOK, net_loop_.init_status());

roc_protocol source_proto;
Expand All @@ -53,6 +55,11 @@ class Proxy : public core::Thread, private packet::IWriter {
roc_protocol repair_proto;
CHECK(roc_endpoint_get_protocol(receiver_repair_endp, &repair_proto) == 0);

roc_protocol control_proto;
if (receiver_control_endp) {
CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0);
}

int source_port = 0;
CHECK(roc_endpoint_get_port(receiver_source_endp, &source_port) == 0);

Expand All @@ -71,6 +78,16 @@ class Proxy : public core::Thread, private packet::IWriter {
"127.0.0.1", 0));
CHECK(recv_repair_config_.bind_address.set_host_port(address::Family_IPv4,
"127.0.0.1", 0));
if (receiver_control_endp) {
int control_port = 0;
CHECK(roc_endpoint_get_port(receiver_control_endp, &control_port) == 0);

CHECK(receiver_control_endp_.set_host_port(address::Family_IPv4, "127.0.0.1",
control_port));

CHECK(recv_control_config_.bind_address.set_host_port(address::Family_IPv4,
"127.0.0.1", 0));
}
Comment on lines +81 to +90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part of ctor before netio::NetworkLoop::PortHandle send_port = NULL became quite hard to read, I suggest to reorganize it a bit and group configuration by type, like this:

...

// receiver's source endpoint
roc_protocol source_proto;
int source_port = 0;
CHECK(roc_endpoint_get_protocol(receiver_source_endp, &source_proto) == 0);
CHECK(roc_endpoint_get_port(receiver_source_endp, &source_port) == 0);
CHECK(receiver_source_endp_.set_host_port(address::Family_IPv4, "127.0.0.1",
                                          source_port));

// receiver's repair endpoint
roc_protocol repair_proto;
int repair_port = 0;
CHECK(roc_endpoint_get_protocol(receiver_repair_endp, &repair_proto) == 0);
CHECK(roc_endpoint_get_port(receiver_repair_endp, &repair_port) == 0);
CHECK(receiver_repair_endp_.set_host_port(address::Family_IPv4, "127.0.0.1",
                                          repair_port));

// receiver's control endpoint
roc_protocol control_proto;
int control_port = 0;
if (receiver_control_endp) {
    CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0);
    CHECK(roc_endpoint_get_port(receiver_control_endp, &control_port) == 0);
    CHECK(receiver_control_endp_.set_host_port(address::Family_IPv4, "127.0.0.1",
                                               control_port));
}

// proxy's receiving addresses
CHECK(recv_source_config_.bind_address.set_host_port(address::Family_IPv4,
                                                     "127.0.0.1", 0));
CHECK(recv_repair_config_.bind_address.set_host_port(address::Family_IPv4,
                                                     "127.0.0.1", 0));
if (receiver_control_endp) {
    CHECK(recv_control_config_.bind_address.set_host_port(address::Family_IPv4,
                                                          "127.0.0.1", 0));
}

// proxy's sending address
CHECK(send_config_.bind_address.set_host_port(address::Family_IPv4, "127.0.0.1",
                                              0));

...

(it's same code, but reordered and with added comments)


netio::NetworkLoop::PortHandle send_port = NULL;

Expand Down Expand Up @@ -104,6 +121,15 @@ class Proxy : public core::Thread, private packet::IWriter {
CHECK(net_loop_.schedule_and_wait(recv_task));
}

if (receiver_control_endp) {
netio::NetworkLoop::Tasks::AddUdpPort add_task(recv_control_config_);
CHECK(net_loop_.schedule_and_wait(add_task));

netio::NetworkLoop::Tasks::StartUdpRecv recv_task(add_task.get_handle(),
*this);
CHECK(net_loop_.schedule_and_wait(recv_task));
}

CHECK(roc_endpoint_allocate(&input_source_endp_) == 0);
CHECK(roc_endpoint_set_protocol(input_source_endp_, source_proto) == 0);
CHECK(roc_endpoint_set_host(input_source_endp_, "127.0.0.1") == 0);
Expand All @@ -117,11 +143,22 @@ class Proxy : public core::Thread, private packet::IWriter {
CHECK(roc_endpoint_set_port(input_repair_endp_,
recv_repair_config_.bind_address.port())
== 0);
if (receiver_control_endp) {
CHECK(roc_endpoint_allocate(&input_control_endp_) == 0);
CHECK(roc_endpoint_set_protocol(input_control_endp_, control_proto) == 0);
CHECK(roc_endpoint_set_host(input_control_endp_, "127.0.0.1") == 0);
CHECK(roc_endpoint_set_port(input_control_endp_,
recv_control_config_.bind_address.port())
== 0);
}
}

~Proxy() {
CHECK(roc_endpoint_deallocate(input_source_endp_) == 0);
CHECK(roc_endpoint_deallocate(input_repair_endp_) == 0);
if (input_control_endp_ != NULL) {
CHECK(roc_endpoint_deallocate(input_control_endp_) == 0);
}
}

const roc_endpoint* source_endpoint() const {
Expand All @@ -132,6 +169,11 @@ class Proxy : public core::Thread, private packet::IWriter {
return input_repair_endp_;
}

const roc_endpoint* control_endpoint() const {
CHECK(input_control_endp_);
return input_control_endp_;
}

size_t n_dropped_packets() const {
return n_dropped_packets_;
}
Expand Down Expand Up @@ -180,9 +222,17 @@ class Proxy : public core::Thread, private packet::IWriter {
if (pp->udp()->dst_addr == recv_source_config_.bind_address) {
pp->udp()->dst_addr = receiver_source_endp_;
LONGS_EQUAL(status::StatusOK, source_queue_.write(pp));
} else {
} else if (pp->udp()->dst_addr == recv_repair_config_.bind_address) {
pp->udp()->dst_addr = receiver_repair_endp_;
LONGS_EQUAL(status::StatusOK, repair_queue_.write(pp));
} else if (input_control_endp_
&& pp->udp()->dst_addr == recv_control_config_.bind_address) {
pp->udp()->dst_addr = receiver_control_endp_;
LONGS_EQUAL(status::StatusOK, control_queue_.write(pp));
} else if (input_control_endp_ && pp->udp()->dst_addr == receiver_control_endp_) {
pp->udp()->src_addr = recv_control_config_.bind_address;
pp->udp()->dst_addr = send_config_.bind_address;
LONGS_EQUAL(status::StatusOK, control_queue_.write(pp));
}
Comment on lines +228 to 236
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, have you tested this, could you describe your testing approach? Or if you haven't, please do.

So we need to proxy control packets in two directions: receiver[control_endpoint] -> proxy -> sender[control_endpoint] and sender[control_endpoint] -> proxy -> receiver[control_endpoint].

In both cases dst_addr is the same - it's proxy.

In the second direction, proxy receives control (feedback) packets from sender on the same address from which proxy sends control packets to sender. But in constructor, I don't see that we create any UDP port that is both sending and receiving?

I guess the easiest way is to make the port that is currently sending (the very first AddUdpPort task) to be also receiving, and match packets by source address. You can find example of setting up bi-directional port in node::Sender::configure (look for StartUdpSend and StartUdpRecv).


for (;;) {
Expand All @@ -196,13 +246,16 @@ class Proxy : public core::Thread, private packet::IWriter {
}
} else {
const bool drop_packet = (flags_ & FlagLoseAllRepairPkts);

if (!send_packet_(repair_queue_, drop_packet)) {
break;
}
}
}

if (input_control_endp_) {
send_packet_(control_queue_, false);
}

return status::StatusOK;
}

Expand Down Expand Up @@ -244,15 +297,19 @@ class Proxy : public core::Thread, private packet::IWriter {
netio::UdpConfig send_config_;
netio::UdpConfig recv_source_config_;
netio::UdpConfig recv_repair_config_;
netio::UdpConfig recv_control_config_;

roc_endpoint* input_source_endp_;
roc_endpoint* input_repair_endp_;
roc_endpoint* input_control_endp_;

address::SocketAddr receiver_source_endp_;
address::SocketAddr receiver_repair_endp_;
address::SocketAddr receiver_control_endp_;

packet::FifoQueue source_queue_;
packet::FifoQueue repair_queue_;
packet::FifoQueue control_queue_;

packet::ConcurrentQueue queue_;
packet::IWriter* writer_;
Expand Down
6 changes: 3 additions & 3 deletions src/tests/public_api/test_loopback_sender_2_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ TEST(loopback_sender_2_receiver, rs8m_with_losses) {

receiver.bind();

test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(),
test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL,
test::SourcePackets, test::RepairPackets, Flags);

test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Expand Down Expand Up @@ -280,7 +280,7 @@ TEST(loopback_sender_2_receiver, ldpc_with_losses) {

receiver.bind();

test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(),
test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL,
test::SourcePackets, test::RepairPackets, Flags);

test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Expand Down Expand Up @@ -656,7 +656,7 @@ TEST(loopback_sender_2_receiver, metrics_measurements) {

receiver.bind();

test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(),
test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL,
test::SourcePackets, test::RepairPackets, Flags);

test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Expand Down
4 changes: 2 additions & 2 deletions src/tests/public_api/test_plugin_plc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ TEST(plugin_plc, losses_restored_by_fec) {

receiver.bind();

test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(),
test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL,
test::SourcePackets, test::RepairPackets, Flags);

test::Sender sender(context, sender_conf, SampleStep, NumChans,
Expand Down Expand Up @@ -303,7 +303,7 @@ TEST(plugin_plc, losses_restored_by_plc) {

receiver.bind();

test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(),
test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL,
test::SourcePackets, test::RepairPackets, Flags);

test::Sender sender(context, sender_conf, SampleStep, NumChans,
Expand Down