Skip to content

Commit a90ae8a

Browse files
yawzhangBesroy
authored and committed
Add nuraft_state field for complete Raft state persistence
This change uses NuRaft's native binary serialization to persist srv_state instead of manual JSON field mapping. This ensures that all state fields, including receiving_snapshot_, are saved, preventing state inconsistency and unexpected Raft behavior.
1 parent 91e0c01 commit a90ae8a

File tree

4 files changed

+121
-12
lines changed

4 files changed

+121
-12
lines changed

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomestoreConan(ConanFile):
1111
name = "homestore"
12-
version = "7.0.7"
12+
version = "7.0.8"
1313

1414
homepage = "https://github.com/eBay/Homestore"
1515
description = "HomeStore Storage Engine"

src/lib/replication/repl_dev/raft_repl_dev.cpp

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1904,31 +1904,64 @@ void RaftReplDev::save_config(const nuraft::cluster_config& config) {
19041904

19051905
void RaftReplDev::save_state(const nuraft::srv_state& state) {
19061906
std::unique_lock lg{m_config_mtx};
1907-
(*m_raft_config_sb)["state"] = nlohmann::json{{"term", state.get_term()},
1908-
{"voted_for", state.get_voted_for()},
1909-
{"election_timer_allowed", state.is_election_timer_allowed()},
1910-
{"catching_up", state.is_catching_up()}};
1907+
// Use NuRaft's native binary serialization
1908+
auto buf = state.serialize();
1909+
std::vector< uint8_t > byte_array(static_cast< const uint8_t* >(buf->data_begin()),
1910+
static_cast< const uint8_t* >(buf->data_begin()) + buf->size());
1911+
(*m_raft_config_sb)["nuraft_state"] = byte_array;
1912+
// Clear legacy state field to ensure rollback version doesn't read old state
1913+
(*m_raft_config_sb)["state"] = nlohmann::json();
19111914
m_raft_config_sb.write();
1912-
RD_LOGI(NO_TRACE_ID, "Saved state {}", (*m_raft_config_sb)["state"].dump());
1915+
1916+
RD_LOGI(NO_TRACE_ID,
1917+
"Saved state in binary format (size={} bytes): term={}, voted_for={}, "
1918+
"election_timer_allowed={}, catching_up={}, receiving_snapshot={}",
1919+
byte_array.size(), state.get_term(), state.get_voted_for(), state.is_election_timer_allowed(),
1920+
state.is_catching_up(), state.is_receiving_snapshot());
19131921
}
19141922

19151923
nuraft::ptr< nuraft::srv_state > RaftReplDev::read_state() {
19161924
std::unique_lock lg{m_config_mtx};
19171925
auto& js = *m_raft_config_sb;
19181926
auto state = nuraft::cs_new< nuraft::srv_state >();
1919-
if (js["state"].empty()) {
1920-
js["state"] = nlohmann::json{{"term", state->get_term()},
1921-
{"voted_for", state->get_voted_for()},
1922-
{"election_timer_allowed", state->is_election_timer_allowed()},
1923-
{"catching_up", state->is_catching_up()}};
1927+
1928+
if (js["state"].empty() && js["nuraft_state"].empty()) {
1929+
RD_LOGI(NO_TRACE_ID, "No existing state found, using default state");
1930+
return state;
1931+
}
1932+
1933+
if (!js["nuraft_state"].empty()) {
1934+
// New binary format
1935+
try {
1936+
std::vector< uint8_t > byte_array = js["nuraft_state"];
1937+
auto buf = nuraft::buffer::alloc(byte_array.size());
1938+
std::memcpy(buf->data_begin(), byte_array.data(), byte_array.size());
1939+
buf->pos(0);
1940+
state = nuraft::srv_state::deserialize(*buf);
1941+
RD_LOGI(NO_TRACE_ID,
1942+
"Loaded state in binary format (size={} bytes): term={}, voted_for={}, "
1943+
"election_timer_allowed={}, catching_up={}, receiving_snapshot={}",
1944+
byte_array.size(), state->get_term(), state->get_voted_for(), state->is_election_timer_allowed(),
1945+
state->is_catching_up(), state->is_receiving_snapshot());
1946+
} catch (std::exception const& e) {
1947+
RD_LOGE(NO_TRACE_ID, "Failed to deserialize state in binary format: {}, using default state", e.what());
1948+
}
19241949
} else {
1950+
// Legacy JSON object format
19251951
try {
19261952
state->set_term(uint64_cast(js["state"]["term"]));
19271953
state->set_voted_for(static_cast< int >(js["state"]["voted_for"]));
19281954
state->allow_election_timer(static_cast< bool >(js["state"]["election_timer_allowed"]));
19291955
state->set_catching_up(static_cast< bool >(js["state"]["catching_up"]));
1956+
// Note: receiving_snapshot_ is NOT in legacy format, will default to false
1957+
RD_LOGW(NO_TRACE_ID,
1958+
"Loaded state from legacy JSON object format: term={}, voted_for={}, election_timer_allowed={}, "
1959+
"catching_up={}. "
1960+
"Will be automatically migrated to binary format on next save.",
1961+
state->get_term(), state->get_voted_for(), state->is_election_timer_allowed(),
1962+
state->is_catching_up());
19301963
} catch (std::out_of_range const&) {
1931-
LOGWARN("State data was not in the expected format [group_id={}]!", m_group_id)
1964+
RD_LOGE(NO_TRACE_ID, "State data in legacy JSON object format is corrupted, using default state");
19321965
}
19331966
}
19341967
return state;

src/tests/test_common/raft_repl_test_base.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "common/homestore_utils.hpp"
4242

4343
#define private public
44+
#define protected public
4445
#include "test_common/hs_repl_test_common.hpp"
4546
#include "replication/service/raft_repl_service.h"
4647
#include "replication/repl_dev/raft_repl_dev.h"

src/tests/test_raft_repl_dev.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,81 @@ TEST_F(RaftReplDevTest, ReconcileLeader) {
814814
g_helper->sync_for_cleanup_start();
815815
}
816816

817+
TEST_F(RaftReplDevTest, NuraftStateTransition) {
818+
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
819+
g_helper->sync_for_test_start();
820+
// Get the RaftReplDev instance
821+
auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(dbs_[0]->repl_dev());
822+
ASSERT_NE(repl_dev, nullptr);
823+
LOGINFO("Step 0: Got RaftReplDev instance for group_id={}", repl_dev->group_id_str());
824+
825+
// Step 1: Manually set legacy "state" field (JSON object format) to simulate old version data
826+
LOGINFO("Step 1: Setting legacy JSON object format state");
827+
auto& js = *(repl_dev->m_raft_config_sb);
828+
// Clear any existing nuraft_state to ensure we test legacy format
829+
if (js.contains("nuraft_state")) { js.erase("nuraft_state"); }
830+
// Set legacy state
831+
js["state"] =
832+
nlohmann::json{{"term", 100}, {"voted_for", 5}, {"election_timer_allowed", true}, {"catching_up", false}};
833+
repl_dev->m_raft_config_sb.write();
834+
LOGINFO("Step 1: Written legacy state - term=100, voted_for=5, election_timer_allowed=true, catching_up=false");
835+
836+
// Step 2: Read old state from legacy format
837+
g_helper->sync_for_verify_start();
838+
LOGINFO("Step 2: Reading state from legacy JSON object format");
839+
auto old_state = repl_dev->read_state();
840+
ASSERT_NE(old_state, nullptr);
841+
ASSERT_EQ(old_state->get_term(), 100);
842+
ASSERT_EQ(old_state->get_voted_for(), 5);
843+
ASSERT_EQ(old_state->is_election_timer_allowed(), true);
844+
ASSERT_EQ(old_state->is_catching_up(), false);
845+
// receiving_snapshot should be false (not in legacy format)
846+
ASSERT_EQ(old_state->is_receiving_snapshot(), false);
847+
LOGINFO("Step 2: Successfully read legacy state - term={}, voted_for={}, election_timer_allowed={}, "
848+
"catching_up={}, receiving_snapshot={}",
849+
old_state->get_term(), old_state->get_voted_for(), old_state->is_election_timer_allowed(),
850+
old_state->is_catching_up(), old_state->is_receiving_snapshot());
851+
852+
// Step 3: Update state and save in new binary format
853+
g_helper->sync_for_test_start();
854+
LOGINFO("Step 3: Updating state values and saving in binary format");
855+
old_state->set_term(150);
856+
old_state->set_voted_for(10);
857+
old_state->allow_election_timer(false);
858+
old_state->set_catching_up(true);
859+
old_state->set_receiving_snapshot(true); // new field
860+
LOGINFO("Step 3: Saving new state - term=150, voted_for=10, election_timer_allowed=false, "
861+
"catching_up=true, receiving_snapshot=true");
862+
repl_dev->save_state(*old_state);
863+
864+
// Step 4: Verify fields in JSON superblock
865+
LOGINFO("Step 4: Verifying JSON superblock fields");
866+
g_helper->sync_for_verify_start();
867+
js = *(repl_dev->m_raft_config_sb);
868+
ASSERT_TRUE(js.contains("nuraft_state"));
869+
ASSERT_TRUE(js["nuraft_state"].is_array());
870+
LOGINFO("Step 4: Confirmed 'nuraft_state' field exists and is array (size={} bytes)", js["nuraft_state"].size());
871+
872+
// Verify legacy "state" field is empty (not removed, but set to null/empty for rollback compatibility)
873+
ASSERT_TRUE(js["state"].empty() || js["state"].is_null());
874+
LOGINFO("Step 4: Confirmed legacy 'state' field is empty/null for rollback compatibility");
875+
876+
// Step 5: Read back and verify new state from binary format
877+
LOGINFO("Step 5: Reading back state from binary format (nuraft_state field)");
878+
auto new_state = repl_dev->read_state();
879+
ASSERT_NE(new_state, nullptr);
880+
ASSERT_EQ(new_state->get_term(), 150);
881+
ASSERT_EQ(new_state->get_voted_for(), 10);
882+
ASSERT_EQ(new_state->is_election_timer_allowed(), false);
883+
ASSERT_EQ(new_state->is_catching_up(), true);
884+
ASSERT_EQ(new_state->is_receiving_snapshot(), true);
885+
LOGINFO("Step 5: Successfully read new state - term={}, voted_for={}, election_timer_allowed={}, "
886+
"catching_up={}, receiving_snapshot={}",
887+
new_state->get_term(), new_state->get_voted_for(), new_state->is_election_timer_allowed(),
888+
new_state->is_catching_up(), new_state->is_receiving_snapshot());
889+
g_helper->sync_for_cleanup_start();
890+
}
891+
817892
int main(int argc, char* argv[]) {
818893
int parsed_argc = argc;
819894
char** orig_argv = argv;

0 commit comments

Comments
 (0)