Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
336 changes: 284 additions & 52 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,15 +604,40 @@ std::shared_ptr<SliceBuffer> DistributedObjectStore::get_buffer(
}

PYBIND11_MODULE(store, m) {
// Define the SliceBuffer class
/*
* Mooncake Store Python Bindings
*/

// Define the SliceBuffer class for efficient memory management
py::class_<SliceBuffer, std::shared_ptr<SliceBuffer>>(m, "SliceBuffer",
py::buffer_protocol())
.def("ptr",
[](const SliceBuffer &self) {
// Return the pointer as an integer for Python
return reinterpret_cast<uintptr_t>(self.ptr());
})
.def("size", &SliceBuffer::size)
py::buffer_protocol(),
R"(
A buffer class that holds contiguous data retrieved from the distributed store.

This class provides RAII-style memory management and implements the Python
buffer protocol for efficient data access without copying.

Note: SliceBuffer automatically manages memory deallocation when destroyed.
)")
.def(
"ptr",
[](const SliceBuffer &self) {
// Return the pointer as an integer for Python
return reinterpret_cast<uintptr_t>(self.ptr());
},
R"(
Get the memory address of the buffer as an integer.

Returns:
int: Memory address as an integer (for advanced use cases)
)")
.def("size", &SliceBuffer::size,
R"(
Get the size of the buffer in bytes.

Returns:
int: Size of the buffer in bytes
)")
.def("__len__", &SliceBuffer::size)
.def_buffer([](SliceBuffer &self) -> py::buffer_info {
// SliceBuffer now always contains contiguous memory
Expand Down Expand Up @@ -642,54 +667,261 @@ PYBIND11_MODULE(store, m) {
}
});

// Define the DistributedObjectStore class
py::class_<DistributedObjectStore>(m, "MooncakeDistributedStore")
.def(py::init<>())
.def("setup", &DistributedObjectStore::setup)
.def("init_all", &DistributedObjectStore::initAll)
.def("get", &DistributedObjectStore::get)
// Define the DistributedObjectStore class - Main interface for Mooncake
// Store
py::class_<DistributedObjectStore>(m, "MooncakeDistributedStore",
R"(
Example:
store = MooncakeDistributedStore()
store.setup("localhost:12345", "127.0.0.1:2379")
store.put("my_key", b"my_data")
data = store.get("my_key")
store.close()
)")
.def(py::init<>(),
R"(
Initialize a new MooncakeDistributedStore instance.

Note: You must call setup() or init_all() before using the store.
)")
.def("setup", &DistributedObjectStore::setup, py::arg("local_hostname"),
py::arg("metadata_server"),
py::arg("global_segment_size") = 1024 * 1024 * 16,
py::arg("local_buffer_size") = 1024 * 1024 * 16,
py::arg("protocol") = "tcp", py::arg("rdma_devices") = "",
py::arg("master_server_addr") = "127.0.0.1:50051",
R"(
Configure and initialize the distributed object store.

This method sets up the connection to the metadata server, initializes
the transfer engine, and prepares the local memory segments for storage.

Args:
local_hostname (str): Local host address in format "IP:Port"
(e.g., "192.168.1.100:12345")
metadata_server (str): Metadata server address in format "IP:Port"
(e.g., "127.0.0.1:2379" for etcd)
global_segment_size (int, optional): Size of global memory segment in bytes.
Defaults to 16MB. Set to 0 to skip mounting.
local_buffer_size (int, optional): Size of local buffer for data transfer in bytes.
Defaults to 16MB.
protocol (str, optional): Transport protocol ("tcp" or "rdma"). Defaults to "tcp".
rdma_devices (str, optional): RDMA device specification for RDMA protocol.
Defaults to empty string.
master_server_addr (str, optional): Master server address. Defaults to "127.0.0.1:50051".

Returns:
int: 0 on success, non-zero error code on failure

Note:
For RDMA protocol, ensure proper RDMA devices are available and configured.
)")
.def("init_all", &DistributedObjectStore::initAll, py::arg("protocol"),
py::arg("device_name"),
py::arg("mount_segment_size") = 1024 * 1024 * 16,
R"(
Simplified initialization with default settings.

This is a convenience method that calls setup() with predefined defaults
suitable for local testing and development.

Args:
protocol (str): Transport protocol ("tcp" or "rdma")
device_name (str): Device name for the protocol
mount_segment_size (int, optional): Memory segment size in bytes. Defaults to 16MB.

Returns:
int: 0 on success, non-zero error code on failure
)")
.def("get", &DistributedObjectStore::get, py::arg("key"),
R"(
Retrieve data for the specified key.

This method fetches the complete object data associated with the given key
and returns it as Python bytes. The operation is thread-safe and releases
the GIL during network operations.

Args:
key (str): Object key to retrieve

Returns:
bytes: Object data as bytes, or empty bytes if key doesn't exist or error occurs

Note:
For large objects, consider using get_buffer() for more efficient memory usage.
)")
.def("get_buffer", &DistributedObjectStore::get_buffer,
py::call_guard<py::gil_scoped_release>(),
py::return_value_policy::take_ownership)
py::return_value_policy::take_ownership, py::arg("key"),
R"(
Retrieve data as a SliceBuffer for efficient memory access.

