Skip to content

Commit b66c445

Browse files
committed
[TransferEngine]: Support registering files as shared buffers
Signed-off-by: Jinlong Chen <[email protected]>
1 parent 69ccf80 commit b66c445

File tree

7 files changed

+222
-0
lines changed

7 files changed

+222
-0
lines changed

mooncake-transfer-engine/include/transfer_engine.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@ using SegmentHandle = Transport::SegmentHandle;
4646
using SegmentID = Transport::SegmentID;
4747
using BatchID = Transport::BatchID;
4848
using BufferEntry = Transport::BufferEntry;
49+
using FileBufferID = TransferMetadata::FileBufferID;
4950

5051
class TransferEngine {
5152
public:
5253
TransferEngine(bool auto_discover = false)
5354
: metadata_(nullptr),
5455
local_topology_(std::make_shared<Topology>()),
56+
next_file_id_(1),
5557
auto_discover_(auto_discover) {
5658
#ifdef WITH_METRICS
5759
InitializeMetricsConfig();
@@ -62,6 +64,7 @@ class TransferEngine {
6264
TransferEngine(bool auto_discover, const std::vector<std::string> &filter)
6365
: metadata_(nullptr),
6466
local_topology_(std::make_shared<Topology>()),
67+
next_file_id_(1),
6568
auto_discover_(auto_discover),
6669
filter_(filter) {
6770
#ifdef WITH_METRICS
@@ -111,6 +114,25 @@ class TransferEngine {
111114

112115
int unregisterLocalMemoryBatch(const std::vector<void *> &addr_list);
113116

117+
bool supportFileBuffer();
118+
119+
/**
120+
* @brief Register a local file as a shared buffer.
121+
* @param[in] path Local path of the file.
122+
* @param[in] size Available size of the file.
123+
* @param[out] id The id of the registered file buffer.
124+
* @return 0 on success, or error number on failure.
125+
*/
126+
int registerLocalFile(const std::string &path, size_t size,
127+
FileBufferID &id);
128+
129+
/**
130+
* @brief Unregister a previously registered file.
131+
* @param[in] path The path of the registered file buffer.
132+
* @return 0 on success, or error number on failure.
133+
*/
134+
int unregisterLocalFile(const std::string &path);
135+
114136
/**
 * @brief Allocate a batch ID by delegating to the multi-transport layer.
 * @param[in] batch_size Forwarded unchanged to
 *            MultiTransport::allocateBatchID.
 * @return The BatchID produced by the multi-transport layer.
 */
BatchID allocateBatchID(size_t batch_size) {
    const BatchID batch_id = multi_transports_->allocateBatchID(batch_size);
    return batch_id;
}
@@ -224,13 +246,22 @@ class TransferEngine {
224246
bool remote_accessible;
225247
};
226248

249+
struct LocalFile {
250+
FileBufferID id;
251+
std::string path;
252+
std::size_t size;
253+
};
254+
227255
std::shared_ptr<TransferMetadata> metadata_;
228256
std::string local_server_name_;
229257
std::shared_ptr<MultiTransport> multi_transports_;
230258
std::shared_mutex mutex_;
231259
std::vector<MemoryRegion> local_memory_regions_;
232260
std::shared_ptr<Topology> local_topology_;
233261

262+
std::atomic<FileBufferID> next_file_id_;
263+
std::unordered_map<std::string, LocalFile> local_files_;
264+
234265
RWSpinlock send_notifies_lock_;
235266
std::unordered_map<BatchID,
236267
std::pair<SegmentID, TransferMetadata::NotifyDesc>>

mooncake-transfer-engine/include/transfer_engine_c.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ extern "C" {
2424

2525
#define segment_handle_t int32_t
2626
#define segment_id_t int32_t
27+
#define file_id_t uint32_t
2728
#define batch_id_t uint64_t
2829
#define LOCAL_SEGMENT (0)
2930
#define INVALID_BATCH UINT64_MAX
@@ -35,6 +36,7 @@ struct transfer_request {
3536
int opcode;
3637
void *source;
3738
segment_id_t target_id;
39+
file_id_t file_id;
3840
uint64_t target_offset;
3941
uint64_t length;
4042
};
@@ -135,6 +137,13 @@ int registerLocalMemoryBatch(transfer_engine_t engine,
135137
int unregisterLocalMemoryBatch(transfer_engine_t engine, void **addr_list,
136138
size_t addr_len);
137139

140+
bool supportFileBuffer(transfer_engine_t engine);
141+
142+
int registerLocalFile(transfer_engine_t engine, const char *path, size_t size,
143+
file_id_t *id);
144+
145+
int unregisterLocalFile(transfer_engine_t engine, const char *path);
146+
138147
batch_id_t allocateBatchID(transfer_engine_t engine, size_t batch_size);
139148

140149
int submitTransfer(transfer_engine_t engine, batch_id_t batch_id,

mooncake-transfer-engine/include/transfer_metadata.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,16 @@ class TransferMetadata {
6464
std::unordered_map<std::string, std::string> local_path_map;
6565
};
6666

67+
// Identify a single file in a segment's file buffers.
using FileBufferID = uint32_t;

// Describes one file buffer listed in a segment descriptor.
//
// All scalar members carry default initialisers so that a descriptor which
// is default-constructed and only partially filled in never exposes
// indeterminate values (every field, including `align`, is written out when
// the segment descriptor is encoded).
struct FileBufferDesc {
    // Identifier of the file buffer within its segment.
    FileBufferID id = 0;
    // Path of the backing file.
    std::string path;
    // Usable size of the file.
    std::size_t size = 0;
    // For future usage.
    std::size_t align = 0;
};
76+
6777
struct RankInfoDesc {
6878
uint64_t rankId = 0xFFFFFFFF; // rank id, user rank
6979
std::string hostIp;
@@ -87,6 +97,8 @@ class TransferMetadata {
8797
std::vector<BufferDesc> buffers;
8898
// this is for nvmeof.
8999
std::vector<NVMeoFBufferDesc> nvmeof_buffers;
100+
// Generic file buffers.
101+
std::vector<FileBufferDesc> file_buffers;
90102
// this is for cxl.
91103
std::string cxl_name;
92104
uint64_t cxl_base_addr;
@@ -148,6 +160,10 @@ class TransferMetadata {
148160

149161
int removeLocalMemoryBuffer(void *addr, bool update_metadata);
150162

163+
int addFileBuffer(const FileBufferDesc &buffer_desc, bool update_metadata);
164+
165+
int removeFileBuffer(FileBufferID id, bool update_metadata);
166+
151167
int addLocalSegment(SegmentID segment_id, const std::string &segment_name,
152168
std::shared_ptr<SegmentDesc> &&desc);
153169

mooncake-transfer-engine/include/transport/transport.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class Transport {
4343
using SegmentID = uint64_t;
4444
using SegmentHandle = SegmentID;
4545

46+
using FileBufferID = TransferMetadata::FileBufferID;
47+
4648
using BatchID = uint64_t;
4749
const static BatchID INVALID_BATCH_ID = UINT64_MAX;
4850

@@ -60,6 +62,7 @@ class Transport {
6062
uint64_t target_offset;
6163
size_t length;
6264
int advise_retry_cnt = 0;
65+
FileBufferID file_id;
6366
};
6467

6568
enum TransferStatusEnum {
@@ -92,6 +95,7 @@ class Transport {
9295
SliceStatus status;
9396
TransferTask *task;
9497
bool from_cache;
98+
FileBufferID file_id;
9599

96100
union {
97101
struct {
@@ -284,6 +288,17 @@ class Transport {
284288
virtual int unregisterLocalMemoryBatch(
285289
const std::vector<void *> &addr_list) = 0;
286290

291+
// Whether this transport can handle file buffers. The base implementation
// reports no support; transports that do support them override this.
virtual bool supportFileBuffer() { return false; }
292+
293+
// Register a local file as a file buffer under the given id. The base
// implementation ignores its arguments and returns ERR_NOT_IMPLEMENTED.
virtual int registerLocalFile(FileBufferID id, const std::string &path,
                              size_t size) {
    (void)id;
    (void)path;
    (void)size;
    return ERR_NOT_IMPLEMENTED;
}
297+
298+
// Unregister the file buffer with the given id. The base implementation
// ignores its argument and returns ERR_NOT_IMPLEMENTED.
virtual int unregisterLocalFile(FileBufferID id) {
    (void)id;
    return ERR_NOT_IMPLEMENTED;
}
301+
287302
virtual const char *getName() const = 0;
288303
};
289304
} // namespace mooncake

mooncake-transfer-engine/src/transfer_engine.cpp

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,15 @@ Transport *TransferEngine::installTransport(const std::string &proto,
287287
entry.addr, entry.length, entry.location, entry.remote_accessible);
288288
if (ret < 0) return nullptr;
289289
}
290+
291+
if (transport->supportFileBuffer()) {
292+
for (auto &file : local_files_) {
293+
int ret = transport->registerLocalFile(
294+
file.second.id, file.second.path, file.second.size);
295+
if (ret < 0) return nullptr;
296+
}
297+
}
298+
290299
return transport;
291300
}
292301

@@ -436,6 +445,77 @@ int TransferEngine::unregisterLocalMemoryBatch(
436445
return 0;
437446
}
438447

448+
bool TransferEngine::supportFileBuffer() {
449+
bool supported = false;
450+
for (auto &transport : multi_transports_->listTransports()) {
451+
supported = supported || transport->supportFileBuffer();
452+
}
453+
return supported;
454+
}
455+
456+
int TransferEngine::registerLocalFile(const std::string &path, size_t size,
457+
FileBufferID &id) {
458+
if (!supportFileBuffer()) {
459+
LOG(ERROR) << "File buffers not suppotred";
460+
return ERR_NOT_IMPLEMENTED;
461+
}
462+
463+
std::unique_lock<std::shared_mutex> lock(mutex_);
464+
if (local_files_.count(path) > 0) {
465+
LOG(ERROR) << "Registering an already registered file: " << path;
466+
return ERR_ADDRESS_OVERLAPPED;
467+
}
468+
469+
const auto id_ = next_file_id_.fetch_add(1);
470+
471+
for (auto &transport : multi_transports_->listTransports()) {
472+
if (!transport->supportFileBuffer()) {
473+
continue;
474+
}
475+
476+
int ret = transport->registerLocalFile(id_, path, size);
477+
if (ret != 0) {
478+
LOG(ERROR) << "Failed to register file " << path << " to transport "
479+
<< transport->getName() << ", ret=" << ret;
480+
return ret;
481+
}
482+
}
483+
484+
local_files_[path] = {id_, path, size};
485+
id = id_;
486+
return 0;
487+
}
488+
489+
int TransferEngine::unregisterLocalFile(const std::string &path) {
490+
if (!supportFileBuffer()) {
491+
LOG(ERROR) << "File buffers not suppotred";
492+
return ERR_NOT_IMPLEMENTED;
493+
}
494+
495+
std::unique_lock<std::shared_mutex> lock(mutex_);
496+
auto it = local_files_.find(path);
497+
if (it == local_files_.end()) {
498+
return ERR_ADDRESS_NOT_REGISTERED;
499+
}
500+
501+
for (auto &transport : multi_transports_->listTransports()) {
502+
if (!transport->supportFileBuffer()) {
503+
continue;
504+
}
505+
506+
int ret = transport->unregisterLocalFile(it->second.id);
507+
if (ret != 0 && ret != ERR_ADDRESS_NOT_REGISTERED) {
508+
LOG(ERROR) << "Failed to unregister file " << path
509+
<< " from transport " << transport->getName()
510+
<< ", ret=" << ret;
511+
return ret;
512+
}
513+
}
514+
515+
local_files_.erase(it);
516+
return 0;
517+
}
518+
439519
#ifdef WITH_METRICS
440520
// Helper function to convert string to lowercase for case-insensitive
441521
// comparison

mooncake-transfer-engine/src/transfer_engine_c.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,22 @@ int unregisterLocalMemoryBatch(transfer_engine_t engine, void **addr_list,
117117
return native->unregisterLocalMemoryBatch(native_addr_list);
118118
}
119119

120+
// C binding for TransferEngine::supportFileBuffer(). `engine` is cast to a
// TransferEngine pointer and used without any validity check.
bool supportFileBuffer(transfer_engine_t engine) {
    return ((TransferEngine *)engine)->supportFileBuffer();
}
124+
125+
// C binding for TransferEngine::registerLocalFile(). The result code of the
// native call is returned unchanged; the native call receives `*id` by
// reference as its output parameter.
// NOTE(review): `path` and `id` are used without null checks, so callers
// are presumably required to pass valid pointers - confirm and document in
// the C header.
int registerLocalFile(transfer_engine_t engine, const char *path, size_t size,
                      file_id_t *id) {
    auto *self = (TransferEngine *)engine;
    const int rc = self->registerLocalFile(path, size, *id);
    return rc;
}
130+
131+
// C binding for TransferEngine::unregisterLocalFile(). The result code of
// the native call is returned unchanged.
int unregisterLocalFile(transfer_engine_t engine, const char *path) {
    auto *self = (TransferEngine *)engine;
    const int rc = self->unregisterLocalFile(path);
    return rc;
}
135+
120136
batch_id_t allocateBatchID(transfer_engine_t engine, size_t batch_size) {
121137
TransferEngine *native = (TransferEngine *)engine;
122138
return (batch_id_t)native->allocateBatchID(batch_size);
@@ -132,6 +148,7 @@ int submitTransfer(transfer_engine_t engine, batch_id_t batch_id,
132148
(Transport::TransferRequest::OpCode)entries[index].opcode;
133149
native_entries[index].source = entries[index].source;
134150
native_entries[index].target_id = entries[index].target_id;
151+
native_entries[index].file_id = entries[index].file_id;
135152
native_entries[index].target_offset = entries[index].target_offset;
136153
native_entries[index].length = entries[index].length;
137154
}

mooncake-transfer-engine/src/transfer_metadata.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,18 @@ int TransferMetadata::encodeSegmentDesc(const SegmentDesc &desc,
231231
<< desc.name << " protocol " << desc.protocol;
232232
return ERR_METADATA;
233233
}
234+
235+
Json::Value fileBuffersJson(Json::arrayValue);
236+
for (const auto &fileBuffer : desc.file_buffers) {
237+
Json::Value bufferJSON;
238+
bufferJSON["id"] = fileBuffer.id;
239+
bufferJSON["path"] = fileBuffer.path;
240+
bufferJSON["size"] = fileBuffer.size;
241+
bufferJSON["align"] = fileBuffer.align;
242+
fileBuffersJson.append(bufferJSON);
243+
}
244+
segmentJSON["file_buffers"] = fileBuffersJson;
245+
234246
return 0;
235247
}
236248

@@ -415,6 +427,16 @@ TransferMetadata::decodeSegmentDesc(Json::Value &segmentJSON,
415427
<< " protocol " << desc->protocol;
416428
return nullptr;
417429
}
430+
431+
for (const auto &bufferJSON : segmentJSON["file_buffers"]) {
432+
FileBufferDesc buffer;
433+
buffer.id = bufferJSON["id"].asUInt();
434+
buffer.path = bufferJSON["path"].asString();
435+
buffer.size = bufferJSON["size"].asUInt64();
436+
buffer.align = bufferJSON["align"].asUInt64();
437+
desc->file_buffers.push_back(buffer);
438+
}
439+
418440
return desc;
419441
}
420442

@@ -605,6 +627,38 @@ int TransferMetadata::removeLocalMemoryBuffer(void *addr,
605627
return ERR_ADDRESS_NOT_REGISTERED;
606628
}
607629

630+
int TransferMetadata::addFileBuffer(const FileBufferDesc &buffer_desc,
631+
bool update_metadata) {
632+
{
633+
RWSpinlock::WriteGuard guard(segment_lock_);
634+
auto &segment_desc = segment_id_to_desc_map_[LOCAL_SEGMENT_ID];
635+
segment_desc->file_buffers.push_back(buffer_desc);
636+
}
637+
if (update_metadata) return updateLocalSegmentDesc();
638+
return 0;
639+
}
640+
641+
// Remove the file buffer with the given id from the local segment
// descriptor.
//
// @param id              Identifier of the file buffer to remove.
// @param update_metadata When true and a buffer was removed,
//                        updateLocalSegmentDesc() is called and its result
//                        is returned.
// @return 0 on success (or the result of updateLocalSegmentDesc());
//         ERR_ADDRESS_NOT_REGISTERED if no buffer with that id exists,
//         including when no local segment descriptor is present.
//
// The map is probed with find() rather than operator[] so that a missing
// local segment is reported as "not registered" instead of inserting an
// empty entry and dereferencing a null pointer.
int TransferMetadata::removeFileBuffer(FileBufferID id, bool update_metadata) {
    bool removed = false;
    {
        RWSpinlock::WriteGuard guard(segment_lock_);
        auto entry = segment_id_to_desc_map_.find(LOCAL_SEGMENT_ID);
        if (entry != segment_id_to_desc_map_.end() && entry->second) {
            auto &file_buffers = entry->second->file_buffers;
            for (auto iter = file_buffers.begin(); iter != file_buffers.end();
                 ++iter) {
                if (iter->id == id) {
                    // Only the first match is erased; stop right away since
                    // the iterator is invalid after erase().
                    file_buffers.erase(iter);
                    removed = true;
                    break;
                }
            }
        }
    }
    if (!removed) {
        return ERR_ADDRESS_NOT_REGISTERED;
    }
    // The guard is released before the metadata update, as in the original.
    return update_metadata ? updateLocalSegmentDesc() : 0;
}
661+
608662
int TransferMetadata::addRpcMetaEntry(const std::string &server_name,
609663
RpcMetaDesc &desc) {
610664
local_rpc_meta_ = desc;

0 commit comments

Comments
 (0)