Skip to content
Open
3 changes: 2 additions & 1 deletion doc/en/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,4 +440,5 @@ For advanced users, TransferEngine provides the following advanced runtime optio
- `MC_REDIS_PASSWORD` The password for Redis storage plugin, only takes effect when Redis is specified as the metadata server. If not set, no authentication will be attempted to log in to the Redis.
- `MC_REDIS_DB_INDEX` The database index for Redis storage plugin, must be an integer between 0 and 255. Only takes effect when Redis is specified as the metadata server. If not set or invalid, the default value is 0.
- `MC_FRAGMENT_RATIO ` In RdmaTransport::submitTransferTask, if the last data piece after division is ≤ 1/MC_FRAGMENT_RATIO of the block size, it merges with the previous block to reduce overhead. The default value is 4
- `MC_ENABLE_DEST_DEVICE_AFFINITY` Enable device affinity for RDMA performance optimization. When enabled, Transfer Engine will prioritize communication with remote NICs that have the same name as local NICs to reduce QP count and improve network performance in rail-optimized topologies. The default value is false
- `MC_ENABLE_DEST_DEVICE_AFFINITY` Enable device affinity for RDMA performance optimization. When enabled, Transfer Engine will prioritize communication with remote NICs that have the same name as local NICs to reduce QP count and improve network performance in rail-optimized topologies. The default value is false
- `MC_ENABLE_REUSEADDR` Force the TCP daemon to enable the REUSEADDR option. This avoids rare cases where two processes use the same port. However, it also prevents reusing the port immediately after shutting down previous connections.
57 changes: 33 additions & 24 deletions mooncake-transfer-engine/include/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
#ifndef COMMON_H
#define COMMON_H