This method returns a SliceBuffer object that implements the Python buffer
protocol, allowing zero-copy access to the retrieved data. This is more
memory-efficient than get() for large objects.

Args:
key (str): Object key to retrieve

Returns:
SliceBuffer or None: Buffer containing the data, or None if key doesn't exist or error occurs

Example:
buffer = store.get_buffer("my_key")
if buffer:
data = bytes(buffer) # Convert to bytes if needed
print(f"Retrieved {buffer.size()} bytes")
)")
.def("remove", &DistributedObjectStore::remove,
py::call_guard<py::gil_scoped_release>())
py::call_guard<py::gil_scoped_release>(), py::arg("key"),
R"(
Remove a single object from the store.

This method deletes the object and all its replicas from the Mooncake store.

Args:
key (str): Object key to remove

Returns:
int: 0 on success, non-zero error code on failure
)")
.def("remove_all", &DistributedObjectStore::removeAll,
py::call_guard<py::gil_scoped_release>())
py::call_guard<py::gil_scoped_release>(),
R"(
Remove all objects from the store.

This method deletes all objects and their replicas from the distributed store.
Use with caution as this operation cannot be undone.

Returns:
int: Number of objects removed, or negative value on error
)")
.def("is_exist", &DistributedObjectStore::isExist,
py::call_guard<py::gil_scoped_release>())
.def("close", &DistributedObjectStore::tearDownAll)
.def("get_size", &DistributedObjectStore::getSize,
py::call_guard<py::gil_scoped_release>())
.def("put",
[](DistributedObjectStore &self, const std::string &key,
py::buffer buf) {
py::buffer_info info = buf.request(/*writable=*/false);
py::gil_scoped_release release;
return self.put(key, std::span<const char>(
static_cast<char *>(info.ptr),
static_cast<size_t>(info.size)));
})
.def("put_parts", [](DistributedObjectStore &self,
const std::string &key, py::args parts) {
// 1) Python buffer → span
std::vector<py::buffer_info> infos;
std::vector<std::span<const char>> spans;
infos.reserve(parts.size());
spans.reserve(parts.size());

for (auto &obj : parts) {
py::buffer buf = py::reinterpret_borrow<py::buffer>(obj);
infos.emplace_back(buf.request(false));
const auto &info = infos.back();
if (info.ndim != 1 || info.itemsize != 1)
throw std::runtime_error("parts must be 1-D bytes-like");

spans.emplace_back(static_cast<const char *>(info.ptr),
static_cast<size_t>(info.size));
}
py::call_guard<py::gil_scoped_release>(), py::arg("key"),
R"(
Check if an object exists in the store.

// 2) Call C++ function
py::gil_scoped_release unlock;
return self.put_parts(key, spans);
});
This method queries the metadata to determine if the specified key exists
without transferring the actual data.

Args:
key (str): Object key to check

