@@ -498,7 +498,12 @@ void client_mode::cleanup_expiring_forwarders()
498498
499499 int64_t time_elapsed = calculate_difference (time_right_now, expire_time);
500500
501- if (time_elapsed > CLEANUP_WAITS / 2 &&
501+ if (time_elapsed > CLEANUP_WAITS / 3 &&
502+ time_elapsed <= CLEANUP_WAITS / 3 * 2 &&
503+ forwarder_ptr != nullptr )
504+ forwarder_ptr->pause (true );
505+
506+ if (time_elapsed > CLEANUP_WAITS / 3 * 2 &&
502507 forwarder_ptr != nullptr )
503508 forwarder_ptr->stop ();
504509
@@ -556,7 +561,7 @@ void client_mode::loop_timeout_sessions()
556561
557562 std::shared_ptr<forwarder> egress_forwarder = std::atomic_load (&(udp_session_ptr->egress_forwarder ));
558563 old_forwarders.push_back (egress_forwarder);
559- egress_forwarder->stop ( );
564+ egress_forwarder->pause ( true );
560565#if __GNUC__ == 12 && __GNUC_MINOR__ < 3
561566 udp_session_ptr->egress_forwarder .store (nullptr );
562567#else
@@ -691,84 +696,69 @@ void client_mode::test_before_change(std::shared_ptr<udp_mappings> udp_mappings_
691696 uint16_t destination_port_end = current_settings.destination_port_end ;
692697 asio::error_code ec;
693698
694- if ( std::shared_ptr<forwarder> egress_hopping_forwarder = std::atomic_load (&(udp_mappings_ptr-> egress_hopping_forwarder )) ;
695- egress_hopping_forwarder == nullptr || destination_port_start == destination_port_end)
699+ std::shared_ptr<forwarder> udp_forwarder = nullptr ;
700+ try
696701 {
697- std::shared_ptr<forwarder> udp_forwarder = nullptr ;
698- try
699- {
700- auto bind_push_func = std::bind (&ttp::task_group_pool::push_task_peer, &sequence_task_pool, _1, _2, _3);
701- auto bind_check_limit_func = [this ](size_t number) -> bool {return sequence_task_pool.get_peer_network_task_count (number) > TASK_COUNT_LIMIT; };
702- auto udp_func = std::bind (&client_mode::udp_forwarder_incoming_to_udp, this , _1, _2, _3, _4, _5);
703- udp_forwarder = std::make_shared<forwarder>(io_context, bind_push_func, bind_check_limit_func, udp_mappings_ptr, udp_func, current_settings.ip_version_only );
704- if (udp_forwarder == nullptr )
705- {
706- udp_mappings_ptr->hopping_timestamp .store (time_right_now + current_settings.dynamic_port_refresh );
707- return ;
708- }
709- }
710- catch (std::exception &ex)
711- {
712- std::string error_message = time_to_string_with_square_brackets () + " Cannot switch to new port, error: " + ex.what () + " \n " ;
713- std::cerr << error_message;
714- print_message_to_file (error_message, current_settings.log_messages );
715- udp_mappings_ptr->hopping_timestamp .store (time_right_now + current_settings.dynamic_port_refresh );
716- return ;
717- }
718-
719- std::shared_ptr<udp::endpoint> egress_target_endpoint = std::atomic_load (&(udp_mappings_ptr->egress_target_endpoint ));
720- if (destination_port_start == destination_port_end)
721- {
722- std::atomic_store (&(udp_mappings_ptr->hopping_endpoint ), std::make_shared<udp::endpoint>(*egress_target_endpoint));
723- }
724- else
725- {
726- uint16_t current_port_number = egress_target_endpoint->port ();
727- uint16_t new_port_numer = generate_new_port_number (destination_port_start, destination_port_end);
728- for (size_t retry_times = 0 ; new_port_numer == current_port_number && retry_times < RETRY_TIMES; retry_times++)
729- {
730- new_port_numer = generate_new_port_number (destination_port_start, destination_port_end);
731- }
732- std::shared_ptr<asio::ip::address> target = std::atomic_load (&(target_address));
733- std::atomic_store (&(udp_mappings_ptr->hopping_endpoint ), std::make_shared<udp::endpoint>(*target, new_port_numer));
734- }
735-
736- std::shared_ptr<forwarder> new_forwarder = udp_forwarder;
737- std::vector<uint8_t > keep_alive_packet = create_empty_data (current_settings.encryption_password , current_settings.encryption , EMPTY_PACKET_SIZE);
738- udp_mappings_ptr->wrapper_ptr ->write_iden (keep_alive_packet.data ());
739-
740- if (current_settings.ip_version_only == ip_only_options::ipv4)
741- new_forwarder->send_out (std::move (keep_alive_packet), local_empty_target_v4, ec);
742- else
743- new_forwarder->send_out (std::move (keep_alive_packet), local_empty_target_v6, ec);
744-
745- if (ec)
702+ auto bind_push_func = std::bind (&ttp::task_group_pool::push_task_peer, &sequence_task_pool, _1, _2, _3);
703+ auto bind_check_limit_func = [this ](size_t number) -> bool {return sequence_task_pool.get_peer_network_task_count (number) > TASK_COUNT_LIMIT; };
704+ auto udp_func = std::bind (&client_mode::udp_forwarder_incoming_to_udp, this , _1, _2, _3, _4, _5);
705+ udp_forwarder = std::make_shared<forwarder>(io_context, bind_push_func, bind_check_limit_func, udp_mappings_ptr, udp_func, current_settings.ip_version_only );
706+ if (udp_forwarder == nullptr )
746707 {
747708 udp_mappings_ptr->hopping_timestamp .store (time_right_now + current_settings.dynamic_port_refresh );
748709 return ;
749710 }
711+ }
712+ catch (std::exception &ex)
713+ {
714+ std::string error_message = time_to_string_with_square_brackets () + " Cannot switch to new port, error: " + ex.what () + " \n " ;
715+ std::cerr << error_message;
716+ print_message_to_file (error_message, current_settings.log_messages );
717+ udp_mappings_ptr->hopping_timestamp .store (time_right_now + current_settings.dynamic_port_refresh );
718+ return ;
719+ }
750720
751- new_forwarder->async_receive ();
752- if (egress_hopping_forwarder != nullptr )
753- {
754- egress_hopping_forwarder->pause (true );
755- std::scoped_lock lock_expiring_forwarders{ mutex_expiring_forwarders };
756- expiring_forwarders[egress_hopping_forwarder] = right_now ();
757- }
758- std::atomic_store (&(udp_mappings_ptr->egress_hopping_forwarder ), new_forwarder);
721+ std::shared_ptr<udp::endpoint> egress_target_endpoint = std::atomic_load (&(udp_mappings_ptr->egress_target_endpoint ));
722+ if (destination_port_start == destination_port_end)
723+ {
724+ std::atomic_store (&(udp_mappings_ptr->hopping_endpoint ), std::make_shared<udp::endpoint>(*egress_target_endpoint));
759725 }
760726 else
761727 {
762- std::shared_ptr<udp::endpoint> hopping_endpoint = std::atomic_load (&(udp_mappings_ptr->hopping_endpoint ));
763- uint16_t current_port_number = hopping_endpoint->port ();
728+ uint16_t current_port_number = egress_target_endpoint->port ();
764729 uint16_t new_port_numer = generate_new_port_number (destination_port_start, destination_port_end);
765730 for (size_t retry_times = 0 ; new_port_numer == current_port_number && retry_times < RETRY_TIMES; retry_times++)
766731 {
767732 new_port_numer = generate_new_port_number (destination_port_start, destination_port_end);
768733 }
769- hopping_endpoint->port (new_port_numer);
770- std::atomic_store (&(udp_mappings_ptr->hopping_endpoint ), hopping_endpoint);
734+ std::shared_ptr<asio::ip::address> target = std::atomic_load (&(target_address));
735+ std::atomic_store (&(udp_mappings_ptr->hopping_endpoint ), std::make_shared<udp::endpoint>(*target, new_port_numer));
736+ }
737+
738+ std::shared_ptr<forwarder> new_forwarder = udp_forwarder;
739+ std::vector<uint8_t > keep_alive_packet = create_empty_data (current_settings.encryption_password , current_settings.encryption , EMPTY_PACKET_SIZE);
740+ udp_mappings_ptr->wrapper_ptr ->write_iden (keep_alive_packet.data ());
741+
742+ if (current_settings.ip_version_only == ip_only_options::ipv4)
743+ new_forwarder->send_out (std::move (keep_alive_packet), local_empty_target_v4, ec);
744+ else
745+ new_forwarder->send_out (std::move (keep_alive_packet), local_empty_target_v6, ec);
746+
747+ if (ec)
748+ {
749+ udp_mappings_ptr->hopping_timestamp .store (time_right_now + current_settings.dynamic_port_refresh );
750+ return ;
751+ }
752+
753+ new_forwarder->async_receive ();
754+ if (std::shared_ptr<forwarder> egress_hopping_forwarder = std::atomic_load (&(udp_mappings_ptr->egress_hopping_forwarder ));
755+ egress_hopping_forwarder != nullptr )
756+ {
757+ egress_hopping_forwarder->pause (true );
758+ std::scoped_lock lock_expiring_forwarders{ mutex_expiring_forwarders };
759+ expiring_forwarders[egress_hopping_forwarder] = right_now ();
771760 }
761+ std::atomic_store (&(udp_mappings_ptr->egress_hopping_forwarder ), new_forwarder);
772762
773763 udp_mappings_ptr->hopping_available .store (hop_status::testing);
774764 udp_mappings *udp_session_ptr = udp_mappings_ptr.get ();
@@ -788,8 +778,9 @@ void client_mode::switch_new_port(std::shared_ptr<udp_mappings> udp_mappings_ptr
788778 udp_mappings_ptr->hopping_timestamp .store (right_now () + current_settings.dynamic_port_refresh );
789779 udp_mappings_ptr->hopping_available .store (hop_status::pending);
790780
781+ std::shared_ptr<udp::endpoint> hopping_endpoint = std::atomic_load (&(udp_mappings_ptr->hopping_endpoint ));
791782 std::atomic_store (&(udp_mappings_ptr->egress_previous_target_endpoint ), std::atomic_load (&(udp_mappings_ptr->egress_target_endpoint )));
792- std::atomic_store (&(udp_mappings_ptr->egress_target_endpoint ), std::make_shared<udp::endpoint>(*std::atomic_load (&(udp_mappings_ptr-> hopping_endpoint )) ));
783+ std::atomic_store (&(udp_mappings_ptr->egress_target_endpoint ), std::make_shared<udp::endpoint>(*hopping_endpoint));
793784
794785 std::shared_ptr<forwarder> new_forwarder = std::atomic_load (&(udp_mappings_ptr->egress_hopping_forwarder ));
795786 std::shared_ptr<forwarder> old_forwarder = std::atomic_load (&(udp_mappings_ptr->egress_forwarder ));
0 commit comments