Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 131 additions & 3 deletions mooncake-store/tests/master_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ TEST_F(MasterServiceTest, RandomMountUnmountSegment) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 10);
std::vector<std::string> segmentlist = {};
size_t used;
size_t capacity;
while (times--) {
int random_number = dis(gen);
// Define the size of the segment (16MB).
Expand All @@ -145,6 +148,16 @@ TEST_F(MasterServiceTest, RandomMountUnmountSegment) {

// Test remounting after unmount.
EXPECT_EQ(ErrorCode::OK, service_->MountSegment(segment, client_id));

// Test query after mount.
EXPECT_EQ(ErrorCode::OK, service_->GetAllSegments(segmentlist));
EXPECT_EQ(segmentlist.size(), 1);
EXPECT_EQ(segmentlist[0], segment_name);
EXPECT_EQ(ErrorCode::OK,
service_->QuerySegments(segment_name, used, capacity));
EXPECT_EQ(used, 0);
EXPECT_EQ(capacity, kSegmentSize);

EXPECT_EQ(ErrorCode::OK,
service_->UnmountSegment(segment.id, client_id));
}
Expand Down Expand Up @@ -187,6 +200,49 @@ TEST_F(MasterServiceTest, ConcurrentMountUnmount) {
EXPECT_GT(success_count, 0);
}

TEST_F(MasterServiceTest, ReMountSegment) {
// Define the test segment and client.
constexpr size_t kBufferAddress = 0x300000000;
constexpr size_t kSegmentSize = 1024 * 1024 * 16;
std::string segment_name = "test_segment";
Segment segment(generate_uuid(), segment_name, kBufferAddress,
kSegmentSize);
segment.base = kBufferAddress;
segment.size = kSegmentSize;
std::vector<Segment> segmentlist = {segment};
UUID client_id = generate_uuid();

// Test invalid service mode.
std::unique_ptr<MasterService> service_(new MasterService());
EXPECT_EQ(ErrorCode::UNAVAILABLE_IN_CURRENT_MODE,
service_->ReMountSegment(segmentlist, client_id));

// Test remount in with high availability service.
std::unique_ptr<MasterService> service2_(
new MasterService(true, 200UL, 0.1, 1, 0L, 3L, true));
ViewVersionId view_version;
ClientStatus client_status;
// Test first remount.
EXPECT_EQ(ErrorCode::OK,
service2_->Ping(client_id, view_version, client_status));
EXPECT_EQ(ClientStatus::NEED_REMOUNT, client_status);
EXPECT_EQ(ErrorCode::OK, service2_->ReMountSegment(segmentlist, client_id));
EXPECT_EQ(ErrorCode::OK,
service2_->Ping(client_id, view_version, client_status));
EXPECT_EQ(ClientStatus::OK, client_status);
// Test remount again (idempotent operation should succeed).
EXPECT_EQ(ErrorCode::OK, service2_->ReMountSegment(segmentlist, client_id));
std::this_thread::sleep_for(std::chrono::seconds(5));
// Test remount after expired.
EXPECT_EQ(ErrorCode::OK,
service2_->Ping(client_id, view_version, client_status));
EXPECT_EQ(ClientStatus::NEED_REMOUNT, client_status);
EXPECT_EQ(ErrorCode::OK, service2_->ReMountSegment(segmentlist, client_id));
EXPECT_EQ(ErrorCode::OK,
service2_->Ping(client_id, view_version, client_status));
EXPECT_EQ(ClientStatus::OK, client_status);
}

TEST_F(MasterServiceTest, PutStartInvalidParams) {
std::unique_ptr<MasterService> service_(new MasterService());
constexpr size_t buffer = 0x300000000;
Expand Down Expand Up @@ -242,10 +298,13 @@ TEST_F(MasterServiceTest, PutStartEndFlow) {
EXPECT_FALSE(replica_list.empty());
EXPECT_EQ(ReplicaStatus::PROCESSING, replica_list[0].status);

// During put, Get/Remove should fail
// During put, Get/Remove/Reput should fail
EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY,
service_->GetReplicaList(key, replica_list));
EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, service_->Remove(key));
EXPECT_EQ(ErrorCode::OBJECT_ALREADY_EXISTS,
service_->PutStart(key, value_length, slice_lengths, config,
replica_list));

// Test PutEnd
EXPECT_EQ(ErrorCode::OK, service_->PutEnd(key));
Expand Down Expand Up @@ -282,10 +341,13 @@ TEST_F(MasterServiceTest, RandomPutStartEndFlow) {
replica_list));
EXPECT_FALSE(replica_list.empty());
EXPECT_EQ(ReplicaStatus::PROCESSING, replica_list[0].status);
// During put, Get/Remove should fail
// During put, Get/Remove/Reput should fail
EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY,
service_->GetReplicaList(key, replica_list));
EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, service_->Remove(key));
EXPECT_EQ(ErrorCode::OBJECT_ALREADY_EXISTS,
service_->PutStart(key, value_length, slice_lengths, config,
replica_list));
// Test PutEnd
EXPECT_EQ(ErrorCode::OK, service_->PutEnd(key));
// Verify replica list after PutEnd
Expand Down Expand Up @@ -1191,7 +1253,7 @@ TEST_F(MasterServiceTest, BatchExistKeyTest) {
EXPECT_EQ(ErrorCode::OK, service_->ExistKey(test_keys[i]));
}