Returns:
int: 1 if exists, 0 if not exists, -1 on error
)")
.def("close", &DistributedObjectStore::tearDownAll,
R"(
Clean up and close the distributed store connection.

This method properly shuts down all connections, unmounts memory segments,
and releases resources. Always call this method when done using the store.

Returns:
int: 0 on success, non-zero error code on failure

Note:
The store cannot be used after calling close() unless setup() is called again.
)")
.def("get_size", &DistributedObjectStore::getSize,
py::call_guard<py::gil_scoped_release>(), py::arg("key"),
R"(
Get the size of an object without retrieving its data.

This method queries the metadata to determine the total size of the object
in bytes without transferring the actual data.

Args:
key (str): Object key to query

Returns:
int: Size of the object in bytes, or -1 if object doesn't exist or error occurs
)")
.def(
    "put",
    [](DistributedObjectStore &self, const std::string &key,
       py::buffer buf) {
        // Acquire the read-only view while the GIL is still held. `info`
        // is declared before the GIL release guard so that it is
        // destroyed after the GIL has been re-acquired.
        const py::buffer_info info = buf.request(/*writable=*/false);

        // The payload is read linearly starting at info.ptr, so the
        // exporter's memory must be C-contiguous. Strided or reversed
        // views (e.g. memoryview(b)[::-1]) would otherwise yield wrong
        // data or read outside the exporter's memory.
        if (info.size > 0) {
            py::ssize_t expected_stride = info.itemsize;
            for (py::ssize_t dim = info.ndim - 1; dim >= 0; --dim) {
                const auto idx = static_cast<size_t>(dim);
                if (info.shape[idx] != 1 &&
                    info.strides[idx] != expected_stride)
                    throw std::runtime_error(
                        "value must be a C-contiguous buffer");
                expected_stride *= info.shape[idx];
            }
        }

        // buffer_info::size is the number of items, not bytes; scale by
        // itemsize so buffers with multi-byte elements are not truncated.
        const auto byte_count = static_cast<size_t>(info.size) *
                                static_cast<size_t>(info.itemsize);
        const std::span<const char> payload(
            static_cast<const char *>(info.ptr), byte_count);

        // Release the GIL only around the store call itself.
        py::gil_scoped_release release;
        return self.put(key, payload);
    },
    py::arg("key"), py::arg("value"),
    R"(
Store data in the distributed object store.

This method stores the provided data under the specified key with automatic
replication. The data is automatically split into slices if it exceeds the
maximum slice size.

Args:
key (str): Unique identifier for the object
value (bytes-like): Data to store (bytes, bytearray, memoryview, etc.).
Must be C-contiguous; all bytes of the buffer are stored.

Returns:
int: 0 on success, non-zero error code on failure

Raises:
RuntimeError: If value is not a C-contiguous buffer

Example:
result = store.put("my_key", b"Hello, World!")
if result == 0:
print("Data stored successfully")

Note:
The operation is atomic - either all data is stored or none is stored.
)")
.def(
    "put_parts",
    [](DistributedObjectStore &self, const std::string &key,
       py::args parts) {
        // Convert Python buffer objects to C++ spans. Every buffer_info
        // is kept in `infos` until the lambda returns because each span
        // is only a non-owning view into the memory it describes.
        std::vector<py::buffer_info> infos;
        std::vector<std::span<const char>> spans;
        infos.reserve(parts.size());
        spans.reserve(parts.size());

        for (auto &obj : parts) {
            py::buffer buf = py::reinterpret_borrow<py::buffer>(obj);
            infos.emplace_back(buf.request(false));
            const auto &info = infos.back();
            if (info.ndim != 1 || info.itemsize != 1)
                throw std::runtime_error(
                    "parts must be 1-D bytes-like");
            // A 1-D byte view can still be strided or reversed (e.g.
            // memoryview(b)[::2] or [::-1]). The span below reads
            // linearly from info.ptr, so reject anything whose stride
            // is not exactly one byte.
            if (info.size > 1 && info.strides[0] != 1)
                throw std::runtime_error("parts must be contiguous");

            spans.emplace_back(static_cast<const char *>(info.ptr),
                               static_cast<size_t>(info.size));
        }

        // Call C++ function with GIL released for performance. `unlock`
        // is declared last, so the GIL is re-acquired before `infos`
        // is destroyed.
        py::gil_scoped_release unlock;
        return self.put_parts(key, spans);
    },
    py::arg("key"),
    R"(
Store multiple data parts as a single object in the distributed store.

This method efficiently stores multiple data parts (e.g., list of bytes objects)
as a single contiguous object. The parts are packed together before storage,
which can be more efficient than concatenating them in Python.

Note:
Data parts are efficiently packed and automatically sliced for optimal storage.

Args:
key (str): Unique identifier for the object
*parts: Variable number of bytes-like objects to store as parts

Returns:
int: 0 on success, non-zero error code on failure

Raises:
RuntimeError: If a part is not a 1-dimensional, contiguous buffer of single-byte items

Example:
part1 = b"Hello, "
part2 = b"World!"
result = store.put_parts("greeting", part1, part2)
# Equivalent to: store.put("greeting", b"Hello, World!")

Note:
All parts must be 1-dimensional, contiguous bytes-like objects (bytes, bytearray, etc.).
The operation is atomic - either all parts are stored or none are stored.
)");
}
18 changes: 18 additions & 0 deletions mooncake-integration/store/store_py.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,26 @@ class DistributedObjectStore {
int initAll(const std::string &protocol, const std::string &device_name,
size_t mount_segment_size = 1024 * 1024 * 16); // Default 16MB

/**
 * @brief Store data in the distributed object store
 * @param key Unique identifier for the object
 * @param value Data to store as a span of characters. The span is a
 *        non-owning view: the memory it refers to must stay valid at
 *        least for the duration of the call.
 * @return 0 on success, non-zero error code on failure
 *
 * @note Large objects are automatically split into slices for efficient
 * distributed storage and transfer.
 */
int put(const std::string &key, std::span<const char> value);

/**
 * @brief Store multiple data parts as a single object
 * @param key Unique identifier for the object
 * @param values Vector of data parts to store. The vector itself is taken
 *        by value, but each element is a non-owning view: the memory the
 *        spans refer to must stay valid at least for the duration of the
 *        call.
 * @return 0 on success, non-zero error code on failure
 *
 * @note Data parts are efficiently packed and automatically sliced for
 * optimal distributed storage.
 */
int put_parts(const std::string &key,
std::vector<std::span<const char>> values);

Expand Down
Loading
Loading