Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ different eviction policy may need different data structure
class EndpointStore {
public:
virtual std::shared_ptr<RdmaEndPoint> getEndpoint(
const std::string &peer_nic_path) = 0;
const std::string& peer_nic_path) = 0;
virtual std::shared_ptr<RdmaEndPoint> insertEndpoint(
const std::string &peer_nic_path, RdmaContext *context) = 0;
virtual int deleteEndpoint(const std::string &peer_nic_path) = 0;
const std::string& peer_nic_path, RdmaContext* context) = 0;
virtual int deleteEndpoint(const std::string& peer_nic_path) = 0;
virtual void evictEndpoint() = 0;
virtual void reclaimEndpoint() = 0;
virtual void reclaimEndpointUnlocked() = 0;
virtual size_t getSize() = 0;

virtual int destroyQPs() = 0;
Expand All @@ -57,12 +58,13 @@ class FIFOEndpointStore : public EndpointStore {
public:
FIFOEndpointStore(size_t max_size) : max_size_(max_size) {}
std::shared_ptr<RdmaEndPoint> getEndpoint(
const std::string &peer_nic_path) override;
const std::string& peer_nic_path) override;
std::shared_ptr<RdmaEndPoint> insertEndpoint(
const std::string &peer_nic_path, RdmaContext *context) override;
int deleteEndpoint(const std::string &peer_nic_path) override;
const std::string& peer_nic_path, RdmaContext* context) override;
int deleteEndpoint(const std::string& peer_nic_path) override;
void evictEndpoint() override;
void reclaimEndpoint() override;
void reclaimEndpointUnlocked() override;
size_t getSize() override;

int destroyQPs() override;
Expand All @@ -88,12 +90,13 @@ class SIEVEEndpointStore : public EndpointStore {
SIEVEEndpointStore(size_t max_size)
: waiting_list_len_(0), max_size_(max_size) {}
std::shared_ptr<RdmaEndPoint> getEndpoint(
const std::string &peer_nic_path) override;
const std::string& peer_nic_path) override;
std::shared_ptr<RdmaEndPoint> insertEndpoint(
const std::string &peer_nic_path, RdmaContext *context) override;
int deleteEndpoint(const std::string &peer_nic_path) override;
const std::string& peer_nic_path, RdmaContext* context) override;
int deleteEndpoint(const std::string& peer_nic_path) override;
void evictEndpoint() override;
void reclaimEndpoint() override;
void reclaimEndpointUnlocked() override;
size_t getSize() override;

int destroyQPs() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,45 +42,49 @@ class RdmaEndPoint {
INITIALIZING,
UNCONNECTED,
CONNECTED,
CLOSING,
};

public:
RdmaEndPoint(RdmaContext &context);
RdmaEndPoint(RdmaContext& context);

~RdmaEndPoint();

