diff --git a/doc/en/transfer-engine.md b/doc/en/transfer-engine.md index db40d54c6..17bb4d67a 100644 --- a/doc/en/transfer-engine.md +++ b/doc/en/transfer-engine.md @@ -440,4 +440,5 @@ For advanced users, TransferEngine provides the following advanced runtime optio - `MC_REDIS_PASSWORD` The password for Redis storage plugin, only takes effect when Redis is specified as the metadata server. If not set, no authentication will be attempted to log in to the Redis. - `MC_REDIS_DB_INDEX` The database index for Redis storage plugin, must be an integer between 0 and 255. Only takes effect when Redis is specified as the metadata server. If not set or invalid, the default value is 0. - `MC_FRAGMENT_RATIO ` In RdmaTransport::submitTransferTask, if the last data piece after division is ≤ 1/MC_FRAGMENT_RATIO of the block size, it merges with the previous block to reduce overhead. The default value is 4 -- `MC_ENABLE_DEST_DEVICE_AFFINITY` Enable device affinity for RDMA performance optimization. When enabled, Transfer Engine will prioritize communication with remote NICs that have the same name as local NICs to reduce QP count and improve network performance in rail-optimized topologies. The default value is false \ No newline at end of file +- `MC_ENABLE_DEST_DEVICE_AFFINITY` Enable device affinity for RDMA performance optimization. When enabled, Transfer Engine will prioritize communication with remote NICs that have the same name as local NICs to reduce QP count and improve network performance in rail-optimized topologies. The default value is false +- `MC_ENABLE_REUSEADDR` Force the TCP daemon to enable the REUSEADDR option. This avoids rare cases where two processes use the same port. However, it also prevents reusing the port immediately after shutting down previous connections. diff --git a/mooncake-transfer-engine/include/common.h b/mooncake-transfer-engine/include/common.h index a18b19ad0..2677073a3 100644 --- a/mooncake-transfer-engine/include/common.h +++ b/mooncake-transfer-engine/include/common.h @@ -15,10 +15,10 @@ #ifndef COMMON_H #define COMMON_H -#include -#include #include +#include #include +#include #include #include #include @@ -123,48 +123,54 @@ static inline std::string getCurrentDateTime() { uint16_t getDefaultHandshakePort(); -template +template std::optional parseFromString(std::string_view str) { T result = T(); - auto [ptr, ec] = std::from_chars(str.data(), str.data() + str.size(), result); + auto [ptr, ec] = + std::from_chars(str.data(), str.data() + str.size(), result); if (ec != std::errc() || ptr != str.data() + str.size()) { return {}; } return {std::move(result)}; } -static inline uint16_t getPortFromString(std::string_view port_string, uint16_t default_port) { +static inline uint16_t getPortFromString(std::string_view port_string, + uint16_t default_port) { std::optional port = parseFromString(port_string); if (port.has_value()) { return *port; } - LOG(WARNING) << "Illegal port number in " << port_string << ". Use default port " << default_port << " instead"; + LOG(WARNING) << "Illegal port number in " << port_string + << ". Use default port " << default_port << " instead"; return default_port; } -static inline bool isValidIpV6(const std::string& address) { +static inline bool isValidIpV6(const std::string &address) { sockaddr_in6 addr; std::memset(&addr, 0, sizeof(addr)); return inet_pton(AF_INET6, address.c_str(), &addr.sin6_addr) == 1; } -static inline std::string maybeWrapIpV6(const std::string& address) { +static inline std::string maybeWrapIpV6(const std::string &address) { if (isValidIpV6(address)) { return "[" + address + "]"; } return address; } -static inline std::pair parseHostNameWithPort(const std::string &server_name) { +static inline std::pair parseHostNameWithPort( + const std::string &server_name) { uint16_t port = getDefaultHandshakePort(); if (server_name.starts_with("[")) { // [ipv6] or [ipv6]:port const size_t closing_bracket_pos = server_name.find(']'); const size_t colon_pos = server_name.find(':', closing_bracket_pos); - std::string potentialHost = server_name.substr(1, closing_bracket_pos - 1); + std::string potentialHost = + server_name.substr(1, closing_bracket_pos - 1); if (isValidIpV6(potentialHost)) { - return {std::move(potentialHost), getPortFromString(server_name.substr(colon_pos + 1), port)}; + return {std::move(potentialHost), + getPortFromString(server_name.substr(colon_pos + 1), port)}; } // Not valid ipv6, fallback to ipv4/host/etc mode } else if (isValidIpV6(server_name)) { @@ -177,10 +183,13 @@ static inline std::pair parseHostNameWithPort(const std:: if (colon_pos == server_name.npos) { return {server_name, port}; } - return {server_name.substr(0, colon_pos), getPortFromString(server_name.substr(colon_pos + 1), port)}; + return {server_name.substr(0, colon_pos), + getPortFromString(server_name.substr(colon_pos + 1), port)}; } -static inline uint16_t parsePortAndDevice(std::string_view suffix, uint16_t default_port, int *device_id) { +static inline uint16_t parsePortAndDevice(std::string_view suffix, + uint16_t default_port, + int *device_id) { auto colon_pos = suffix.find(':'); if (colon_pos == suffix.npos) { return getPortFromString(suffix, default_port); @@ -189,8 +198,10 @@ static inline uint16_t parsePortAndDevice(std::string_view suffix, uint16_t defa auto npu_str = suffix.substr(colon_pos + 1); auto npu_ops = npu_str.find('_'); - if (npu_ops != npu_str.npos && npu_ops != 0 && npu_ops != npu_str.size() - 1) { - *device_id = parseFromString(npu_str.substr(npu_ops + 1)).value_or(0); + if (npu_ops != npu_str.npos && npu_ops != 0 && + npu_ops != npu_str.size() - 1) { + *device_id = + parseFromString(npu_str.substr(npu_ops + 1)).value_or(0); } return getPortFromString(port_str, default_port); } @@ -203,12 +214,12 @@ static inline std::pair parseHostNameWithPortAscend( // [ipv6] or [ipv6]:port const size_t closing_bracket_pos = server_name.find(']'); const size_t colon_pos = server_name.find(':', closing_bracket_pos); - std::string potentialHost = server_name.substr(1, closing_bracket_pos - 1); + std::string potentialHost = + server_name.substr(1, closing_bracket_pos - 1); if (isValidIpV6(potentialHost)) { - return { - std::move(potentialHost), - parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id) - }; + return {std::move(potentialHost), + parsePortAndDevice(server_name.substr(colon_pos + 1), port, + device_id)}; } // Not valid ipv6, fallback to ipv4/host/etc mode } else if (isValidIpV6(server_name)) { @@ -222,8 +233,7 @@ static inline std::pair parseHostNameWithPortAscend( return { server_name.substr(0, colon_pos), - parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id) - }; + parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id)}; } static inline ssize_t writeFully(int fd, const void *buf, size_t len) { @@ -505,10 +515,9 @@ class SimpleRandom { return g_random; } - // 生成下一个伪随机数 uint32_t next() { current = (a * current + c) % m; - return current; + return current >> 12; // Shift right to add randomness of the LSBs } uint32_t next(uint32_t max) { return next() % max; } diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index 846422b71..fbe9f287c 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -592,12 +592,14 @@ struct SocketHandShakePlugin : public HandShakePlugin { return ERR_SOCKET; } - if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on, - sizeof(on))) { - PLOG(ERROR) - << "SocketHandShakePlugin: setsockopt(SO_REUSEADDR)"; - closeListen(); - return ERR_SOCKET; + if (getenv("MC_ENABLE_REUSEADDR")) { + if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on, + sizeof(on))) { + PLOG(ERROR) + << "SocketHandShakePlugin: setsockopt(SO_REUSEADDR)"; + closeListen(); + return ERR_SOCKET; + } } if (globalConfig().use_ipv6) { @@ -688,9 +690,11 @@ struct SocketHandShakePlugin : public HandShakePlugin { // old protocol equals Connection type if (type == HandShakeRequestType::Connection || type == HandShakeRequestType::OldProtocol) { - if (on_connection_callback_) on_connection_callback_(peer, local); + if (on_connection_callback_) + on_connection_callback_(peer, local); } else if (type == HandShakeRequestType::Metadata) { - if (on_metadata_callback_) on_metadata_callback_(peer, local); + if (on_metadata_callback_) + on_metadata_callback_(peer, local); } else if (type == HandShakeRequestType::Notify) { if (on_notify_callback_) on_notify_callback_(peer, local); } else { @@ -1086,11 +1090,13 @@ uint16_t findAvailableTcpPort(int &sockfd) { continue; } - int on = 1; - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { - close(sockfd); - sockfd = -1; - continue; + if (getenv("MC_ENABLE_REUSEADDR")) { + int on = 1; + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { + close(sockfd); + sockfd = -1; + continue; + } } if (use_ipv6) { diff --git a/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp b/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp index 0548fb4be..eb3ab233b 100644 --- a/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp +++ b/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp @@ -63,7 +63,7 @@ uint16_t findAvailableTcpPort(int &sockfd, bool use_ipv6) { static std::random_device rand_gen; std::mt19937 gen(rand_gen()); const int min_port = 15000; - const int max_port = 17000; + const int max_port = 25000; const int max_attempts = 500; std::uniform_int_distribution<> rand_dist(min_port, max_port);