@@ -29,9 +29,7 @@ extern std::unique_ptr<PikaServer> g_pika_server;
2929namespace pika_raft {
3030
3131// PikaStateMachine implementation
32- PikaStateMachine::PikaStateMachine ()
33- : applied_index_(0 ), leader_term_(-1 ) {
34- }
32+ PikaStateMachine::PikaStateMachine () {}
3533
3634void PikaStateMachine::on_apply (braft::Iterator& iter) {
3735 for (; iter.valid (); iter.next ()) {
@@ -41,7 +39,6 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
4139 int64_t index = iter.index ();
4240
4341 if (!g_pika_server || !g_pika_server->GetRaftManager ()) {
44- applied_index_.store (index, std::memory_order_relaxed);
4542 // Run closure asynchronously in bthread to avoid blocking on_apply
4643 if (done) {
4744 braft::run_closure_in_bthread (done_guard.release ());
@@ -55,14 +52,14 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
5552 if (done) {
5653 done->status ().set_error (EINVAL, " Failed to parse binlog" );
5754 }
58- applied_index_.store (index, std::memory_order_relaxed);
5955 if (done) {
6056 braft::run_closure_in_bthread (done_guard.release ());
6157 }
6258 continue ;
6359 }
6460
65- rocksdb::Status apply_status = g_pika_server->GetRaftManager ()->ApplyBinlogEntry (binlog);
61+ // Apply binlog with log index for tracking
62+ rocksdb::Status apply_status = g_pika_server->GetRaftManager ()->ApplyBinlogEntry (binlog, index);
6663
6764 if (done) {
6865 if (apply_status.ok ()) {
@@ -73,7 +70,6 @@ void PikaStateMachine::on_apply(braft::Iterator& iter) {
7370 }
7471 }
7572
76- applied_index_.store (index, std::memory_order_relaxed);
7773
7874 // Run closure asynchronously in bthread to avoid blocking on_apply
7975 if (done) {
@@ -94,19 +90,16 @@ int PikaStateMachine::on_snapshot_load(braft::SnapshotReader* reader) {
9490 in >> index;
9591 in.close ();
9692
97- applied_index_.store (index, std::memory_order_relaxed);
9893 return 0 ;
9994 }
10095
10196 return -1 ;
10297}
10398
10499void PikaStateMachine::on_leader_start (int64_t term) {
105- leader_term_.store (term, std::memory_order_relaxed);
106100}
107101
108102void PikaStateMachine::on_leader_stop (const butil::Status& status) {
109- leader_term_.store (-1 , std::memory_order_relaxed);
110103}
111104
112105void PikaStateMachine::on_error (const ::braft::Error& e) {
@@ -465,6 +458,7 @@ pstd::Status RaftManager::RemoveNode(const std::string& db_name,
465458 }
466459
467460 braft::PeerId peer_id = ParsePeerId (peer_addr);
461+
468462 if (peer_id.is_empty ()) {
469463 return pstd::Status::Corruption (" Invalid peer address: " + peer_addr);
470464 }
@@ -475,6 +469,7 @@ pstd::Status RaftManager::RemoveNode(const std::string& db_name,
475469pstd::Status RaftManager::GetClusterInfo (const std::string& db_name,
476470 std::string* info) {
477471 auto node = GetRaftNode (db_name);
472+
478473 if (!node) {
479474 return pstd::Status::Corruption (" Raft node not found for DB: " + db_name);
480475 }
@@ -501,14 +496,39 @@ pstd::Status RaftManager::CreateRaftNode(const std::string& db_name,
501496 return pstd::Status::Corruption (" Raft node already exists for DB: " + db_name);
502497 }
503498
504- // Create peer ID for this node
505- // Use localhost since Pika doesn't expose a host() method
506- std::string addr = " 127.0.0.1:" + std::to_string (g_pika_conf->port () + 3000 );
507- braft::PeerId peer_id = ParsePeerId (addr);
508- if (peer_id.is_empty ()) {
509- return pstd::Status::Corruption (" Failed to create peer ID" );
499+ // Determine the Raft port for this node
500+ // Raft uses Pika port + 3000
501+ int raft_port = g_pika_conf->port () + 3000 ;
502+
503+ // Find the peer address from the peers list that matches our Raft port
504+ // This allows the user to specify the exact address in RAFT.CLUSTER INIT command
505+ braft::PeerId peer_id;
506+ bool found = false ;
507+
508+ for (const auto & peer : peers) {
509+ if (peer.addr .port == raft_port) {
510+ peer_id = peer;
511+ found = true ;
512+ LOG (INFO) << " Found matching peer address in cluster config: " << peer.to_string ();
513+ break ;
514+ }
510515 }
511516
517+ // If no matching peer found, return error
518+ if (!found) {
519+ std::string error_msg = " No matching peer address found in cluster config for Raft port " +
520+ std::to_string (raft_port) +
521+ " . Please include this node's address (with port " +
522+ std::to_string (raft_port) +
523+ " ) in the RAFT.CLUSTER INIT command. " +
524+ " Example: RAFT.CLUSTER INIT <ip1>:" + std::to_string (raft_port) +
525+ " ,<ip2>:<port2>,..." ;
526+ LOG (ERROR) << error_msg;
527+ return pstd::Status::Corruption (error_msg);
528+ }
529+
530+ LOG (INFO) << " Creating Raft node for DB: " << db_name << " with address: " << peer_id.to_string ();
531+
512532 // Create Raft node
513533 auto node = std::make_shared<PikaRaftNode>(group_id_ + " _" + db_name, peer_id);
514534
@@ -625,7 +645,7 @@ void RaftManager::AppendLog(const std::string& db_name,
625645 node->GetRaftNode ()->apply (task);
626646}
627647
628- rocksdb::Status RaftManager::ApplyBinlogEntry (const ::pikiwidb::Binlog& binlog) {
648+ rocksdb::Status RaftManager::ApplyBinlogEntry (const ::pikiwidb::Binlog& binlog, uint64_t log_index ) {
629649 std::string db_name = " db0" ;
630650
631651 auto db = g_pika_server->GetDB (db_name);
@@ -640,14 +660,15 @@ rocksdb::Status RaftManager::ApplyBinlogEntry(const ::pikiwidb::Binlog& binlog)
640660 return rocksdb::Status::InvalidArgument (" Storage is null" );
641661 }
642662
643- auto status = storage->OnBinlogWrite (binlog, 0 );
663+ // Pass log_index to storage layer for tracking
664+ auto status = storage->OnBinlogWrite (binlog, log_index);
644665
645666 if (!status.ok ()) {
646- LOG (ERROR) << " Failed to apply binlog to " << db_name << " : " << status.ToString ();
667+ LOG (ERROR) << " Failed to apply binlog to " << db_name << " at log_index " << log_index
668+ << " : " << status.ToString ();
647669 }
648670
649671 return status;
650672}
651673
652674} // namespace pika_raft
653-
0 commit comments