#include <glog/logging.h>
#include <numa.h>
#include <arpa/inet.h>
#include <glog/logging.h>
#include <netinet/in.h>
#include <numa.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <unistd.h>
Expand Down Expand Up @@ -123,48 +123,54 @@ static inline std::string getCurrentDateTime() {

uint16_t getDefaultHandshakePort();

template<typename T>
template <typename T>
std::optional<T> parseFromString(std::string_view str) {
T result = T();
auto [ptr, ec] = std::from_chars(str.data(), str.data() + str.size(), result);
auto [ptr, ec] =
std::from_chars(str.data(), str.data() + str.size(), result);
if (ec != std::errc() || ptr != str.data() + str.size()) {
return {};
}
return {std::move(result)};
}

static inline uint16_t getPortFromString(std::string_view port_string, uint16_t default_port) {
static inline uint16_t getPortFromString(std::string_view port_string,
uint16_t default_port) {
std::optional<uint16_t> port = parseFromString<uint16_t>(port_string);
if (port.has_value()) {
return *port;
}
LOG(WARNING) << "Illegal port number in " << port_string << ". Use default port " << default_port << " instead";
LOG(WARNING) << "Illegal port number in " << port_string
<< ". Use default port " << default_port << " instead";
return default_port;
}

static inline bool isValidIpV6(const std::string& address) {
static inline bool isValidIpV6(const std::string &address) {
sockaddr_in6 addr;
std::memset(&addr, 0, sizeof(addr));
return inet_pton(AF_INET6, address.c_str(), &addr.sin6_addr) == 1;
}

static inline std::string maybeWrapIpV6(const std::string& address) {
static inline std::string maybeWrapIpV6(const std::string &address) {
if (isValidIpV6(address)) {
return "[" + address + "]";
}
return address;
}

static inline std::pair<std::string, uint16_t> parseHostNameWithPort(const std::string &server_name) {
static inline std::pair<std::string, uint16_t> parseHostNameWithPort(
const std::string &server_name) {
uint16_t port = getDefaultHandshakePort();

if (server_name.starts_with("[")) {
// [ipv6] or [ipv6]:port
const size_t closing_bracket_pos = server_name.find(']');
const size_t colon_pos = server_name.find(':', closing_bracket_pos);
std::string potentialHost = server_name.substr(1, closing_bracket_pos - 1);
std::string potentialHost =
server_name.substr(1, closing_bracket_pos - 1);
if (isValidIpV6(potentialHost)) {
return {std::move(potentialHost), getPortFromString(server_name.substr(colon_pos + 1), port)};
return {std::move(potentialHost),
getPortFromString(server_name.substr(colon_pos + 1), port)};
}
// Not valid ipv6, fallback to ipv4/host/etc mode
} else if (isValidIpV6(server_name)) {
Expand All @@ -177,10 +183,13 @@ static inline std::pair<std::string, uint16_t> parseHostNameWithPort(const std::
if (colon_pos == server_name.npos) {
return {server_name, port};
}
return {server_name.substr(0, colon_pos), getPortFromString(server_name.substr(colon_pos + 1), port)};
return {server_name.substr(0, colon_pos),
getPortFromString(server_name.substr(colon_pos + 1), port)};
}

static inline uint16_t parsePortAndDevice(std::string_view suffix, uint16_t default_port, int *device_id) {
static inline uint16_t parsePortAndDevice(std::string_view suffix,
uint16_t default_port,
int *device_id) {
auto colon_pos = suffix.find(':');
if (colon_pos == suffix.npos) {
return getPortFromString(suffix, default_port);
Expand All @@ -189,8 +198,10 @@ static inline uint16_t parsePortAndDevice(std::string_view suffix, uint16_t defa
auto npu_str = suffix.substr(colon_pos + 1);

auto npu_ops = npu_str.find('_');
if (npu_ops != npu_str.npos && npu_ops != 0 && npu_ops != npu_str.size() - 1) {
*device_id = parseFromString<int>(npu_str.substr(npu_ops + 1)).value_or(0);
if (npu_ops != npu_str.npos && npu_ops != 0 &&
npu_ops != npu_str.size() - 1) {
*device_id =
parseFromString<int>(npu_str.substr(npu_ops + 1)).value_or(0);
}
return getPortFromString(port_str, default_port);
}
Expand All @@ -203,12 +214,12 @@ static inline std::pair<std::string, uint16_t> parseHostNameWithPortAscend(
// [ipv6] or [ipv6]:port
const size_t closing_bracket_pos = server_name.find(']');
const size_t colon_pos = server_name.find(':', closing_bracket_pos);
std::string potentialHost = server_name.substr(1, closing_bracket_pos - 1);
std::string potentialHost =
server_name.substr(1, closing_bracket_pos - 1);
if (isValidIpV6(potentialHost)) {
return {
std::move(potentialHost),
parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id)
};
return {std::move(potentialHost),
parsePortAndDevice(server_name.substr(colon_pos + 1), port,
device_id)};
}
// Not valid ipv6, fallback to ipv4/host/etc mode
} else if (isValidIpV6(server_name)) {
Expand All @@ -222,8 +233,7 @@ static inline std::pair<std::string, uint16_t> parseHostNameWithPortAscend(

return {
server_name.substr(0, colon_pos),
parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id)
};
parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id)};
}

static inline ssize_t writeFully(int fd, const void *buf, size_t len) {
Expand Down Expand Up @@ -505,10 +515,9 @@ class SimpleRandom {
return g_random;
}

// 生成下一个伪随机数
uint32_t next() {
current = (a * current + c) % m;
return current;
return current >> 12; // Shift right to add randomness of the LSBs
}

uint32_t next(uint32_t max) { return next() % max; }
Expand Down
32 changes: 19 additions & 13 deletions mooncake-transfer-engine/src/transfer_metadata_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,12 +592,14 @@ struct SocketHandShakePlugin : public HandShakePlugin {
return ERR_SOCKET;
}

if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on,
sizeof(on))) {
PLOG(ERROR)
<< "SocketHandShakePlugin: setsockopt(SO_REUSEADDR)";
closeListen();
return ERR_SOCKET;
if (getenv("MC_ENABLE_REUSEADDR")) {
if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on,
sizeof(on))) {
PLOG(ERROR)
<< "SocketHandShakePlugin: setsockopt(SO_REUSEADDR)";
closeListen();
return ERR_SOCKET;
}
}

if (globalConfig().use_ipv6) {
Expand Down Expand Up @@ -688,9 +690,11 @@ struct SocketHandShakePlugin : public HandShakePlugin {
// old protocol equals Connection type
if (type == HandShakeRequestType::Connection ||
type == HandShakeRequestType::OldProtocol) {
if (on_connection_callback_) on_connection_callback_(peer, local);
if (on_connection_callback_)
on_connection_callback_(peer, local);
} else if (type == HandShakeRequestType::Metadata) {
if (on_metadata_callback_) on_metadata_callback_(peer, local);
if (on_metadata_callback_)
on_metadata_callback_(peer, local);
} else if (type == HandShakeRequestType::Notify) {
if (on_notify_callback_) on_notify_callback_(peer, local);
} else {
Expand Down Expand Up @@ -1086,11 +1090,13 @@ uint16_t findAvailableTcpPort(int &sockfd) {
continue;
}

int on = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) {
close(sockfd);
sockfd = -1;
continue;
if (getenv("MC_ENABLE_REUSEADDR")) {
int on = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) {
close(sockfd);
sockfd = -1;
continue;
}
}
Comment on lines +1093 to 1100
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'd better remove these lines? multiple TE reuse the same port is not expected, since only one TE could accept the connection from client.


if (use_ipv6) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ uint16_t findAvailableTcpPort(int &sockfd, bool use_ipv6) {
static std::random_device rand_gen;
std::mt19937 gen(rand_gen());
const int min_port = 15000;
const int max_port = 17000;
const int max_port = 25000;
const int max_attempts = 500;
std::uniform_int_distribution<> rand_dist(min_port, max_port);

Expand Down
Loading