int construct(ibv_cq *cq, size_t num_qp_list = 2, size_t max_sge = 4,
int construct(ibv_cq* cq, size_t num_qp_list = 2, size_t max_sge = 4,
size_t max_wr = 256, size_t max_inline = 64);

private:
int deconstruct();

public:
void setPeerNicPath(const std::string &peer_nic_path);
void setPeerNicPath(const std::string& peer_nic_path);

std::string getPeerNicPath() const { return peer_nic_path_; }

int setupConnectionsByActive();

int setupConnectionsByActive(const std::string &peer_nic_path) {
int setupConnectionsByActive(const std::string& peer_nic_path) {
setPeerNicPath(peer_nic_path);
return setupConnectionsByActive();
}

using HandShakeDesc = TransferMetadata::HandShakeDesc;
int setupConnectionsByPassive(const HandShakeDesc &peer_desc,
HandShakeDesc &local_desc);
int setupConnectionsByPassive(const HandShakeDesc& peer_desc,
HandShakeDesc& local_desc);

bool hasOutstandingSlice() const;

bool active() const { return active_; }
bool active() const { return status_ == CONNECTED; }

void set_active(bool flag) {
RWSpinlock::WriteGuard guard(lock_);
active_ = flag;
if (!flag) inactive_time_ = getCurrentTimeInNano();
if (flag) return;
auto prev_status = status_.exchange(CLOSING);
if (prev_status == CONNECTED) inactive_time_ = getCurrentTimeInNano();
}

double inactiveTime() {
if (active_) return 0.0;
if (active()) return 0.0;
return (getCurrentTimeInNano() - inactive_time_) / 1000000000.0;
}

Expand All @@ -107,37 +111,37 @@ class RdmaEndPoint {
// Submit some work requests to HW
// Submitted tasks (success/failed) are removed in slice_list
// Failed tasks (which must be submitted) are inserted in failed_slice_list
int submitPostSend(std::vector<Transport::Slice *> &slice_list,
std::vector<Transport::Slice *> &failed_slice_list);
int submitPostSend(std::vector<Transport::Slice*>& slice_list,
std::vector<Transport::Slice*>& failed_slice_list);

// Get the number of QPs in this endpoint
size_t getQPNumber() const;

private:
std::vector<uint32_t> qpNum() const;

int doSetupConnection(const std::string &peer_gid, uint16_t peer_lid,
int doSetupConnection(const std::string& peer_gid, uint16_t peer_lid,
std::vector<uint32_t> peer_qp_num_list,
std::string *reply_msg = nullptr);
std::string* reply_msg = nullptr);

int doSetupConnection(int qp_index, const std::string &peer_gid,
int doSetupConnection(int qp_index, const std::string& peer_gid,
uint16_t peer_lid, uint32_t peer_qp_num,
std::string *reply_msg = nullptr);
std::string* reply_msg = nullptr);

private:
RdmaContext &context_;
RdmaContext& context_;
std::atomic<Status> status_;

RWSpinlock lock_;
std::vector<ibv_qp *> qp_list_;
std::vector<ibv_qp*> qp_list_;

std::string peer_nic_path_;

volatile int *wr_depth_list_;
volatile int* wr_depth_list_;
int max_wr_depth_;

volatile bool active_;
volatile int *cq_outstanding_;
// volatile bool active_;
volatile int* cq_outstanding_;
volatile uint64_t inactive_time_;
};

Expand Down
2 changes: 1 addition & 1 deletion mooncake-transfer-engine/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void loadGlobalConfig(GlobalConfig &config) {
const char *max_ep_per_ctx_env = std::getenv("MC_MAX_EP_PER_CTX");
if (max_ep_per_ctx_env) {
size_t val = atoi(max_ep_per_ctx_env);
if (val > 0 && val <= UINT16_MAX)
if (val > 0 && val <= UINT32_MAX)
config.max_ep_per_ctx = val;
else
LOG(WARNING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ std::shared_ptr<RdmaEndPoint> FIFOEndpointStore::insertEndpoint(
config.max_wr, config.max_inline);
if (ret) return nullptr;

while (this->getSize() >= max_size_) evictEndpoint();
// make eviction more proactive
size_t high_water = (max_size_ * 95 + 99) / 100;
while (this->getSize() >= high_water) evictEndpoint();

endpoint->setPeerNicPath(peer_nic_path);
endpoint_map_[peer_nic_path] = endpoint;
Expand All @@ -70,47 +72,79 @@ int FIFOEndpointStore::deleteEndpoint(const std::string &peer_nic_path) {
// remove endpoint but leaving it status unchanged
// in case it is setting up connection or submitting slice
if (iter != endpoint_map_.end()) {
iter->second->set_active(false);
waiting_list_.insert(iter->second);
endpoint_map_.erase(iter);
auto fifo_iter = fifo_map_[peer_nic_path];
fifo_list_.erase(fifo_iter);
fifo_map_.erase(peer_nic_path);
auto fifo_it = fifo_map_.find(peer_nic_path);
if (fifo_it != fifo_map_.end()) {
fifo_list_.erase(fifo_it->second);
fifo_map_.erase(fifo_it);
}
}
reclaimEndpointUnlocked();
return 0;
}

void FIFOEndpointStore::evictEndpoint() {
if (fifo_list_.empty()) return;
std::string victim = fifo_list_.front();
fifo_list_.pop_front();
fifo_map_.erase(victim);
auto fifo_it = fifo_map_.find(victim);
if (fifo_it != fifo_map_.end()) {
fifo_map_.erase(fifo_it);
}
auto it = endpoint_map_.find(victim);
if (it == endpoint_map_.end()) {
LOG(WARNING) << "FIFOEndpointStore: victim " << victim
<< " not found in endpoint_map_ during eviction";
return;
}
LOG(INFO) << victim << " evicted";
waiting_list_.insert(endpoint_map_[victim]);
endpoint_map_.erase(victim);
return;
it->second->set_active(false);
waiting_list_.insert(it->second);
endpoint_map_.erase(it);
reclaimEndpointUnlocked();
}

// Reclaim retired endpoints whose outstanding work has drained.
// Thread-safe entry point: acquires the store's write lock, then
// delegates to the unlocked worker.
void FIFOEndpointStore::reclaimEndpoint() {
    RWSpinlock::WriteGuard write_guard(endpoint_map_lock_);
    reclaimEndpointUnlocked();
}

void FIFOEndpointStore::reclaimEndpointUnlocked() {
std::vector<std::shared_ptr<RdmaEndPoint>> to_delete;
for (auto &endpoint : waiting_list_)
if (!endpoint->hasOutstandingSlice()) to_delete.push_back(endpoint);
for (auto &endpoint : to_delete) waiting_list_.erase(endpoint);
for (auto &endpoint : waiting_list_) {
if (!endpoint->hasOutstandingSlice()) {
endpoint->destroyQP();
to_delete.push_back(endpoint);
}
}
for (auto &endpoint : to_delete) {
waiting_list_.erase(endpoint);
}
}

size_t FIFOEndpointStore::getSize() { return endpoint_map_.size(); }

int FIFOEndpointStore::destroyQPs() {
RWSpinlock::WriteGuard guard(endpoint_map_lock_);
for (auto &kv : endpoint_map_) {
kv.second->destroyQP();
}
for (auto &ep : waiting_list_) {
ep->destroyQP();
}
return 0;
}

int FIFOEndpointStore::disconnectQPs() {
RWSpinlock::WriteGuard guard(endpoint_map_lock_);
for (auto &kv : endpoint_map_) {
kv.second->disconnect();
}
for (auto &ep : waiting_list_) {
ep->disconnect();
}
return 0;
}

Expand All @@ -120,6 +154,9 @@ size_t FIFOEndpointStore::getTotalQPNumber() {
for (const auto &kv : endpoint_map_) {
total_qps += kv.second->getQPNumber();
}
for (const auto &endpoint : waiting_list_) {
total_qps += endpoint->getQPNumber();
}
return total_qps;
}

Expand Down Expand Up @@ -157,7 +194,9 @@ std::shared_ptr<RdmaEndPoint> SIEVEEndpointStore::insertEndpoint(
config.max_wr, config.max_inline);
if (ret) return nullptr;

while (this->getSize() >= max_size_) evictEndpoint();
// make eviction more proactive
size_t high_water = (max_size_ * 95 + 99) / 100;
while (this->getSize() >= high_water) evictEndpoint();

endpoint->setPeerNicPath(peer_nic_path);
endpoint_map_[peer_nic_path] = std::make_pair(endpoint, true);
Expand All @@ -172,8 +211,10 @@ int SIEVEEndpointStore::deleteEndpoint(const std::string &peer_nic_path) {
// remove endpoint but leaving it status unchanged
// in case it is setting up connection or submitting slice
if (iter != endpoint_map_.end()) {
auto ep = iter->second.first;
ep->set_active(false);
waiting_list_len_++;
waiting_list_.insert(iter->second.first);
waiting_list_.insert(ep);
endpoint_map_.erase(iter);
auto fifo_iter = fifo_map_[peer_nic_path];
if (hand_.has_value() && hand_.value() == fifo_iter) {
Expand All @@ -183,6 +224,7 @@ int SIEVEEndpointStore::deleteEndpoint(const std::string &peer_nic_path) {
fifo_list_.erase(fifo_iter);
fifo_map_.erase(peer_nic_path);
}
reclaimEndpointUnlocked();
return 0;
}

Expand All @@ -203,36 +245,70 @@ void SIEVEEndpointStore::evictEndpoint() {
}
}
o == fifo_list_.begin() ? hand_ = std::nullopt : hand_ = std::prev(o);

fifo_list_.erase(o);
fifo_map_.erase(victim);
auto fifo_it = fifo_map_.find(victim);
if (fifo_it != fifo_map_.end()) {
fifo_map_.erase(fifo_it);
}

auto map_it = endpoint_map_.find(victim);
if (map_it == endpoint_map_.end()) {
LOG(WARNING) << "SIEVEEndpointStore: victim " << victim
<< " not found in endpoint_map_ during eviction";
return;
}

LOG(INFO) << victim << " evicted";
auto victim_instance = endpoint_map_[victim].first;
auto victim_instance = map_it->second.first;
victim_instance->set_active(false);
waiting_list_len_++;
waiting_list_.insert(victim_instance);
endpoint_map_.erase(victim);
return;
endpoint_map_.erase(map_it);
reclaimEndpointUnlocked();
}

// Reclaim retired endpoints whose outstanding work has drained.
// Fast path: a relaxed load of the waiting-list counter lets us skip
// lock acquisition entirely when nothing is waiting; the locked worker
// re-checks the counter, so a stale read here is harmless.
void SIEVEEndpointStore::reclaimEndpoint() {
    if (waiting_list_len_.load(std::memory_order_relaxed) != 0) {
        RWSpinlock::WriteGuard write_guard(endpoint_map_lock_);
        reclaimEndpointUnlocked();
    }
}

void SIEVEEndpointStore::reclaimEndpointUnlocked() {
if (waiting_list_len_.load(std::memory_order_relaxed) == 0) return;
std::vector<std::shared_ptr<RdmaEndPoint>> to_delete;
for (auto &endpoint : waiting_list_)
if (!endpoint->hasOutstandingSlice()) to_delete.push_back(endpoint);
for (auto &endpoint : to_delete) waiting_list_.erase(endpoint);
waiting_list_len_ -= to_delete.size();
for (auto &endpoint : waiting_list_) {
if (!endpoint->hasOutstandingSlice()) {
endpoint->destroyQP();
to_delete.push_back(endpoint);
}
}
for (auto &endpoint : to_delete) {
waiting_list_.erase(endpoint);
}
waiting_list_len_.fetch_sub(static_cast<int>(to_delete.size()),
std::memory_order_relaxed);
}

int SIEVEEndpointStore::destroyQPs() {
for (auto &endpoint : waiting_list_) endpoint->destroyQP();
for (auto &kv : endpoint_map_) kv.second.first->destroyQP();
RWSpinlock::WriteGuard guard(endpoint_map_lock_);
for (auto &endpoint : waiting_list_) {
endpoint->destroyQP();
}
for (auto &kv : endpoint_map_) {
kv.second.first->destroyQP();
}
return 0;
}

int SIEVEEndpointStore::disconnectQPs() {
for (auto &endpoint : waiting_list_) endpoint->disconnect();
for (auto &kv : endpoint_map_) kv.second.first->disconnect();
RWSpinlock::WriteGuard guard(endpoint_map_lock_);
for (auto &endpoint : waiting_list_) {
endpoint->disconnect();
}
for (auto &kv : endpoint_map_) {
kv.second.first->disconnect();
}
return 0;
}

Expand Down
Loading
Loading