// Tets batch
// Test batch
test_keys.push_back("non_existent_key");
auto exist_resp = service_->BatchExistKey(test_keys);
for (int i = 0; i < test_object_num; ++i) {
Expand All @@ -1200,6 +1262,72 @@ TEST_F(MasterServiceTest, BatchExistKeyTest) {
ASSERT_EQ(ErrorCode::OBJECT_NOT_FOUND, exist_resp[test_object_num]);
}

TEST_F(MasterServiceTest, BatchEvictTest) {
// set a large kv_lease_ttl and low test eviction ratio.
const uint64_t kv_lease_ttl = 2000;
double eviction_ratio = 0.1;
double eviction_high_watermark_ratio = 0.6;
std::unique_ptr<MasterService> service_(new MasterService(
false, kv_lease_ttl, eviction_ratio, eviction_high_watermark_ratio));
constexpr size_t buffer = 0x300000000;
constexpr size_t size = 1024 * 1024 * 128;
constexpr size_t object_size = 1024 * 1024;
std::string segment_name = "test_segment";
Segment segment(generate_uuid(), segment_name, buffer, size);
UUID client_id = generate_uuid();
ASSERT_EQ(ErrorCode::OK, service_->MountSegment(segment, client_id));

// Verify batch eviction when reach the eviction_high_watermark_ratio.
for (int i = 0; i < 80; ++i) {
std::string key = "test_key" + std::to_string(i);
std::vector<uint64_t> slice_lengths = {object_size};
ReplicateConfig config;
config.replica_num = 1;
std::vector<Replica::Descriptor> replica_list;
ASSERT_EQ(ErrorCode::OK,
service_->PutStart(key, object_size, slice_lengths, config,
replica_list));
ASSERT_EQ(ErrorCode::OK, service_->PutEnd(key));
// lease first 50 objects.
if (i < 50) {
ASSERT_EQ(ErrorCode::OK,
service_->GetReplicaList(key, replica_list));
}
std::vector<std::string> keys;
ASSERT_EQ(ErrorCode::OK, service_->GetAllKeys(keys));
ASSERT_EQ(keys.size(), i + 1);
}

std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::vector<std::string> keys;
ASSERT_EQ(ErrorCode::OK, service_->GetAllKeys(keys));
ASSERT_LE(keys.size(), 75);

// Verify batch eviction when reach allocation fail.
std::string large_object_key = "test_large_object_key";
constexpr size_t large_object_size = 1024 * 1024 * 64;
std::vector<uint64_t> large_slice_lengths(64, object_size);
ReplicateConfig config;
config.replica_num = 1;
std::vector<Replica::Descriptor> replica_list;
ASSERT_EQ(ErrorCode::NO_AVAILABLE_HANDLE,
service_->PutStart(large_object_key, large_object_size,
large_slice_lengths, config, replica_list));

std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::vector<std::string> keys_after;
ASSERT_EQ(ErrorCode::OK, service_->GetAllKeys(keys_after));
ASSERT_GE(keys.size() - keys_after.size(), 5);

// All leased objects should be accessible.
for (int i = 0; i < 50; ++i) {
std::string key = "test_key" + std::to_string(i);
ASSERT_EQ(ErrorCode::OK, service_->GetReplicaList(key, replica_list));
}
std::this_thread::sleep_for(std::chrono::milliseconds(kv_lease_ttl));
service_->RemoveAll();
}

} // namespace mooncake::test

int main(int argc, char** argv) {
Expand Down
Loading