diff --git a/src/tests/public_api/test_helpers/proxy.h b/src/tests/public_api/test_helpers/proxy.h index 1ddcc2d37..53f259671 100644 --- a/src/tests/public_api/test_helpers/proxy.h +++ b/src/tests/public_api/test_helpers/proxy.h @@ -34,6 +34,7 @@ class Proxy : public core::Thread, private packet::IWriter { public: Proxy(const roc_endpoint* receiver_source_endp, const roc_endpoint* receiver_repair_endp, + const roc_endpoint* receiver_control_endp, size_t n_source_packets, size_t n_repair_packets, unsigned flags) @@ -45,6 +46,7 @@ class Proxy : public core::Thread, private packet::IWriter { , n_repair_packets_(n_repair_packets) , flags_(flags) , pos_(0) { + input_control_endp_ = NULL; LONGS_EQUAL(status::StatusOK, net_loop_.init_status()); roc_protocol source_proto; @@ -53,6 +55,11 @@ class Proxy : public core::Thread, private packet::IWriter { roc_protocol repair_proto; CHECK(roc_endpoint_get_protocol(receiver_repair_endp, &repair_proto) == 0); + roc_protocol control_proto; + if (receiver_control_endp) { + CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0); + } + int source_port = 0; CHECK(roc_endpoint_get_port(receiver_source_endp, &source_port) == 0); @@ -71,6 +78,16 @@ class Proxy : public core::Thread, private packet::IWriter { "127.0.0.1", 0)); CHECK(recv_repair_config_.bind_address.set_host_port(address::Family_IPv4, "127.0.0.1", 0)); + if (receiver_control_endp) { + int control_port = 0; + CHECK(roc_endpoint_get_port(receiver_control_endp, &control_port) == 0); + + CHECK(receiver_control_endp_.set_host_port(address::Family_IPv4, "127.0.0.1", + control_port)); + + CHECK(recv_control_config_.bind_address.set_host_port(address::Family_IPv4, + "127.0.0.1", 0)); + } netio::NetworkLoop::PortHandle send_port = NULL; @@ -104,6 +121,15 @@ class Proxy : public core::Thread, private packet::IWriter { CHECK(net_loop_.schedule_and_wait(recv_task)); } + if (receiver_control_endp) { + netio::NetworkLoop::Tasks::AddUdpPort add_task(recv_control_config_); + CHECK(net_loop_.schedule_and_wait(add_task)); + + netio::NetworkLoop::Tasks::StartUdpRecv recv_task(add_task.get_handle(), + *this); + CHECK(net_loop_.schedule_and_wait(recv_task)); + } + CHECK(roc_endpoint_allocate(&input_source_endp_) == 0); CHECK(roc_endpoint_set_protocol(input_source_endp_, source_proto) == 0); CHECK(roc_endpoint_set_host(input_source_endp_, "127.0.0.1") == 0); @@ -117,11 +143,22 @@ class Proxy : public core::Thread, private packet::IWriter { CHECK(roc_endpoint_set_port(input_repair_endp_, recv_repair_config_.bind_address.port()) == 0); + if (receiver_control_endp) { + CHECK(roc_endpoint_allocate(&input_control_endp_) == 0); + CHECK(roc_endpoint_set_protocol(input_control_endp_, control_proto) == 0); + CHECK(roc_endpoint_set_host(input_control_endp_, "127.0.0.1") == 0); + CHECK(roc_endpoint_set_port(input_control_endp_, + recv_control_config_.bind_address.port()) + == 0); + } } ~Proxy() { CHECK(roc_endpoint_deallocate(input_source_endp_) == 0); CHECK(roc_endpoint_deallocate(input_repair_endp_) == 0); + if (input_control_endp_ != NULL) { + CHECK(roc_endpoint_deallocate(input_control_endp_) == 0); + } } const roc_endpoint* source_endpoint() const { @@ -132,6 +169,11 @@ class Proxy : public core::Thread, private packet::IWriter { return input_repair_endp_; } + const roc_endpoint* control_endpoint() const { + CHECK(input_control_endp_); + return input_control_endp_; + } + size_t n_dropped_packets() const { return n_dropped_packets_; } @@ -180,9 +222,17 @@ class Proxy : public core::Thread, private packet::IWriter { if (pp->udp()->dst_addr == recv_source_config_.bind_address) { pp->udp()->dst_addr = receiver_source_endp_; LONGS_EQUAL(status::StatusOK, source_queue_.write(pp)); - } else { + } else if (pp->udp()->dst_addr == recv_repair_config_.bind_address) { pp->udp()->dst_addr = receiver_repair_endp_; LONGS_EQUAL(status::StatusOK, repair_queue_.write(pp)); + } else if (input_control_endp_ + && pp->udp()->dst_addr == recv_control_config_.bind_address) { + pp->udp()->dst_addr = receiver_control_endp_; + LONGS_EQUAL(status::StatusOK, control_queue_.write(pp)); + } else if (input_control_endp_ && pp->udp()->dst_addr == receiver_control_endp_) { + pp->udp()->src_addr = recv_control_config_.bind_address; + pp->udp()->dst_addr = send_config_.bind_address; + LONGS_EQUAL(status::StatusOK, control_queue_.write(pp)); } for (;;) { @@ -196,13 +246,16 @@ class Proxy : public core::Thread, private packet::IWriter { } } else { const bool drop_packet = (flags_ & FlagLoseAllRepairPkts); - if (!send_packet_(repair_queue_, drop_packet)) { break; } } } + if (input_control_endp_) { + send_packet_(control_queue_, false); + } + return status::StatusOK; } @@ -244,15 +297,19 @@ class Proxy : public core::Thread, private packet::IWriter { netio::UdpConfig send_config_; netio::UdpConfig recv_source_config_; netio::UdpConfig recv_repair_config_; + netio::UdpConfig recv_control_config_; roc_endpoint* input_source_endp_; roc_endpoint* input_repair_endp_; + roc_endpoint* input_control_endp_; address::SocketAddr receiver_source_endp_; address::SocketAddr receiver_repair_endp_; + address::SocketAddr receiver_control_endp_; packet::FifoQueue source_queue_; packet::FifoQueue repair_queue_; + packet::FifoQueue control_queue_; packet::ConcurrentQueue queue_; packet::IWriter* writer_; diff --git a/src/tests/public_api/test_loopback_sender_2_receiver.cpp b/src/tests/public_api/test_loopback_sender_2_receiver.cpp index a83415afc..70def5e7c 100644 --- a/src/tests/public_api/test_loopback_sender_2_receiver.cpp +++ b/src/tests/public_api/test_loopback_sender_2_receiver.cpp @@ -214,7 +214,7 @@ TEST(loopback_sender_2_receiver, rs8m_with_losses) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples, @@ -280,7 +280,7 @@ TEST(loopback_sender_2_receiver, ldpc_with_losses) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples, @@ -656,7 +656,7 @@ TEST(loopback_sender_2_receiver, metrics_measurements) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples, diff --git a/src/tests/public_api/test_plugin_plc.cpp b/src/tests/public_api/test_plugin_plc.cpp index cae9ab713..b7bbf1f5b 100644 --- a/src/tests/public_api/test_plugin_plc.cpp +++ b/src/tests/public_api/test_plugin_plc.cpp @@ -248,7 +248,7 @@ TEST(plugin_plc, losses_restored_by_fec) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, SampleStep, NumChans, @@ -303,7 +303,7 @@ TEST(plugin_plc, losses_restored_by_plc) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, SampleStep, NumChans,