Skip to content

Commit 69ccf80

Browse files
committed
[TransferEngine]: Support specifying transport specific args when installing
Signed-off-by: Jinlong Chen <[email protected]>
1 parent dcdf1c7 commit 69ccf80

File tree

5 files changed

+53
-7
lines changed

5 files changed

+53
-7
lines changed

mooncake-transfer-engine/include/multi_transport.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ class MultiTransport {
4949
Transport *installTransport(const std::string &proto,
5050
std::shared_ptr<Topology> topo);
5151

52+
bool transportNeedArgs(const std::string &proto);
53+
54+
Transport *installTransport(const std::string &proto, void **args);
55+
5256
Transport *getTransport(const std::string &proto);
5357

5458
std::vector<Transport *> listTransports();

mooncake-transfer-engine/include/transport/transport.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,9 @@ class Transport {
257257
std::shared_ptr<TransferMetadata> meta,
258258
std::shared_ptr<Topology> topo);
259259

260+
virtual int install(std::string &local_server_name,
261+
std::shared_ptr<TransferMetadata> meta, void **args);
262+
260263
std::string local_server_name_;
261264
std::shared_ptr<TransferMetadata> metadata_;
262265

mooncake-transfer-engine/src/multi_transport.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,32 @@ Transport *MultiTransport::installTransport(const std::string &proto,
240240
return transport;
241241
}
242242

243+
bool MultiTransport::transportNeedArgs(const std::string &proto) {
244+
return false;
245+
}
246+
247+
Transport *MultiTransport::installTransport(const std::string &proto,
248+
void **args) {
249+
std::shared_ptr<Transport> transport = nullptr;
250+
251+
// Add transport creation logic here.
252+
253+
if (!transport) {
254+
LOG(ERROR) << "Unsupported transport " << proto
255+
<< ", please rebuild Mooncake";
256+
return nullptr;
257+
}
258+
259+
int rc = transport->install(local_server_name_, metadata_, args);
260+
if (rc != 0) {
261+
LOG(ERROR) << "Failed to install transport " << proto << ", rc=" << rc;
262+
return nullptr;
263+
}
264+
265+
transport_map_[proto] = transport;
266+
return transport.get();
267+
}
268+
243269
Status MultiTransport::selectTransport(const TransferRequest &entry,
244270
Transport *&transport) {
245271
auto target_segment_desc = metadata_->getSegmentDescByID(entry.target_id);

mooncake-transfer-engine/src/transfer_engine.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -259,16 +259,22 @@ Transport *TransferEngine::installTransport(const std::string &proto,
259259
return transport;
260260
}
261261

262-
if (args != nullptr && args[0] != nullptr) {
263-
const std::string nic_priority_matrix = static_cast<char *>(args[0]);
264-
int ret = local_topology_->parse(nic_priority_matrix);
265-
if (ret) {
266-
LOG(ERROR) << "Failed to parse NIC priority matrix";
267-
return nullptr;
262+
if (multi_transports_->transportNeedArgs(proto)) {
263+
transport = multi_transports_->installTransport(proto, args);
264+
} else {
265+
if (args != nullptr && args[0] != nullptr) {
266+
const std::string nic_priority_matrix =
267+
static_cast<char *>(args[0]);
268+
int ret = local_topology_->parse(nic_priority_matrix);
269+
if (ret) {
270+
LOG(ERROR) << "Failed to parse NIC priority matrix";
271+
return nullptr;
272+
}
268273
}
274+
275+
transport = multi_transports_->installTransport(proto, local_topology_);
269276
}
270277

271-
transport = multi_transports_->installTransport(proto, local_topology_);
272278
if (!transport) return nullptr;
273279

274280
// Since installTransport() is only called once during initialization

mooncake-transfer-engine/src/transport/transport.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,11 @@ int Transport::install(std::string &local_server_name,
6464
metadata_ = meta;
6565
return 0;
6666
}
67+
68+
int Transport::install(std::string &local_server_name,
69+
std::shared_ptr<TransferMetadata> meta, void **args) {
70+
local_server_name_ = local_server_name;
71+
metadata_ = meta;
72+
return 0;
73+
}
6774
} // namespace mooncake

0 commit comments

Comments
 (0)