From ba2971af6b1cbfca672a6071df57048de96455d2 Mon Sep 17 00:00:00 2001 From: muxue Date: Wed, 17 Sep 2025 12:01:54 +0800 Subject: [PATCH] [Store] Add Pure C Interface of Client and ut --- mooncake-store/include/client_c.h | 141 ++++ mooncake-store/src/CMakeLists.txt | 1 + mooncake-store/src/client_c.cpp | 352 +++++++++ mooncake-store/tests/CMakeLists.txt | 1 + .../tests/client_c_integration_test.cpp | 681 ++++++++++++++++++ 5 files changed, 1176 insertions(+) create mode 100644 mooncake-store/include/client_c.h create mode 100644 mooncake-store/src/client_c.cpp create mode 100644 mooncake-store/tests/client_c_integration_test.cpp diff --git a/mooncake-store/include/client_c.h b/mooncake-store/include/client_c.h new file mode 100644 index 000000000..d0ef204f8 --- /dev/null +++ b/mooncake-store/include/client_c.h @@ -0,0 +1,141 @@ +// Copyright 2024 KVCache.AI +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef MOONCAKE_CLIENT_C +#define MOONCAKE_CLIENT_C + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif // __cplusplus + +// types.h 中的 ErrorCode Code +typedef int32_t ErrorCode_t; + +#define MOONCAKE_ERROR_OK ((ErrorCode_t)0) +#define MOONCAKE_ERROR_INTERNAL_ERROR ((ErrorCode_t) - 1) +#define MOONCAKE_ERROR_BUFFER_OVERFLOW ((ErrorCode_t) - 10) +#define MOONCAKE_ERROR_SHARD_INDEX_OUT_OF_RANGE ((ErrorCode_t) - 100) +#define MOONCAKE_ERROR_SEGMENT_NOT_FOUND ((ErrorCode_t) - 101) +#define MOONCAKE_ERROR_SEGMENT_ALREADY_EXISTS ((ErrorCode_t) - 102) +#define MOONCAKE_ERROR_NO_AVAILABLE_HANDLE ((ErrorCode_t) - 200) +#define MOONCAKE_ERROR_INVALID_VERSION ((ErrorCode_t) - 300) +#define MOONCAKE_ERROR_INVALID_KEY ((ErrorCode_t) - 400) +#define MOONCAKE_ERROR_WRITE_FAIL ((ErrorCode_t) - 500) +#define MOONCAKE_ERROR_INVALID_PARAMS ((ErrorCode_t) - 600) +#define MOONCAKE_ERROR_INVALID_WRITE ((ErrorCode_t) - 700) +#define MOONCAKE_ERROR_INVALID_READ ((ErrorCode_t) - 701) +#define MOONCAKE_ERROR_INVALID_REPLICA ((ErrorCode_t) - 702) +#define MOONCAKE_ERROR_REPLICA_IS_NOT_READY ((ErrorCode_t) - 703) +#define MOONCAKE_ERROR_OBJECT_NOT_FOUND ((ErrorCode_t) - 704) +#define MOONCAKE_ERROR_OBJECT_ALREADY_EXISTS ((ErrorCode_t) - 705) +#define MOONCAKE_ERROR_OBJECT_HAS_LEASE ((ErrorCode_t) - 706) +#define MOONCAKE_ERROR_TRANSFER_FAIL ((ErrorCode_t) - 800) +#define MOONCAKE_ERROR_RPC_FAIL ((ErrorCode_t) - 900) +#define MOONCAKE_ERROR_ETCD_OPERATION_ERROR ((ErrorCode_t) - 1000) +#define MOONCAKE_ERROR_ETCD_KEY_NOT_EXIST ((ErrorCode_t) - 1001) +#define MOONCAKE_ERROR_ETCD_TRANSACTION_FAIL ((ErrorCode_t) - 1002) +#define MOONCAKE_ERROR_ETCD_CTX_CANCELLED ((ErrorCode_t) - 1003) +#define MOONCAKE_ERROR_UNAVAILABLE_IN_CURRENT_STATUS ((ErrorCode_t) - 1010) +#define MOONCAKE_ERROR_UNAVAILABLE_IN_CURRENT_MODE ((ErrorCode_t) - 1011) +#define MOONCAKE_ERROR_FILE_NOT_FOUND ((ErrorCode_t) - 1100) +#define MOONCAKE_ERROR_FILE_OPEN_FAIL ((ErrorCode_t) - 1101) +#define MOONCAKE_ERROR_FILE_READ_FAIL ((ErrorCode_t) - 1102) +#define MOONCAKE_ERROR_FILE_WRITE_FAIL ((ErrorCode_t) - 1103) +#define MOONCAKE_ERROR_FILE_INVALID_BUFFER ((ErrorCode_t) - 1104) +#define MOONCAKE_ERROR_FILE_LOCK_FAIL ((ErrorCode_t) - 1105) +#define MOONCAKE_ERROR_FILE_INVALID_HANDLE ((ErrorCode_t) - 1106) + +typedef struct { + void* ptr = NULL; + size_t size = 0; +} Slice_t; + +typedef struct { + Slice_t* slices; + size_t slices_count; +} Slice_span_t; + +typedef struct { + size_t replica_num; + const char* preferred_segment; +} ReplicateConfig_t; + +typedef struct { + const char* key; + Slice_span_t* slices_span; +} BatchItem_t; + +typedef struct { + ErrorCode_t* results; + size_t results_count; +} ErrorCode_span_t; + +typedef void* client_t; + +client_t mooncake_client_create(const char* local_hostname, + const char* metadata_connstring, + const char* protocol, const char* rdma_devices, + const char* master_server_entry); + +ErrorCode_t mooncake_client_register_local_memory(client_t client, void* addr, + size_t length, + const char* location, + bool remote_accessible, + bool update_metadata); + +ErrorCode_t mooncake_client_unregister_local_memory(client_t client, void* addr, + bool update_metadata); + +ErrorCode_t mooncake_client_mount_segment(client_t client, void* segment_ptr, + size_t size); + +ErrorCode_t mooncake_client_unmount_segment(client_t client, void* segment_ptr, + size_t size); + +ErrorCode_t mooncake_client_get(client_t client, const char* key, + Slice_t* slices, size_t slices_count); + +ErrorCode_t mooncake_client_put(client_t client, const char* key, + Slice_t* slices, size_t slices_count, + const ReplicateConfig_t config); + +ErrorCode_t mooncake_client_isexist(client_t client, const char* key); + +ErrorCode_t mooncake_client_remove(client_t client, const char* key); + +long mooncake_client_remove_byregex(client_t client, const char* regex); + +long mooncake_client_remove_all(client_t client); + +ErrorCode_span_t mooncake_client_batch_get(client_t client, BatchItem_t* items, + size_t items_count); + +ErrorCode_span_t mooncake_client_batch_put(client_t client, BatchItem_t* items, + size_t items_count, + const ReplicateConfig_t config); + +void mooncake_client_destroy(client_t client); + +uint64_t mooncake_max_slice_size(); + +void* mooncake_allocate_segment_memory(size_t size); + +#ifdef __cplusplus +} +#endif // __cplusplus + +#endif // MOONCAKE_CLIENT_C \ No newline at end of file diff --git a/mooncake-store/src/CMakeLists.txt b/mooncake-store/src/CMakeLists.txt index f45e4d751..4ff8170a7 100644 --- a/mooncake-store/src/CMakeLists.txt +++ b/mooncake-store/src/CMakeLists.txt @@ -24,6 +24,7 @@ set(MOONCAKE_STORE_SOURCES client_buffer.cpp pybind_client.cpp http_metadata_server.cpp + client_c.cpp ) set(EXTRA_LIBS "") diff --git a/mooncake-store/src/client_c.cpp b/mooncake-store/src/client_c.cpp new file mode 100644 index 000000000..f973f29a5 --- /dev/null +++ b/mooncake-store/src/client_c.cpp @@ -0,0 +1,352 @@ +// Copyright 2024 KVCache.AI +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "client_c.h" +#include "client.h" +#include "utils.h" +#include +#include + +using namespace mooncake; + +// 管理 client 对象的生命周期 +void* create_obj(std::shared_ptr client) { + return new std::shared_ptr(client); +} + +void* get_raw(void* handle) { + return reinterpret_cast*>(handle)->get(); +} + +void destroy_obj(void* handle) { + delete reinterpret_cast*>(handle); +} + +client_t mooncake_client_create(const char* local_hostname, + const char* metadata_connstring, + const char* protocol, const char* rdma_devices, + const char* master_server_entry) { + std::optional device_name = + (rdma_devices && strcmp(rdma_devices, "") != 0) + ? std::make_optional(rdma_devices) + : std::nullopt; + std::optional> native = + Client::Create(local_hostname, metadata_connstring, protocol, + device_name, master_server_entry); + if (native) { + return create_obj(native.value()); + } else { + return nullptr; + } +} + +ErrorCode_t mooncake_client_register_local_memory(client_t client, void* addr, + size_t length, + const char* location, + bool remote_accessible, + bool update_metadata) { + Client* native_client = (Client*)get_raw(client); + if (native_client == nullptr) { + return MOONCAKE_ERROR_INVALID_PARAMS; + } + if (length == 0 || addr == nullptr) { + return MOONCAKE_ERROR_OK; + } + auto result = native_client->RegisterLocalMemory( + addr, length, location, remote_accessible, update_metadata); + if (result) { + return MOONCAKE_ERROR_OK; + } else { + LOG(ERROR) << "Failed to register local memory: " + << toString(result.error()); + return static_cast(result.error()); + } +} + +ErrorCode_t mooncake_client_unregister_local_memory(client_t client, void* addr, + bool update_metadata) { + Client* native_client = (Client*)get_raw(client); + if (native_client == nullptr) { + return MOONCAKE_ERROR_INVALID_PARAMS; + } + if (addr == nullptr) { + return MOONCAKE_ERROR_OK; + } + auto result = native_client->unregisterLocalMemory(addr, update_metadata); + if (result) { + return MOONCAKE_ERROR_OK; + } else { + LOG(ERROR) << "Failed to unregister local memory: " + << toString(result.error()); + return static_cast(result.error()); + } +} + +ErrorCode_t mooncake_client_mount_segment(client_t client, void* segment_ptr, + size_t size) { + Client* native_client = (Client*)get_raw(client); + if (native_client == nullptr) { + return MOONCAKE_ERROR_INVALID_PARAMS; + } + if (segment_ptr == nullptr || size == 0) { + return MOONCAKE_ERROR_INVALID_PARAMS; + } + auto result = native_client->MountSegment(segment_ptr, size); + if (result) { + return MOONCAKE_ERROR_OK; + } else { + LOG(ERROR) << "Failed to mount segment: " << toString(result.error()); + return static_cast(result.error()); + } +} + +ErrorCode_t mooncake_client_unmount_segment(client_t client, void* segment_ptr, + size_t size) { + Client* native_client = (Client*)get_raw(client); + if (native_client == nullptr) { + return MOONCAKE_ERROR_INVALID_PARAMS; + } + if (segment_ptr == nullptr || size == 0) { + return MOONCAKE_ERROR_INVALID_PARAMS; + } + auto result = native_client->UnmountSegment(segment_ptr, size); + if (result) { + return MOONCAKE_ERROR_OK; + } else { + LOG(ERROR) << "Failed to unmount segment: " << toString(result.error()); + return static_cast(result.error()); + } +} + +ErrorCode_t mooncake_client_get(client_t client, const char* key, + Slice_t* slices, size_t slices_count) { + Client* native_client = (Client*)get_raw(client); + std::vector slices_vector; + for (size_t i = 0; i < slices_count; i++) { + Slice slice; + slice.ptr = slices[i].ptr; + slice.size = slices[i].size; + slices_vector.push_back(slice); + } + auto result = native_client->Get(key, slices_vector); + if (result) { + return MOONCAKE_ERROR_OK; + } else { + LOG(ERROR) << "Failed to get: " << toString(result.error()); + return static_cast(result.error()); + } +} + +ErrorCode_t mooncake_client_put(client_t client, const char* key, + Slice_t* slices, size_t slices_count, + const ReplicateConfig_t config) { + Client* native_client = (Client*)get_raw(client); + std::vector slices_vector; + for (size_t i = 0; i < slices_count; i++) { + Slice slice; + slice.ptr = slices[i].ptr; + slice.size = slices[i].size; + slices_vector.push_back(slice); + } + ReplicateConfig cpp_config; + cpp_config.replica_num = config.replica_num; + cpp_config.preferred_segment = + config.preferred_segment ? config.preferred_segment : ""; + auto result = native_client->Put(key, slices_vector, cpp_config); + if (result) { + return MOONCAKE_ERROR_OK; + } else { + LOG(ERROR) << "Failed to put: " << toString(result.error()); + return static_cast(result.error()); + } +} + +ErrorCode_t mooncake_client_isexist(client_t client, const char* key) { + Client* native_client = (Client*)get_raw(client); + auto result = native_client->IsExist(key); + if (result) { + if (result.value() == true) { + return MOONCAKE_ERROR_OK; + } else { + return MOONCAKE_ERROR_OBJECT_NOT_FOUND; + } + } else { + LOG(ERROR) << "Failed to query: " << toString(result.error()); + return static_cast(result.error()); + } +} + +ErrorCode_t mooncake_client_remove(client_t client, const char* key) { + Client* native_client = (Client*)get_raw(client); + auto result = native_client->Remove(key); + if (result) { + return MOONCAKE_ERROR_OK; + } else { + LOG(ERROR) << "Failed to remove: " << toString(result.error()); + return static_cast(result.error()); + } +} + +long mooncake_client_remove_byregex(client_t client, const char* regex) { + Client* native_client = (Client*)get_raw(client); + if (regex == nullptr) { + return MOONCAKE_ERROR_INVALID_PARAMS; + } + auto result = native_client->RemoveByRegex(regex); + if (result) { + return result.value(); + } else { + LOG(ERROR) << "Failed to remove by regex: " << toString(result.error()); + return -1; + } +} + +long mooncake_client_remove_all(client_t client) { + Client* native_client = (Client*)get_raw(client); + auto result = native_client->RemoveAll(); + if (result) { + return result.value(); + } else { + LOG(ERROR) << "Failed to remove all: " << toString(result.error()); + return -1; + } +} + +ErrorCode_span_t mooncake_client_batch_get(client_t client, BatchItem_t* items, + size_t items_count) { + ErrorCode_span_t error_span; + error_span.results_count = items_count; + error_span.results = + (ErrorCode_t*)malloc(items_count * sizeof(ErrorCode_t)); + // init all results to MOONCAKE_ERROR_INTERNAL_ERROR + memset(error_span.results, 0xFF, items_count * sizeof(ErrorCode_t)); + + Client* native_client = (Client*)get_raw(client); + if (items == nullptr || items_count == 0) { + return error_span; + } + + // Prepare keys and slices for C++ BatchGet + std::vector object_keys; + std::unordered_map> slices_map; + object_keys.reserve(items_count); + + for (size_t i = 0; i < items_count; ++i) { + std::string key = items[i].key ? items[i].key : ""; + object_keys.push_back(key); + + // Convert C slices to C++ slices + std::vector cpp_slices; + cpp_slices.reserve(items[i].slices_span->slices_count); + for (size_t j = 0; j < items[i].slices_span->slices_count; ++j) { + Slice slice; + slice.ptr = items[i].slices_span->slices[j].ptr; + slice.size = items[i].slices_span->slices[j].size; + cpp_slices.push_back(slice); + } + slices_map[key] = std::move(cpp_slices); + } + + // Call C++ BatchGet + auto cpp_results = native_client->BatchGet(object_keys, slices_map); + + // Copy results back to C results array + for (size_t i = 0; i < cpp_results.size(); ++i) { + const auto& result = cpp_results[i]; + + if (result) { + error_span.results[i] = MOONCAKE_ERROR_OK; + } else { + error_span.results[i] = static_cast(result.error()); + LOG(ERROR) << "BatchGet failed for key " << object_keys[i] << ": " + << toString(result.error()); + } + } + + return error_span; +} + +ErrorCode_span_t mooncake_client_batch_put(client_t client, BatchItem_t* items, + size_t items_count, + const ReplicateConfig_t config) { + ErrorCode_span_t error_span; + error_span.results_count = items_count; + error_span.results = + (ErrorCode_t*)malloc(items_count * sizeof(ErrorCode_t)); + // init all results to MOONCAKE_ERROR_INTERNAL_ERROR + memset(error_span.results, 0xFF, items_count * sizeof(ErrorCode_t)); + + Client* native_client = (Client*)get_raw(client); + if (items == nullptr || items_count == 0) { + return error_span; + } + + // Prepare keys and slices for C++ BatchPut + std::vector object_keys; + std::vector> batched_slices; + object_keys.reserve(items_count); + batched_slices.reserve(items_count); + + for (size_t i = 0; i < items_count; ++i) { + std::string key = items[i].key ? items[i].key : ""; + object_keys.push_back(key); + + // Convert C slices to C++ slices + std::vector cpp_slices; + cpp_slices.reserve(items[i].slices_span->slices_count); + for (size_t j = 0; j < items[i].slices_span->slices_count; ++j) { + Slice slice; + slice.ptr = items[i].slices_span->slices[j].ptr; + slice.size = items[i].slices_span->slices[j].size; + cpp_slices.push_back(slice); + } + batched_slices.push_back(std::move(cpp_slices)); + } + + // Convert C config to C++ config + ReplicateConfig cpp_config; + cpp_config.replica_num = config.replica_num; + cpp_config.preferred_segment = + config.preferred_segment ? config.preferred_segment : ""; + + // Call C++ BatchPut + auto cpp_results = + native_client->BatchPut(object_keys, batched_slices, cpp_config); + + // Copy results back to C results array + for (size_t i = 0; i < cpp_results.size(); ++i) { + const auto& result = cpp_results[i]; + + if (result) { + error_span.results[i] = MOONCAKE_ERROR_OK; + } else { + error_span.results[i] = static_cast(result.error()); + LOG(ERROR) << "BatchPut failed for key " << object_keys[i] << ": " + << toString(result.error()); + } + } + + return error_span; +} + +void mooncake_client_destroy(client_t client) { destroy_obj(client); } + +uint64_t mooncake_max_slice_size() { return kMaxSliceSize; } + +void* mooncake_allocate_segment_memory(size_t size) { + if (size == 0) { + return nullptr; + } + return allocate_buffer_allocator_memory(size); +} diff --git a/mooncake-store/tests/CMakeLists.txt b/mooncake-store/tests/CMakeLists.txt index da426d62e..d0b9c0a6a 100644 --- a/mooncake-store/tests/CMakeLists.txt +++ b/mooncake-store/tests/CMakeLists.txt @@ -18,6 +18,7 @@ add_store_test(eviction_strategy_test eviction_strategy_test.cpp) add_store_test(master_service_test master_service_test.cpp) add_store_test(master_service_ssd_test master_service_ssd_test.cpp) add_store_test(client_integration_test client_integration_test.cpp) +add_store_test(client_c_integration_test client_c_integration_test.cpp) add_store_test(master_metrics_test master_metrics_test.cpp) add_store_test(posix_file_test posix_file_test.cpp) add_store_test(thread_pool_test thread_pool_test.cpp) diff --git a/mooncake-store/tests/client_c_integration_test.cpp b/mooncake-store/tests/client_c_integration_test.cpp new file mode 100644 index 000000000..098bfbe11 --- /dev/null +++ b/mooncake-store/tests/client_c_integration_test.cpp @@ -0,0 +1,681 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "allocator.h" +#include "client_c.h" +#include "utils.h" + +DEFINE_string(protocol, "tcp", "Transfer protocol: rdma|tcp"); +DEFINE_string(device_name, "ibp6s0", + "Device name to use, valid if protocol=rdma"); +DEFINE_string(transfer_engine_metadata_url, "http://localhost:8080/metadata", + "Metadata connection string for transfer engine"); +DEFINE_uint64(default_kv_lease_ttl, mooncake::DEFAULT_DEFAULT_KV_LEASE_TTL, + "Default lease time for kv objects, must be set to the " + "same as the master's default_kv_lease_ttl"); + +namespace mooncake { +namespace testing { + +// test pure C APIs of client_c.h +class ClientCIntegrationTest : public ::testing::Test { + protected: + static client_t CreateClient(const std::string& host_name) { + client_t client = + mooncake_client_create(host_name.c_str(), // Local hostname + FLAGS_transfer_engine_metadata_url + .c_str(), // Metadata connection string + FLAGS_protocol.c_str(), // Protocol + FLAGS_device_name.c_str(), // RDMA devices + "localhost:50051" // Master server address + ); + + EXPECT_NE(client, nullptr) + << "Failed to create client with host_name: " << host_name; + return client; + } + + static void SetUpTestSuite() { + // Initialize glog + google::InitGoogleLogging("ClientCIntegrationTest"); + FLAGS_logtostderr = 1; + + // Override flags from environment variables if present + if (getenv("PROTOCOL")) FLAGS_protocol = getenv("PROTOCOL"); + if (getenv("DEVICE_NAME")) FLAGS_device_name = getenv("DEVICE_NAME"); + if (getenv("MC_METADATA_SERVER")) + FLAGS_transfer_engine_metadata_url = getenv("MC_METADATA_SERVER"); + + LOG(INFO) << "Protocol: " << FLAGS_protocol + << ", Device name: " << FLAGS_device_name + << ", Metadata URL: " << FLAGS_transfer_engine_metadata_url; + + if (getenv("DEFAULT_KV_LEASE_TTL")) { + default_kv_lease_ttl_ = std::stoul(getenv("DEFAULT_KV_LEASE_TTL")); + } else { + default_kv_lease_ttl_ = FLAGS_default_kv_lease_ttl; + } + LOG(INFO) << "Default KV lease TTL: " << default_kv_lease_ttl_; + + InitializeTestClient(); + InitializeSegmentProviderClient(); + } + + static void TearDownTestSuite() { + CleanupSegment(); + CleanupClients(); + google::ShutdownGoogleLogging(); + } + + static void InitializeSegmentProviderClient() { + // segment_provider_client_ only provide segments. + segment_provider_client_ = CreateClient("localhost:17912"); + ASSERT_NE(segment_provider_client_, nullptr); + + // mount segment for segment_provider_client_ + segment_size_ = 512 * 1024 * 1024; // 512 MB + segment_ptr_ = mooncake_allocate_segment_memory(segment_size_); + LOG_ASSERT(segment_ptr_); + + auto mount_result = mooncake_client_mount_segment( + segment_provider_client_, segment_ptr_, segment_size_); + ASSERT_EQ(mount_result, MOONCAKE_ERROR_OK) << "Failed to mount segment"; + LOG(INFO) << "Segment provider client initialized successfully"; + } + + static void InitializeTestClient() { + // test_client_ is used for testing purposes. + test_client_ = CreateClient("localhost:17913"); + ASSERT_NE(test_client_, nullptr); + + // register local memory for test_client_ + client_buffer_allocator_ = + std::make_unique(128 * 1024 * 1024); + auto register_result = mooncake_client_register_local_memory( + test_client_, client_buffer_allocator_->getBase(), + 128 * 1024 * 1024, "cpu:0", false, false); + ASSERT_EQ(register_result, MOONCAKE_ERROR_OK) + << "Failed to register local memory"; + + // Mount segment for test_client_ as well + test_client_segment_size_ = 512 * 1024 * 1024; // 512 MB + test_client_segment_ptr_ = + mooncake_allocate_segment_memory(test_client_segment_size_); + ASSERT_NE(test_client_segment_ptr_, nullptr); + + auto test_client_mount_result = mooncake_client_mount_segment( + test_client_, test_client_segment_ptr_, test_client_segment_size_); + ASSERT_EQ(test_client_mount_result, MOONCAKE_ERROR_OK) + << "Failed to mount segment for test_client_"; + LOG(INFO) << "Test client initialized successfully"; + } + + static void CleanupClients() { + // cleanup test_client_ first + if (test_client_) { + mooncake_client_destroy(test_client_); + client_buffer_allocator_.reset(); + free(test_client_segment_ptr_); + } + + // cleanup segment_provider_client_ + if (segment_provider_client_) { + mooncake_client_destroy(segment_provider_client_); + free(segment_ptr_); + } + } + + static void CleanupSegment() { + // test unmount_segment and unregister_local_memory + if (segment_provider_client_ && segment_ptr_) { + auto unmount_result = mooncake_client_unmount_segment( + segment_provider_client_, segment_ptr_, segment_size_); + ASSERT_EQ(unmount_result, MOONCAKE_ERROR_OK) + << "Failed to unmount segment"; + } + + if (test_client_) { + // unregister test + if (client_buffer_allocator_) { + auto unregister_result = + mooncake_client_unregister_local_memory( + test_client_, client_buffer_allocator_->getBase(), + false); + ASSERT_EQ(unregister_result, MOONCAKE_ERROR_OK) + << "Failed to unregister local memory"; + } + if (test_client_segment_ptr_) { + auto unmount_result = mooncake_client_unmount_segment( + test_client_, test_client_segment_ptr_, + test_client_segment_size_); + ASSERT_EQ(unmount_result, MOONCAKE_ERROR_OK) + << "Failed to unmount segment"; + } + } + } + + static client_t test_client_; + static client_t segment_provider_client_; + static std::unique_ptr client_buffer_allocator_; + static void* segment_ptr_; + static size_t segment_size_; + static void* test_client_segment_ptr_; + static size_t test_client_segment_size_; + static uint64_t default_kv_lease_ttl_; +}; + +// Static members initialization +client_t ClientCIntegrationTest::test_client_ = nullptr; +client_t ClientCIntegrationTest::segment_provider_client_ = nullptr; +void* ClientCIntegrationTest::segment_ptr_ = nullptr; +void* ClientCIntegrationTest::test_client_segment_ptr_ = nullptr; +std::unique_ptr + ClientCIntegrationTest::client_buffer_allocator_ = nullptr; +size_t ClientCIntegrationTest::segment_size_ = 0; +size_t ClientCIntegrationTest::test_client_segment_size_ = 0; +uint64_t ClientCIntegrationTest::default_kv_lease_ttl_ = 0; + +// Test basic Put/Get operations through the C client +TEST_F(ClientCIntegrationTest, BasicPutGetOperations) { + const std::string test_data = "Hello, World!"; + const std::string key = "test_key_c"; + + // Prepare buffer and slice for Put + void* buffer = client_buffer_allocator_->allocate(test_data.size()); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, test_data.data(), test_data.size()); + + Slice_t put_slice; + put_slice.ptr = buffer; + put_slice.size = test_data.size(); + + // Test Put operation + ReplicateConfig_t config; + config.replica_num = 1; + + auto put_result = + mooncake_client_put(test_client_, key.c_str(), &put_slice, 1, config); + ASSERT_EQ(put_result, MOONCAKE_ERROR_OK) << "Put operation failed"; + client_buffer_allocator_->deallocate(buffer, test_data.size()); + + // Prepare buffer for Get operation (larger buffer like C++ version) + void* get_buffer = client_buffer_allocator_->allocate(1 * 1024 * 1024); + ASSERT_NE(get_buffer, nullptr); + + Slice_t get_slice; + get_slice.ptr = get_buffer; + get_slice.size = test_data.size(); + + // Test Get operation + auto get_result = + mooncake_client_get(test_client_, key.c_str(), &get_slice, 1); + ASSERT_EQ(get_result, MOONCAKE_ERROR_OK) << "Get operation failed"; + + // Verify data + ASSERT_EQ(get_slice.size, test_data.size()); + ASSERT_EQ(get_slice.ptr, get_buffer); + ASSERT_EQ(memcmp(get_slice.ptr, test_data.data(), test_data.size()), 0); + client_buffer_allocator_->deallocate(get_buffer, test_data.size()); + + // Put again with the same key, should succeed + buffer = client_buffer_allocator_->allocate(test_data.size()); + memcpy(buffer, test_data.data(), test_data.size()); + put_slice.ptr = buffer; + put_slice.size = test_data.size(); + auto put_result2 = + mooncake_client_put(test_client_, key.c_str(), &put_slice, 1, config); + ASSERT_EQ(put_result2, MOONCAKE_ERROR_OK) << "Second Put operation failed"; + + // Sleep for lease TTL and remove + std::this_thread::sleep_for( + std::chrono::milliseconds(default_kv_lease_ttl_)); + auto remove_result = mooncake_client_remove(test_client_, key.c_str()); + ASSERT_EQ(remove_result, MOONCAKE_ERROR_OK) << "Remove operation failed"; + client_buffer_allocator_->deallocate(buffer, test_data.size()); +} + +// Test Remove operation +TEST_F(ClientCIntegrationTest, RemoveOperation) { + const std::string test_data = "Test data for removal"; + const std::string key = "remove_test_key_c"; + + // Put data first + void* buffer = client_buffer_allocator_->allocate(test_data.size()); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, test_data.data(), test_data.size()); + + Slice_t slice; + slice.ptr = buffer; + slice.size = test_data.size(); + + ReplicateConfig_t config; + config.replica_num = 1; + + auto put_result = + mooncake_client_put(test_client_, key.c_str(), &slice, 1, config); + ASSERT_EQ(put_result, MOONCAKE_ERROR_OK) << "Put operation failed"; + client_buffer_allocator_->deallocate(buffer, test_data.size()); + + // Remove the data + auto remove_result = mooncake_client_remove(test_client_, key.c_str()); + ASSERT_EQ(remove_result, MOONCAKE_ERROR_OK) << "Remove operation failed"; + + // Verify that the data is removed using IsExist operation + auto exist_result = mooncake_client_isexist(test_client_, key.c_str()); + ASSERT_NE(exist_result, MOONCAKE_ERROR_OK) + << "IsExist should not return OK for removed key"; + + // Try to get the removed data - should fail + void* get_buffer = client_buffer_allocator_->allocate(test_data.size()); + ASSERT_NE(get_buffer, nullptr); + + Slice_t get_slice; + get_slice.ptr = get_buffer; + get_slice.size = test_data.size(); + + auto get_result = + mooncake_client_get(test_client_, key.c_str(), &get_slice, 1); + ASSERT_NE(get_result, MOONCAKE_ERROR_OK) + << "Get should fail for removed key"; + client_buffer_allocator_->deallocate(get_buffer, test_data.size()); +} + +// Test local preferred allocation strategy +TEST_F(ClientCIntegrationTest, LocalPreferredAllocationTest) { + const std::string test_data = "Test data for local preferred allocation"; + const std::string key = "local_preferred_test_key_c"; + + void* buffer = client_buffer_allocator_->allocate(test_data.size()); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, test_data.data(), test_data.size()); + + Slice_t slice; + slice.ptr = buffer; + slice.size = test_data.size(); + + ReplicateConfig_t config; + config.replica_num = 1; + // Although there is only one segment now, in order to test the preferred + // allocation logic, we still set it. This will prevent potential + // compatibility issues in the future. + config.preferred_segment = "localhost:17912"; // Local segment + + auto put_result = + mooncake_client_put(test_client_, key.c_str(), &slice, 1, config); + ASSERT_EQ(put_result, MOONCAKE_ERROR_OK) << "Put operation failed"; + client_buffer_allocator_->deallocate(buffer, test_data.size()); + + // Verify data through Get operation + void* get_buffer = client_buffer_allocator_->allocate(test_data.size()); + ASSERT_NE(get_buffer, nullptr); + + Slice_t get_slice; + get_slice.ptr = get_buffer; + get_slice.size = test_data.size(); + + auto get_result = + mooncake_client_get(test_client_, key.c_str(), &get_slice, 1); + ASSERT_EQ(get_result, MOONCAKE_ERROR_OK) << "Get operation failed"; + ASSERT_EQ(get_slice.size, test_data.size()); + ASSERT_EQ(memcmp(get_slice.ptr, test_data.data(), test_data.size()), 0); + client_buffer_allocator_->deallocate(get_buffer, test_data.size()); + + // Clean up + std::this_thread::sleep_for( + std::chrono::milliseconds(default_kv_lease_ttl_)); + auto remove_result = mooncake_client_remove(test_client_, key.c_str()); + ASSERT_EQ(remove_result, MOONCAKE_ERROR_OK) << "Remove operation failed"; +} + +// Test large allocation operations +TEST_F(ClientCIntegrationTest, LargeAllocateTest) { + const size_t data_size = 1 * 1024 * 1024; // 1MB + const uint64_t kNumBuffers = 5; + const std::string key = "large_test_key_c"; + + ReplicateConfig_t config; + config.replica_num = 1; + + // Allocate buffers and fill with data + std::vector buffers(kNumBuffers); + std::vector slices(kNumBuffers); + + for (size_t i = 0; i < kNumBuffers; ++i) { + buffers[i] = client_buffer_allocator_->allocate(data_size); + ASSERT_NE(buffers[i], nullptr); + + // Fill with pattern data + char pattern = 'A' + i; + memset(buffers[i], pattern, data_size); + + slices[i].ptr = buffers[i]; + slices[i].size = data_size; + } + + // Put operation + auto put_result = mooncake_client_put(test_client_, key.c_str(), + slices.data(), kNumBuffers, config); + ASSERT_EQ(put_result, MOONCAKE_ERROR_OK) << "Put operation failed"; + + // Clear buffers before Get + for (size_t i = 0; i < kNumBuffers; ++i) { + memset(buffers[i], 0, data_size); + } + + // Get operation + auto get_result = mooncake_client_get(test_client_, key.c_str(), + slices.data(), kNumBuffers); + ASSERT_EQ(get_result, MOONCAKE_ERROR_OK) << "Get operation failed"; + + // Verify data + for (size_t i = 0; i < kNumBuffers; ++i) { + ASSERT_EQ(slices[i].size, data_size); + char expected_pattern = 'A' + i; + char* buffer_data = static_cast(slices[i].ptr); + for (size_t j = 0; j < data_size; ++j) { + ASSERT_EQ(buffer_data[j], expected_pattern) + << "Data mismatch at buffer " << i << " position " << j; + } + } + + // Remove the key + std::this_thread::sleep_for( + std::chrono::milliseconds(default_kv_lease_ttl_)); + auto remove_result = mooncake_client_remove(test_client_, key.c_str()); + ASSERT_EQ(remove_result, MOONCAKE_ERROR_OK) << "Remove operation failed"; + + // Cleanup + for (size_t i = 0; i < kNumBuffers; ++i) { + client_buffer_allocator_->deallocate(buffers[i], data_size); + } +} + +// Test error handling +TEST_F(ClientCIntegrationTest, ErrorHandlingTest) { + const std::string key = "error_test_key_c"; + + // Test Get on non-existent key + void* buffer = client_buffer_allocator_->allocate(1024); + ASSERT_NE(buffer, nullptr); + + Slice_t slice; + slice.ptr = buffer; + slice.size = 1024; + + auto get_result = mooncake_client_get(test_client_, key.c_str(), &slice, 1); + ASSERT_NE(get_result, MOONCAKE_ERROR_OK) + << "Get should fail for non-existent key"; + + // Test IsExist on non-existent key + auto exist_result = mooncake_client_isexist(test_client_, key.c_str()); + ASSERT_NE(exist_result, MOONCAKE_ERROR_OK) + << "IsExist should fail for non-existent key"; + + // Test Remove on non-existent key + auto remove_result = mooncake_client_remove(test_client_, key.c_str()); + ASSERT_NE(remove_result, MOONCAKE_ERROR_OK) + << "Remove should fail for non-existent key"; + + client_buffer_allocator_->deallocate(buffer, 1024); +} + +// Test max slice size +TEST_F(ClientCIntegrationTest, MaxSliceSizeTest) { + uint64_t max_size = mooncake_max_slice_size(); + ASSERT_GT(max_size, 0) << "Max slice size should be greater than 0"; + LOG(INFO) << "Max slice size: " << max_size << " bytes"; +} + +// Test batch Put/Get operations through the C client +TEST_F(ClientCIntegrationTest, BatchPutGetOperations) { + int batch_sz = 100; + std::vector keys; + std::vector test_data_list; + std::vector batch_items; + + std::vector slice_spans; // 存有所有 slice_span, 用于内存管理 + std::vector all_slices; // 存有所有 slice, 用于内存管理 + + for (int i = 0; i < batch_sz; i++) { + keys.push_back("test_key_batch_put_c_" + std::to_string(i)); + test_data_list.push_back("test_data_c_" + std::to_string(i)); + } + + void* buffer = nullptr; + void* target_buffer = nullptr; + batch_items.reserve(batch_sz); + slice_spans.reserve(batch_sz); + all_slices.reserve(batch_sz); + + for (int i = 0; i < batch_sz; i++) { + buffer = client_buffer_allocator_->allocate(test_data_list[i].size()); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, test_data_list[i].data(), test_data_list[i].size()); + + // Create slice + Slice_t slice; + slice.ptr = buffer; + slice.size = test_data_list[i].size(); + all_slices.push_back(slice); + + // Create slice span + Slice_span_t slice_span; + slice_span.slices = &all_slices[i]; + slice_span.slices_count = 1; + slice_spans.push_back(slice_span); + + // Create batch item + BatchItem_t item; + item.key = keys[i].c_str(); + item.slices_span = &slice_spans[i]; + batch_items.push_back(item); + } + // Test Batch Put operation + ReplicateConfig_t config; + config.replica_num = 1; + auto start = std::chrono::high_resolution_clock::now(); + ErrorCode_span_t batch_put_results = mooncake_client_batch_put( + test_client_, batch_items.data(), batch_sz, config); + // Check that all operations succeeded + for (size_t i = 0; i < batch_put_results.results_count; i++) { + ASSERT_EQ(batch_put_results.results[i], MOONCAKE_ERROR_OK) + << "BatchPut operation failed for key " << keys[i]; + client_buffer_allocator_->deallocate(all_slices[i].ptr, + test_data_list[i].size()); + } + free(batch_put_results.results); + auto end = std::chrono::high_resolution_clock::now(); + LOG(INFO) << "Time taken for BatchPut: " + << std::chrono::duration_cast(end - + start) + .count() + << "us"; + + start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < batch_sz; i++) { + target_buffer = + client_buffer_allocator_->allocate(test_data_list[i].size()); + ASSERT_NE(target_buffer, nullptr); + + Slice_t get_slice; + get_slice.ptr = target_buffer; + get_slice.size = test_data_list[i].size(); + + auto get_result = + mooncake_client_get(test_client_, keys[i].c_str(), &get_slice, 1); + ASSERT_EQ(get_result, MOONCAKE_ERROR_OK) + << "Get operation failed for key " << keys[i]; + ASSERT_EQ(get_slice.size, test_data_list[i].size()); + ASSERT_EQ(memcmp(get_slice.ptr, test_data_list[i].data(), + test_data_list[i].size()), + 0); + client_buffer_allocator_->deallocate(target_buffer, + test_data_list[i].size()); + } + end = std::chrono::high_resolution_clock::now(); + LOG(INFO) << "Time taken for single Get: " + << std::chrono::duration_cast(end - + start) + .count() + << "us"; + + // Prepare for Batch Get operation + std::vector batch_get_items; + std::vector get_slice_spans; + std::vector get_all_slices; + + batch_get_items.reserve(batch_sz); + get_slice_spans.reserve(batch_sz); + get_all_slices.reserve(batch_sz); + + for (int i = 0; i < batch_sz; i++) { + // Allocate buffer for get + target_buffer = + client_buffer_allocator_->allocate(test_data_list[i].size()); + ASSERT_NE(target_buffer, nullptr); + + // Create slice + Slice_t get_slice; + get_slice.ptr = target_buffer; + get_slice.size = test_data_list[i].size(); + get_all_slices.push_back(get_slice); + + // Create slice span + Slice_span_t get_slice_span; + get_slice_span.slices = &get_all_slices[i]; + get_slice_span.slices_count = 1; + get_slice_spans.push_back(get_slice_span); + + // Create batch item + BatchItem_t get_item; + get_item.key = keys[i].c_str(); + get_item.slices_span = &get_slice_spans[i]; + batch_get_items.push_back(get_item); + } + + // Test Batch Get operation + start = std::chrono::high_resolution_clock::now(); + ErrorCode_span_t batch_get_results = mooncake_client_batch_get( + test_client_, batch_get_items.data(), batch_sz); + for (size_t i = 0; i < batch_get_results.results_count; i++) { + ASSERT_EQ(batch_get_results.results[i], MOONCAKE_ERROR_OK) + << "BatchGet operation failed for key " << keys[i]; + } + free(batch_get_results.results); + end = std::chrono::high_resolution_clock::now(); + + LOG(INFO) << "Time taken for BatchGet: " + << std::chrono::duration_cast(end - + start) + .count() + << "us"; + + // Verify data from BatchGet + for (int i = 0; i < batch_sz; i++) { + ASSERT_EQ(get_all_slices[i].size, test_data_list[i].size()); + ASSERT_EQ(memcmp(get_all_slices[i].ptr, test_data_list[i].data(), + test_data_list[i].size()), + 0); + client_buffer_allocator_->deallocate(get_all_slices[i].ptr, + test_data_list[i].size()); + } +} + +// Test batch put with duplicate keys +TEST_F(ClientCIntegrationTest, BatchPutDuplicateKeys) { + const std::string test_data = "test_data_duplicate_c"; + const std::string key = "duplicate_key_c"; + + // Create two identical keys + std::vector keys = {key, key}; + std::vector batch_items; + std::vector slice_spans; + std::vector all_slices; + std::vector allocated_buffers; + + batch_items.reserve(2); + slice_spans.reserve(2); + all_slices.reserve(2); + + // Prepare data for both keys + for (int i = 0; i < 2; i++) { + // Allocate buffer and copy data + void* buffer = client_buffer_allocator_->allocate(test_data.size()); + ASSERT_NE(buffer, nullptr); + memcpy(buffer, test_data.data(), test_data.size()); + + // Create slice + Slice_t slice; + slice.ptr = buffer; + slice.size = test_data.size(); + all_slices.push_back(slice); + + // Create slice span + Slice_span_t slice_span; + slice_span.slices = &all_slices[i]; + slice_span.slices_count = 1; + slice_spans.push_back(slice_span); + + // Create batch item + BatchItem_t item; + item.key = keys[i].c_str(); + item.slices_span = &slice_spans[i]; + batch_items.push_back(item); + } + + ReplicateConfig_t config; + config.replica_num = 1; + + // Test batch put with duplicate keys + ErrorCode_span_t batch_put_results = + mooncake_client_batch_put(test_client_, batch_items.data(), 2, config); + + // Check that we got results for both operations + ASSERT_EQ(batch_put_results.results_count, 2); + + // Both operations should succeed + // Because we currently consider `OBJECT_ALREADY_EXISTS` as success + + for (size_t i = 0; i < batch_put_results.results_count; i++) { + ASSERT_EQ(batch_put_results.results[i], MOONCAKE_ERROR_OK) + << "BatchPut operation failed for duplicate key at index " << i; + } + + // Cleanup result array + free(batch_put_results.results); + + // Clean up allocated memory + for (int i = 0; i < 2; i++) { + client_buffer_allocator_->deallocate(all_slices[i].ptr, + test_data.size()); + } + + // Clean up the key that was successfully put + std::this_thread::sleep_for( + std::chrono::milliseconds(default_kv_lease_ttl_)); + auto remove_result = mooncake_client_remove(test_client_, key.c_str()); + ASSERT_EQ(remove_result, MOONCAKE_ERROR_OK) << "Remove operation failed"; +} + +} // namespace testing +} // namespace mooncake + +int main(int argc, char** argv) { + // Initialize Google Test + ::testing::InitGoogleTest(&argc, argv); + + // Initialize Google's flags library + gflags::ParseCommandLineFlags(&argc, &argv, false); + + // Run all tests + return RUN_ALL_TESTS(); +}