diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp
index f6339068c..615ac4fe6 100644
--- a/mooncake-integration/store/store_py.cpp
+++ b/mooncake-integration/store/store_py.cpp
@@ -604,15 +604,40 @@ std::shared_ptr<SliceBuffer> DistributedObjectStore::get_buffer(
 }
 
 PYBIND11_MODULE(store, m) {
-    // Define the SliceBuffer class
+    /*
+     * Mooncake Store Python Bindings
+     */
+
+    // Define the SliceBuffer class for efficient memory management
     py::class_<SliceBuffer, std::shared_ptr<SliceBuffer>>(m, "SliceBuffer",
-                                                          py::buffer_protocol())
-        .def("ptr",
-             [](const SliceBuffer &self) {
-                 // Return the pointer as an integer for Python
-                 return reinterpret_cast<uintptr_t>(self.ptr());
-             })
-        .def("size", &SliceBuffer::size)
+                                                          py::buffer_protocol(),
+                                                          R"(
+        A buffer class that holds contiguous data retrieved from the distributed store.
+
+        This class provides RAII-style memory management and implements the Python
+        buffer protocol for efficient data access without copying.
+
+        Note: SliceBuffer automatically manages memory deallocation when destroyed.
+        )")
+        .def(
+            "ptr",
+            [](const SliceBuffer &self) {
+                // Return the pointer as an integer for Python
+                return reinterpret_cast<uintptr_t>(self.ptr());
+            },
+            R"(
+        Get the memory address of the buffer as an integer.
+
+        Returns:
+            int: Memory address as an integer (for advanced use cases)
+        )")
+        .def("size", &SliceBuffer::size,
+             R"(
+        Get the size of the buffer in bytes.
+
+        Returns:
+            int: Size of the buffer in bytes
+        )")
         .def("__len__", &SliceBuffer::size)
         .def_buffer([](SliceBuffer &self) -> py::buffer_info {
             // SliceBuffer now always contains contiguous memory
@@ -642,54 +667,261 @@ PYBIND11_MODULE(store, m) {
         }
     });
 
