From f2afaeb9715c6050cdd558e54278051cfa75edd8 Mon Sep 17 00:00:00 2001 From: GabrielPerez0522 Date: Wed, 12 Feb 2025 16:03:05 -0600 Subject: [PATCH 1/4] Added optional control packet support to test::Proxy --- src/tests/public_api/test_helpers/proxy.h | 144 +++++++++++++++++++++- 1 file changed, 141 insertions(+), 3 deletions(-) diff --git a/src/tests/public_api/test_helpers/proxy.h b/src/tests/public_api/test_helpers/proxy.h index 1ddcc2d37..62a3602fa 100644 --- a/src/tests/public_api/test_helpers/proxy.h +++ b/src/tests/public_api/test_helpers/proxy.h @@ -43,8 +43,10 @@ class Proxy : public core::Thread, private packet::IWriter { , net_loop_(packet_pool_, buffer_pool_, arena_) , n_source_packets_(n_source_packets) , n_repair_packets_(n_repair_packets) + , n_control_packets_(0) , flags_(flags) , pos_(0) { + input_control_endp_ = NULL; LONGS_EQUAL(status::StatusOK, net_loop_.init_status()); roc_protocol source_proto; @@ -119,9 +121,129 @@ class Proxy : public core::Thread, private packet::IWriter { == 0); } + Proxy(const roc_endpoint* receiver_source_endp, + const roc_endpoint* receiver_repair_endp, + const roc_endpoint* receiver_control_endp, + size_t n_source_packets, + size_t n_repair_packets, + size_t n_control_packets, + unsigned flags) + : packet_pool_("proxy_packet_pool", arena_) + , buffer_pool_("proxy_buffer_pool", arena_, 2000) + , queue_(packet::ConcurrentQueue::Blocking) + , net_loop_(packet_pool_, buffer_pool_, arena_) + , n_source_packets_(n_source_packets) + , n_repair_packets_(n_repair_packets) + , n_control_packets_(n_control_packets) + , flags_(flags) + , pos_(0) { + LONGS_EQUAL(status::StatusOK, net_loop_.init_status()); + + roc_protocol source_proto; + CHECK(roc_endpoint_get_protocol(receiver_source_endp, &source_proto) == 0); + + roc_protocol repair_proto; + CHECK(roc_endpoint_get_protocol(receiver_repair_endp, &repair_proto) == 0); + + roc_protocol control_proto; + CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0); + + int source_port = 0; + CHECK(roc_endpoint_get_port(receiver_source_endp, &source_port) == 0); + + int repair_port = 0; + CHECK(roc_endpoint_get_port(receiver_repair_endp, &repair_port) == 0); + + int control_port = 0; + CHECK(roc_endpoint_get_port(receiver_control_endp, &control_port) == 0); + + CHECK(receiver_source_endp_.set_host_port(address::Family_IPv4, "127.0.0.1", + source_port)); + CHECK(receiver_repair_endp_.set_host_port(address::Family_IPv4, "127.0.0.1", + repair_port)); + + CHECK(receiver_control_endp_.set_host_port(address::Family_IPv4, "127.0.0.1", + control_port)); + + CHECK(send_config_.bind_address.set_host_port(address::Family_IPv4, "127.0.0.1", + 0)); + + CHECK(recv_source_config_.bind_address.set_host_port(address::Family_IPv4, + "127.0.0.1", 0)); + CHECK(recv_repair_config_.bind_address.set_host_port(address::Family_IPv4, + "127.0.0.1", 0)); + CHECK(recv_control_config_.bind_address.set_host_port(address::Family_IPv4, + "127.0.0.1", 0)); + + netio::NetworkLoop::PortHandle send_port = NULL; + + { + netio::NetworkLoop::Tasks::AddUdpPort add_task(send_config_); + CHECK(net_loop_.schedule_and_wait(add_task)); + + send_port = add_task.get_handle(); + CHECK(send_port); + + netio::NetworkLoop::Tasks::StartUdpSend send_task(add_task.get_handle()); + CHECK(net_loop_.schedule_and_wait(send_task)); + writer_ = &send_task.get_outbound_writer(); + } + + { + netio::NetworkLoop::Tasks::AddUdpPort add_task(recv_source_config_); + CHECK(net_loop_.schedule_and_wait(add_task)); + + netio::NetworkLoop::Tasks::StartUdpRecv recv_task(add_task.get_handle(), + *this); + CHECK(net_loop_.schedule_and_wait(recv_task)); + } + + { + netio::NetworkLoop::Tasks::AddUdpPort add_task(recv_repair_config_); + CHECK(net_loop_.schedule_and_wait(add_task)); + + netio::NetworkLoop::Tasks::StartUdpRecv recv_task(add_task.get_handle(), + *this); + CHECK(net_loop_.schedule_and_wait(recv_task)); + } + + { + netio::NetworkLoop::Tasks::AddUdpPort add_task(recv_control_config_); + CHECK(net_loop_.schedule_and_wait(add_task)); + + netio::NetworkLoop::Tasks::StartUdpRecv recv_task(add_task.get_handle(), + *this); + CHECK(net_loop_.schedule_and_wait(recv_task)); + } + + CHECK(roc_endpoint_allocate(&input_source_endp_) == 0); + CHECK(roc_endpoint_set_protocol(input_source_endp_, source_proto) == 0); + CHECK(roc_endpoint_set_host(input_source_endp_, "127.0.0.1") == 0); + CHECK(roc_endpoint_set_port(input_source_endp_, + recv_source_config_.bind_address.port()) + == 0); + + CHECK(roc_endpoint_allocate(&input_repair_endp_) == 0); + CHECK(roc_endpoint_set_protocol(input_repair_endp_, repair_proto) == 0); + CHECK(roc_endpoint_set_host(input_repair_endp_, "127.0.0.1") == 0); + CHECK(roc_endpoint_set_port(input_repair_endp_, + recv_repair_config_.bind_address.port()) + == 0); + + CHECK(roc_endpoint_allocate(&input_control_endp_) == 0); + CHECK(roc_endpoint_set_protocol(input_control_endp_, control_proto) == 0); + CHECK(roc_endpoint_set_host(input_control_endp_, "127.0.0.1") == 0); + CHECK(roc_endpoint_set_port(input_control_endp_, + recv_control_config_.bind_address.port()) + == 0); + } + ~Proxy() { CHECK(roc_endpoint_deallocate(input_source_endp_) == 0); CHECK(roc_endpoint_deallocate(input_repair_endp_) == 0); + if (input_control_endp_ != NULL) { + CHECK(roc_endpoint_deallocate(input_control_endp_) == 0); + } } const roc_endpoint* source_endpoint() const { @@ -132,6 +254,10 @@ class Proxy : public core::Thread, private packet::IWriter { return input_repair_endp_; } + const roc_endpoint* control_endpoint() const { + return input_control_endp_; + } + size_t n_dropped_packets() const { return n_dropped_packets_; } @@ -180,13 +306,17 @@ class Proxy : public core::Thread, private packet::IWriter { if (pp->udp()->dst_addr == recv_source_config_.bind_address) { pp->udp()->dst_addr = receiver_source_endp_; LONGS_EQUAL(status::StatusOK, source_queue_.write(pp)); + } else if (pp->udp()->dst_addr == recv_control_config_.bind_address) { + pp->udp()->dst_addr = receiver_control_endp_; + LONGS_EQUAL(status::StatusOK, control_queue_.write(pp)); } else { pp->udp()->dst_addr = receiver_repair_endp_; LONGS_EQUAL(status::StatusOK, repair_queue_.write(pp)); } for (;;) { - const size_t block_pos = pos_ % (n_source_packets_ + n_repair_packets_); + const size_t block_pos = + pos_ % (n_source_packets_ + n_repair_packets_ + n_control_packets_); if (block_pos < n_source_packets_) { const bool drop_packet = (flags_ & FlagLoseSomePkts) && (block_pos == 1); @@ -194,12 +324,15 @@ class Proxy : public core::Thread, private packet::IWriter { if (!send_packet_(source_queue_, drop_packet)) { break; } - } else { + } else if (block_pos < (n_source_packets_ + n_repair_packets_)) { const bool drop_packet = (flags_ & FlagLoseAllRepairPkts); - if (!send_packet_(repair_queue_, drop_packet)) { break; } + } else { + if (!send_packet_(control_queue_, false)) { + break; + } } } @@ -244,15 +377,19 @@ class Proxy : public core::Thread, private packet::IWriter { netio::UdpConfig send_config_; netio::UdpConfig recv_source_config_; netio::UdpConfig recv_repair_config_; + netio::UdpConfig recv_control_config_; roc_endpoint* input_source_endp_; roc_endpoint* input_repair_endp_; + roc_endpoint* input_control_endp_; address::SocketAddr receiver_source_endp_; address::SocketAddr receiver_repair_endp_; + address::SocketAddr receiver_control_endp_; packet::FifoQueue source_queue_; packet::FifoQueue repair_queue_; + packet::FifoQueue control_queue_; packet::ConcurrentQueue queue_; packet::IWriter* writer_; @@ -261,6 +398,7 @@ class Proxy : public core::Thread, private packet::IWriter { const size_t n_source_packets_; const size_t n_repair_packets_; + const size_t n_control_packets_; core::Atomic n_dropped_packets_; const unsigned flags_; From 32dae4cca8fce4984704c4b385fdb9f12835afce Mon Sep 17 00:00:00 2001 From: GabrielPerez0522 Date: Thu, 6 Mar 2025 16:11:13 -0600 Subject: [PATCH 2/4] Updated implementation for bi-directional control packets --- src/tests/public_api/test_helpers/proxy.h | 148 ++++++---------------- 1 file changed, 38 insertions(+), 110 deletions(-) diff --git a/src/tests/public_api/test_helpers/proxy.h b/src/tests/public_api/test_helpers/proxy.h index 62a3602fa..8d32ac585 100644 --- a/src/tests/public_api/test_helpers/proxy.h +++ b/src/tests/public_api/test_helpers/proxy.h @@ -34,6 +34,7 @@ class Proxy : public core::Thread, private packet::IWriter { public: Proxy(const roc_endpoint* receiver_source_endp, const roc_endpoint* receiver_repair_endp, + const roc_endpoint* receiver_control_endp, size_t n_source_packets, size_t n_repair_packets, unsigned flags) @@ -43,7 +44,6 @@ class Proxy : public core::Thread, private packet::IWriter { , net_loop_(packet_pool_, buffer_pool_, arena_) , n_source_packets_(n_source_packets) , n_repair_packets_(n_repair_packets) - , n_control_packets_(0) , flags_(flags) , pos_(0) { input_control_endp_ = NULL; @@ -73,98 +73,20 @@ class Proxy : public core::Thread, private packet::IWriter { "127.0.0.1", 0)); CHECK(recv_repair_config_.bind_address.set_host_port(address::Family_IPv4, "127.0.0.1", 0)); + if (receiver_control_endp) { + roc_protocol control_proto; + CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0); - netio::NetworkLoop::PortHandle send_port = NULL; - - { - netio::NetworkLoop::Tasks::AddUdpPort add_task(send_config_); - CHECK(net_loop_.schedule_and_wait(add_task)); - - send_port = add_task.get_handle(); - CHECK(send_port); - - netio::NetworkLoop::Tasks::StartUdpSend send_task(add_task.get_handle()); - CHECK(net_loop_.schedule_and_wait(send_task)); - writer_ = &send_task.get_outbound_writer(); - } - - { - netio::NetworkLoop::Tasks::AddUdpPort add_task(recv_source_config_); - CHECK(net_loop_.schedule_and_wait(add_task)); - - netio::NetworkLoop::Tasks::StartUdpRecv recv_task(add_task.get_handle(), - *this); - CHECK(net_loop_.schedule_and_wait(recv_task)); - } + int control_port = 0; + CHECK(roc_endpoint_get_port(receiver_control_endp, &control_port) == 0); - { - netio::NetworkLoop::Tasks::AddUdpPort add_task(recv_repair_config_); - CHECK(net_loop_.schedule_and_wait(add_task)); + CHECK(receiver_control_endp_.set_host_port(address::Family_IPv4, "127.0.0.1", + control_port)); - netio::NetworkLoop::Tasks::StartUdpRecv recv_task(add_task.get_handle(), - *this); - CHECK(net_loop_.schedule_and_wait(recv_task)); + CHECK(recv_control_config_.bind_address.set_host_port(address::Family_IPv4, + "127.0.0.1", 0)); } - CHECK(roc_endpoint_allocate(&input_source_endp_) == 0); - CHECK(roc_endpoint_set_protocol(input_source_endp_, source_proto) == 0); - CHECK(roc_endpoint_set_host(input_source_endp_, "127.0.0.1") == 0); - CHECK(roc_endpoint_set_port(input_source_endp_, - recv_source_config_.bind_address.port()) - == 0); - - CHECK(roc_endpoint_allocate(&input_repair_endp_) == 0); - CHECK(roc_endpoint_set_protocol(input_repair_endp_, repair_proto) == 0); - CHECK(roc_endpoint_set_host(input_repair_endp_, "127.0.0.1") == 0); - CHECK(roc_endpoint_set_port(input_repair_endp_, - recv_repair_config_.bind_address.port()) - == 0); - } - - Proxy(const roc_endpoint* receiver_source_endp, - const roc_endpoint* receiver_repair_endp, - const roc_endpoint* receiver_control_endp, - size_t n_source_packets, - size_t n_repair_packets, - size_t n_control_packets, - unsigned flags) - : packet_pool_("proxy_packet_pool", arena_) - , buffer_pool_("proxy_buffer_pool", arena_, 2000) - , queue_(packet::ConcurrentQueue::Blocking) - , net_loop_(packet_pool_, buffer_pool_, arena_) - , n_source_packets_(n_source_packets) - , n_repair_packets_(n_repair_packets) - , n_control_packets_(n_control_packets) - , flags_(flags) - , pos_(0) { - LONGS_EQUAL(status::StatusOK, net_loop_.init_status()); - - roc_protocol source_proto; - CHECK(roc_endpoint_get_protocol(receiver_source_endp, &source_proto) == 0); - - roc_protocol repair_proto; - CHECK(roc_endpoint_get_protocol(receiver_repair_endp, &repair_proto) == 0); - - roc_protocol control_proto; - CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0); - - int source_port = 0; - CHECK(roc_endpoint_get_port(receiver_source_endp, &source_port) == 0); - - int repair_port = 0; - CHECK(roc_endpoint_get_port(receiver_repair_endp, &repair_port) == 0); - - int control_port = 0; - CHECK(roc_endpoint_get_port(receiver_control_endp, &control_port) == 0); - - CHECK(receiver_source_endp_.set_host_port(address::Family_IPv4, "127.0.0.1", - source_port)); - CHECK(receiver_repair_endp_.set_host_port(address::Family_IPv4, "127.0.0.1", - repair_port)); - - CHECK(receiver_control_endp_.set_host_port(address::Family_IPv4, "127.0.0.1", - control_port)); - CHECK(send_config_.bind_address.set_host_port(address::Family_IPv4, "127.0.0.1", 0)); @@ -172,8 +94,6 @@ class Proxy : public core::Thread, private packet::IWriter { "127.0.0.1", 0)); CHECK(recv_repair_config_.bind_address.set_host_port(address::Family_IPv4, "127.0.0.1", 0)); - CHECK(recv_control_config_.bind_address.set_host_port(address::Family_IPv4, - "127.0.0.1", 0)); netio::NetworkLoop::PortHandle send_port = NULL; @@ -207,7 +127,7 @@ class Proxy : public core::Thread, private packet::IWriter { CHECK(net_loop_.schedule_and_wait(recv_task)); } - { + if (receiver_control_endp) { netio::NetworkLoop::Tasks::AddUdpPort add_task(recv_control_config_); CHECK(net_loop_.schedule_and_wait(add_task)); @@ -229,13 +149,17 @@ class Proxy : public core::Thread, private packet::IWriter { CHECK(roc_endpoint_set_port(input_repair_endp_, recv_repair_config_.bind_address.port()) == 0); - - CHECK(roc_endpoint_allocate(&input_control_endp_) == 0); - CHECK(roc_endpoint_set_protocol(input_control_endp_, control_proto) == 0); - CHECK(roc_endpoint_set_host(input_control_endp_, "127.0.0.1") == 0); - CHECK(roc_endpoint_set_port(input_control_endp_, - recv_control_config_.bind_address.port()) - == 0); + if (receiver_control_endp) { + roc_protocol control_proto; + CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0); + + CHECK(roc_endpoint_allocate(&input_control_endp_) == 0); + CHECK(roc_endpoint_set_protocol(input_control_endp_, control_proto) == 0); + CHECK(roc_endpoint_set_host(input_control_endp_, "127.0.0.1") == 0); + CHECK(roc_endpoint_set_port(input_control_endp_, + recv_control_config_.bind_address.port()) + == 0); + } } ~Proxy() { @@ -255,6 +179,7 @@ class Proxy : public core::Thread, private packet::IWriter { } const roc_endpoint* control_endpoint() const { + CHECK(input_control_endp_); return input_control_endp_; } @@ -306,17 +231,21 @@ class Proxy : public core::Thread, private packet::IWriter { if (pp->udp()->dst_addr == recv_source_config_.bind_address) { pp->udp()->dst_addr = receiver_source_endp_; LONGS_EQUAL(status::StatusOK, source_queue_.write(pp)); - } else if (pp->udp()->dst_addr == recv_control_config_.bind_address) { - pp->udp()->dst_addr = receiver_control_endp_; - LONGS_EQUAL(status::StatusOK, control_queue_.write(pp)); - } else { + } else if (pp->udp()->dst_addr == recv_repair_config_.bind_address) { pp->udp()->dst_addr = receiver_repair_endp_; LONGS_EQUAL(status::StatusOK, repair_queue_.write(pp)); + } else if (input_control_endp_ + && pp->udp()->dst_addr == recv_control_config_.bind_address) { + pp->udp()->dst_addr = receiver_control_endp_; + LONGS_EQUAL(status::StatusOK, control_queue_.write(pp)); + } else if (input_control_endp_ && pp->udp()->dst_addr == receiver_control_endp_) { + pp->udp()->src_addr = recv_control_config_.bind_address; + pp->udp()->dst_addr = send_config_.bind_address; + LONGS_EQUAL(status::StatusOK, control_queue_.write(pp)); } for (;;) { - const size_t block_pos = - pos_ % (n_source_packets_ + n_repair_packets_ + n_control_packets_); + const size_t block_pos = pos_ % (n_source_packets_ + n_repair_packets_); if (block_pos < n_source_packets_) { const bool drop_packet = (flags_ & FlagLoseSomePkts) && (block_pos == 1); @@ -324,18 +253,18 @@ class Proxy : public core::Thread, private packet::IWriter { if (!send_packet_(source_queue_, drop_packet)) { break; } - } else if (block_pos < (n_source_packets_ + n_repair_packets_)) { + } else { const bool drop_packet = (flags_ & FlagLoseAllRepairPkts); if (!send_packet_(repair_queue_, drop_packet)) { break; } - } else { - if (!send_packet_(control_queue_, false)) { - break; - } } } + if (input_control_endp_) { + send_packet_(control_queue_, false); + } + return status::StatusOK; } @@ -398,7 +327,6 @@ class Proxy : public core::Thread, private packet::IWriter { const size_t n_source_packets_; const size_t n_repair_packets_; - const size_t n_control_packets_; core::Atomic n_dropped_packets_; const unsigned flags_; From afeeaf5aacd74107e70b44cf37258d347557c1f6 Mon Sep 17 00:00:00 2001 From: GabrielPerez0522 Date: Thu, 6 Mar 2025 16:17:48 -0600 Subject: [PATCH 3/4] Updated tests for bi-directional control packets --- src/tests/public_api/test_loopback_sender_2_receiver.cpp | 6 +++--- src/tests/public_api/test_plugin_plc.cpp | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/tests/public_api/test_loopback_sender_2_receiver.cpp b/src/tests/public_api/test_loopback_sender_2_receiver.cpp index a83415afc..70def5e7c 100644 --- a/src/tests/public_api/test_loopback_sender_2_receiver.cpp +++ b/src/tests/public_api/test_loopback_sender_2_receiver.cpp @@ -214,7 +214,7 @@ TEST(loopback_sender_2_receiver, rs8m_with_losses) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples, @@ -280,7 +280,7 @@ TEST(loopback_sender_2_receiver, ldpc_with_losses) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples, @@ -656,7 +656,7 @@ TEST(loopback_sender_2_receiver, metrics_measurements) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples, diff --git a/src/tests/public_api/test_plugin_plc.cpp b/src/tests/public_api/test_plugin_plc.cpp index cae9ab713..b7bbf1f5b 100644 --- a/src/tests/public_api/test_plugin_plc.cpp +++ b/src/tests/public_api/test_plugin_plc.cpp @@ -248,7 +248,7 @@ TEST(plugin_plc, losses_restored_by_fec) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, SampleStep, NumChans, @@ -303,7 +303,7 @@ TEST(plugin_plc, losses_restored_by_plc) { receiver.bind(); - test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), + test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(), NULL, test::SourcePackets, test::RepairPackets, Flags); test::Sender sender(context, sender_conf, SampleStep, NumChans, From 01f2bb7cac91f6801486ab198cf0fb04edd0048f Mon Sep 17 00:00:00 2001 From: GabrielPerez0522 Date: Thu, 6 Mar 2025 16:33:34 -0600 Subject: [PATCH 4/4] Finalized formatting for bi-directional control packet handling --- src/tests/public_api/test_helpers/proxy.h | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/src/tests/public_api/test_helpers/proxy.h b/src/tests/public_api/test_helpers/proxy.h index 8d32ac585..53f259671 100644 --- a/src/tests/public_api/test_helpers/proxy.h +++ b/src/tests/public_api/test_helpers/proxy.h @@ -55,6 +55,11 @@ class Proxy : public core::Thread, private packet::IWriter { roc_protocol repair_proto; CHECK(roc_endpoint_get_protocol(receiver_repair_endp, &repair_proto) == 0); + roc_protocol control_proto; + if (receiver_control_endp) { + CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0); + } + int source_port = 0; CHECK(roc_endpoint_get_port(receiver_source_endp, &source_port) == 0); @@ -74,9 +79,6 @@ class Proxy : public core::Thread, private packet::IWriter { CHECK(recv_repair_config_.bind_address.set_host_port(address::Family_IPv4, "127.0.0.1", 0)); if (receiver_control_endp) { - roc_protocol control_proto; - CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0); - int control_port = 0; CHECK(roc_endpoint_get_port(receiver_control_endp, &control_port) == 0); @@ -87,14 +89,6 @@ class Proxy : public core::Thread, private packet::IWriter { "127.0.0.1", 0)); } - CHECK(send_config_.bind_address.set_host_port(address::Family_IPv4, "127.0.0.1", - 0)); - - CHECK(recv_source_config_.bind_address.set_host_port(address::Family_IPv4, - "127.0.0.1", 0)); - CHECK(recv_repair_config_.bind_address.set_host_port(address::Family_IPv4, - "127.0.0.1", 0)); - netio::NetworkLoop::PortHandle send_port = NULL; { @@ -150,9 +144,6 @@ class Proxy : public core::Thread, private packet::IWriter { recv_repair_config_.bind_address.port()) == 0); if (receiver_control_endp) { - roc_protocol control_proto; - CHECK(roc_endpoint_get_protocol(receiver_control_endp, &control_proto) == 0); - CHECK(roc_endpoint_allocate(&input_control_endp_) == 0); CHECK(roc_endpoint_set_protocol(input_control_endp_, control_proto) == 0); CHECK(roc_endpoint_set_host(input_control_endp_, "127.0.0.1") == 0);