-    // Define the DistributedObjectStore class
-    py::class_<DistributedObjectStore>(m, "MooncakeDistributedStore")
-        .def(py::init<>())
-        .def("setup", &DistributedObjectStore::setup)
-        .def("init_all", &DistributedObjectStore::initAll)
-        .def("get", &DistributedObjectStore::get)
+    // Define the DistributedObjectStore class - Main interface for Mooncake
+    // Store
+    py::class_<DistributedObjectStore>(m, "MooncakeDistributedStore",
+                                       R"(
+        Example:
+            store = MooncakeDistributedStore()
+            store.setup("localhost:12345", "127.0.0.1:2379")
+            store.put("my_key", b"my_data")
+            data = store.get("my_key")
+            store.close()
+        )")
+        .def(py::init<>(),
+             R"(
+        Initialize a new MooncakeDistributedStore instance.
+
+        Note: You must call setup() or init_all() before using the store.
+        )")
+        .def("setup", &DistributedObjectStore::setup, py::arg("local_hostname"),
+             py::arg("metadata_server"),
+             py::arg("global_segment_size") = 1024 * 1024 * 16,
+             py::arg("local_buffer_size") = 1024 * 1024 * 16,
+             py::arg("protocol") = "tcp", py::arg("rdma_devices") = "",
+             py::arg("master_server_addr") = "127.0.0.1:50051",
+             R"(
+        Configure and initialize the distributed object store.
+
+        This method sets up the connection to the metadata server, initializes
+        the transfer engine, and prepares the local memory segments for storage.
+
+        Args:
+            local_hostname (str): Local host address in format "IP:Port"
+                (e.g., "192.168.1.100:12345")
+            metadata_server (str): Metadata server address in format "IP:Port"
+                (e.g., "127.0.0.1:2379" for etcd)
+            global_segment_size (int, optional): Size of global memory segment in bytes.
+                Defaults to 16MB. Set to 0 to skip mounting.
+            local_buffer_size (int, optional): Size of local buffer for data transfer in bytes.
+                Defaults to 16MB.
+            protocol (str, optional): Transport protocol ("tcp" or "rdma"). Defaults to "tcp".
+            rdma_devices (str, optional): RDMA device specification for RDMA protocol.
+                Defaults to empty string.
+            master_server_addr (str, optional): Master server address. Defaults to "127.0.0.1:50051".
+
+        Returns:
+            int: 0 on success, non-zero error code on failure
+
+        Note:
+            For RDMA protocol, ensure proper RDMA devices are available and configured.
+        )")
+        .def("init_all", &DistributedObjectStore::initAll, py::arg("protocol"),
+             py::arg("device_name"),
+             py::arg("mount_segment_size") = 1024 * 1024 * 16,
+             R"(
+        Simplified initialization with default settings.
+
+        This is a convenience method that calls setup() with predefined defaults
+        suitable for local testing and development.
+
+        Args:
+            protocol (str): Transport protocol ("tcp" or "rdma")
+            device_name (str): Device name for the protocol
+            mount_segment_size (int, optional): Memory segment size in bytes. Defaults to 16MB.
+
+        Returns:
+            int: 0 on success, non-zero error code on failure
+        )")
+        .def("get", &DistributedObjectStore::get, py::arg("key"),
+             R"(
+        Retrieve data for the specified key.
+
+        This method fetches the complete object data associated with the given key
+        and returns it as Python bytes. The operation is thread-safe and releases
+        the GIL during network operations.
+
+        Args:
+            key (str): Object key to retrieve
+
+        Returns:
+            bytes: Object data as bytes, or empty bytes if key doesn't exist or error occurs
+
+        Note:
+            For large objects, consider using get_buffer() for more efficient memory usage.
+        )")
         .def("get_buffer", &DistributedObjectStore::get_buffer,
              py::call_guard<py::gil_scoped_release>(),
-             py::return_value_policy::take_ownership)
+             py::return_value_policy::take_ownership, py::arg("key"),
+             R"(
+        Retrieve data as a SliceBuffer for efficient memory access.
+
+        This method returns a SliceBuffer object that implements the Python buffer
+        protocol, allowing zero-copy access to the retrieved data. This is more
+        memory-efficient than get() for large objects.
+
+        Args:
+            key (str): Object key to retrieve
+
+        Returns:
+            SliceBuffer or None: Buffer containing the data, or None if key doesn't exist or error occurs
+
+        Example:
+            buffer = store.get_buffer("my_key")
+            if buffer:
+                data = bytes(buffer)  # Convert to bytes if needed
+                print(f"Retrieved {buffer.size()} bytes")
+        )")
         .def("remove", &DistributedObjectStore::remove,
-             py::call_guard<py::gil_scoped_release>())
+             py::call_guard<py::gil_scoped_release>(), py::arg("key"),
+             R"(
+        Remove a single object from the store.
+
+        This method deletes the object and all its replicas from the Mooncake store.
+
+        Args:
+            key (str): Object key to remove
+
+        Returns:
+            int: 0 on success, non-zero error code on failure
+        )")
         .def("remove_all", &DistributedObjectStore::removeAll,
-             py::call_guard<py::gil_scoped_release>())
+             py::call_guard<py::gil_scoped_release>(),
+             R"(
+        Remove all objects from the store.
+
+        This method deletes all objects and their replicas from the distributed store.
+        Use with caution as this operation cannot be undone.
+
+        Returns:
+            int: Number of objects removed, or negative value on error
+        )")
         .def("is_exist", &DistributedObjectStore::isExist,
-             py::call_guard<py::gil_scoped_release>())
-        .def("close", &DistributedObjectStore::tearDownAll)
-        .def("get_size", &DistributedObjectStore::getSize,
-             py::call_guard<py::gil_scoped_release>())
-        .def("put",
-             [](DistributedObjectStore &self, const std::string &key,
-                py::buffer buf) {
-                 py::buffer_info info = buf.request(/*writable=*/false);
-                 py::gil_scoped_release release;
-                 return self.put(key, std::span<const char>(
-                                          static_cast<const char *>(info.ptr),
-                                          static_cast<size_t>(info.size)));
-             })
-        .def("put_parts", [](DistributedObjectStore &self,
-                             const std::string &key, py::args parts) {
-            // 1) Python buffer → span
-            std::vector<py::buffer_info> infos;
-            std::vector<std::span<const char>> spans;
-            infos.reserve(parts.size());
-            spans.reserve(parts.size());
-
-            for (auto &obj : parts) {
-                py::buffer buf = py::reinterpret_borrow<py::buffer>(obj);
-                infos.emplace_back(buf.request(false));
-                const auto &info = infos.back();
-                if (info.ndim != 1 || info.itemsize != 1)
-                    throw std::runtime_error("parts must be 1-D bytes-like");
-
-                spans.emplace_back(static_cast<const char *>(info.ptr),
-                                   static_cast<size_t>(info.size));
-            }
+             py::call_guard<py::gil_scoped_release>(), py::arg("key"),
+             R"(
+        Check if an object exists in the store.
 
-            // 2) Call C++ function
-            py::gil_scoped_release unlock;
-            return self.put_parts(key, spans);
-        });
+        This method queries the metadata to determine if the specified key exists
+        without transferring the actual data.
+
+        Args:
+            key (str): Object key to check
+
+        Returns:
+            int: 1 if exists, 0 if not exists, -1 on error
+        )")
+        .def("close", &DistributedObjectStore::tearDownAll,
+             R"(
+        Clean up and close the distributed store connection.
+
+        This method properly shuts down all connections, unmounts memory segments,
+        and releases resources. Always call this method when done using the store.
+
+        Returns:
+            int: 0 on success, non-zero error code on failure
+
+        Note:
+            The store cannot be used after calling close() unless setup() is called again.
+        )")
+        .def("get_size", &DistributedObjectStore::getSize,
+             py::call_guard<py::gil_scoped_release>(), py::arg("key"),
+             R"(
+        Get the size of an object without retrieving its data.
+
+        This method queries the metadata to determine the total size of the object
+        in bytes without transferring the actual data.
+
+        Args:
+            key (str): Object key to query
+
+        Returns:
+            int: Size of the object in bytes, or -1 if object doesn't exist or error occurs
+        )")
+        .def(
+            "put",
+            [](DistributedObjectStore &self, const std::string &key,
+               py::buffer buf) {
+                py::buffer_info info = buf.request(/*writable=*/false);
+                py::gil_scoped_release release;
+                return self.put(
+                    key, std::span<const char>(static_cast<const char *>(info.ptr),
+                                               static_cast<size_t>(info.size)));
+            },
+            py::arg("key"), py::arg("value"),
+            R"(
+        Store data in the distributed object store.
+
+        This method stores the provided data under the specified key with automatic
+        replication. The data is automatically split into slices if it exceeds the
+        maximum slice size.
+
+        Args:
+            key (str): Unique identifier for the object
+            value (bytes-like): Data to store (bytes, bytearray, memoryview, etc.)
+
+        Returns:
+            int: 0 on success, non-zero error code on failure
+
+        Example:
+            result = store.put("my_key", b"Hello, World!")
+            if result == 0:
+                print("Data stored successfully")
+
+        Note:
+            The operation is atomic - either all data is stored or none is stored.
+ )") + .def( + "put_parts", + [](DistributedObjectStore &self, const std::string &key, + py::args parts) { + // Convert Python buffer objects to C++ spans + std::vector infos; + std::vector> spans; + infos.reserve(parts.size()); + spans.reserve(parts.size()); + + for (auto &obj : parts) { + py::buffer buf = py::reinterpret_borrow(obj); + infos.emplace_back(buf.request(false)); + const auto &info = infos.back(); + if (info.ndim != 1 || info.itemsize != 1) + throw std::runtime_error( + "parts must be 1-D bytes-like"); + + spans.emplace_back(static_cast(info.ptr), + static_cast(info.size)); + } + + // Call C++ function with GIL released for performance + py::gil_scoped_release unlock; + return self.put_parts(key, spans); + }, + py::arg("key"), + R"( + Store multiple data parts as a single object in the distributed store. + + This method efficiently stores multiple data parts (e.g., list of bytes objects) + as a single contiguous object. The parts are packed together before storage, + which can be more efficient than concatenating them in Python. + + Note: + Data parts are efficiently packed and automatically sliced for optimal storage. + + Args: + key (str): Unique identifier for the object + *parts: Variable number of bytes-like objects to store as parts + + Returns: + int: 0 on success, non-zero error code on failure + + Example: + part1 = b"Hello, " + part2 = b"World!" + result = store.put_parts("greeting", part1, part2) + # Equivalent to: store.put("greeting", b"Hello, World!") + + Note: + All parts must be 1-dimensional bytes-like objects (bytes, bytearray, etc.). + The operation is atomic - either all parts are stored or none are stored. + )"); } diff --git a/mooncake-integration/store/store_py.h b/mooncake-integration/store/store_py.h index c30a5c7c3..901a77f00 100644 --- a/mooncake-integration/store/store_py.h +++ b/mooncake-integration/store/store_py.h @@ -110,8 +110,26 @@ class DistributedObjectStore { int initAll(const std::string &protocol, const std::string &device_name, size_t mount_segment_size = 1024 * 1024 * 16); // Default 16MB + /** + * @brief Store data in the distributed object store + * @param key Unique identifier for the object + * @param value Data to store as a span of characters + * @return 0 on success, non-zero error code on failure + * + * @note Large objects are automatically split into slices for efficient + * distributed storage and transfer. + */ int put(const std::string &key, std::span value); + /** + * @brief Store multiple data parts as a single object + * @param key Unique identifier for the object + * @param values Vector of data parts to store + * @return 0 on success, non-zero error code on failure + * + * @note Data parts are efficiently packed and automatically sliced for + * optimal distributed storage. + */ int put_parts(const std::string &key, std::vector> values); diff --git a/mooncake-wheel/tests/test_distributed_object_store.py b/mooncake-wheel/tests/test_distributed_object_store.py index 22f63d9ba..d0f7e1ff7 100644 --- a/mooncake-wheel/tests/test_distributed_object_store.py +++ b/mooncake-wheel/tests/test_distributed_object_store.py @@ -1,3 +1,8 @@ +""" +Test suite for Mooncake Distributed Object Store Python interface. + +""" + import unittest import os import time @@ -7,10 +12,15 @@ # The lease time of the kv object, should be set equal to # the master's value. 
-DEFAULT_KV_LEASE_TTL = 200 # 200 milliseconds
+DEFAULT_KV_LEASE_TTL = 200  # 200 milliseconds
 
 
 def get_client(store):
-    """Initialize and setup the distributed store client."""
+    """
+    Initialize and setup the distributed store client.
+
+    This function demonstrates the enhanced parameter documentation and proper
+    argument naming in the Python bindings.
+    """
     protocol = os.getenv("PROTOCOL", "tcp")
     device_name = os.getenv("DEVICE_NAME", "ibp6s0")
     local_hostname = os.getenv("LOCAL_HOSTNAME", "localhost")
@@ -18,17 +28,17 @@ def get_client(store):
     global_segment_size = 3200 * 1024 * 1024  # 3200 MB
     local_buffer_size = 512 * 1024 * 1024  # 512 MB
     master_server_address = os.getenv("MASTER_SERVER", "127.0.0.1:50051")
-    
+
     retcode = store.setup(
-        local_hostname,
-        metadata_server,
-        global_segment_size,
-        local_buffer_size,
-        protocol,
-        device_name,
-        master_server_address
+        local_hostname=local_hostname,
+        metadata_server=metadata_server,
+        global_segment_size=global_segment_size,
+        local_buffer_size=local_buffer_size,
+        protocol=protocol,
+        rdma_devices=device_name,
+        master_server_addr=master_server_address
     )
-    
+
     if retcode:
         raise RuntimeError(f"Failed to setup store client. Return code: {retcode}")
 
@@ -106,6 +116,93 @@ def test_basic_put_get_exist_operations(self):
         self.assertLess(self.store.get_size(key_2), 0)
         self.assertEqual(self.store.is_exist(key_2), 0)
 
+    def test_pythonic_interface_features(self):
+        """Test improved Pythonic interface features including get_buffer and put_parts."""
+        # Test get_buffer method for efficient memory access
+        test_data = b"Testing SliceBuffer functionality with some data"
+        key = "test_buffer_key"
+
+        # Store data and retrieve as buffer
+        self.assertEqual(self.store.put(key=key, value=test_data), 0)
+
+        # Test get_buffer method
+        buffer = self.store.get_buffer(key=key)
+        self.assertIsNotNone(buffer, "get_buffer should return a valid buffer")
+        self.assertEqual(buffer.size(), len(test_data))
+
+        # Convert buffer to bytes and verify content
+        retrieved_data = bytes(buffer)
+        self.assertEqual(retrieved_data, test_data)
+
+        # Test put_parts method for efficient multi-part storage
+        part1 = b"Hello, "
+        part2 = b"World! "
+        part3 = b"This is a multi-part message."
+ parts_key = "test_parts_key" + + # Store multiple parts as a single object + result = self.store.put_parts(parts_key, part1, part2, part3) + self.assertEqual(result, 0, "put_parts should succeed") + + # Verify the combined data + expected_combined = part1 + part2 + part3 + retrieved_combined = self.store.get(key=parts_key) + self.assertEqual(retrieved_combined, expected_combined) + self.assertEqual(self.store.get_size(key=parts_key), len(expected_combined)) + + # Test with different data types (bytearray, memoryview) + array_data = bytearray(b"ByteArray data") + memory_data = memoryview(b"MemoryView data") + mixed_key = "test_mixed_types" + + result = self.store.put_parts(mixed_key, array_data, memory_data) + self.assertEqual(result, 0, "put_parts should handle different buffer types") + + expected_mixed = bytes(array_data) + bytes(memory_data) + retrieved_mixed = self.store.get(key=mixed_key) + self.assertEqual(retrieved_mixed, expected_mixed) + + # Cleanup + time.sleep(DEFAULT_KV_LEASE_TTL / 1000) + self.assertEqual(self.store.remove(key=key), 0) + self.assertEqual(self.store.remove(key=parts_key), 0) + self.assertEqual(self.store.remove(key=mixed_key), 0) + + def test_large_object_handling(self): + """Test automatic slicing for large objects.""" + # Test with objects larger than typical slice size + large_size = 32 * 1024 * 1024 # 32MB + large_data = os.urandom(large_size) + large_key = "test_large_object" + + print(f"\nTesting large object storage ({large_size / 1024 / 1024:.1f}MB)") + + # Store large object + start_time = time.time() + result = self.store.put(key=large_key, value=large_data) + put_time = time.time() - start_time + + self.assertEqual(result, 0, "Large object storage should succeed") + print(f"Large object put time: {put_time:.2f} seconds") + + # Verify size + stored_size = self.store.get_size(key=large_key) + self.assertEqual(stored_size, large_size, "Stored size should match original") + + # Retrieve and verify content + start_time = time.time() + retrieved_data = self.store.get(key=large_key) + get_time = time.time() - start_time + + self.assertEqual(len(retrieved_data), large_size, "Retrieved size should match") + self.assertEqual(retrieved_data, large_data, "Retrieved content should match") + print(f"Large object get time: {get_time:.2f} seconds") + print(f"Throughput: {large_size / 1024 / 1024 / max(put_time, get_time):.1f} MB/s") + + # Cleanup + time.sleep(DEFAULT_KV_LEASE_TTL / 1000) + self.assertEqual(self.store.remove(key=large_key), 0) + def test_concurrent_stress_with_barrier(self): """Test concurrent Put/Get operations with multiple threads using barrier.""" NUM_THREADS = 8