diff --git a/conf/pika.conf b/conf/pika.conf index 97d171d419..e52139e1ae 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -313,6 +313,11 @@ write-binlog : yes # Supported Units [K|M|G], binlog-file-size default unit is in [bytes] and the default value is 100M. binlog-file-size : 104857600 +# The interval (in number of logs) for forcing a disk flush of the binlog. +# A value of 1 means fsync for every log. A higher value improves performance at the cost of durability. +# Default: 100 +binlog-fsync-interval : 1 + # Automatically triggers a small compaction according to statistics # Use the cache to store up to 'max-cache-statistic-keys' keys # If 'max-cache-statistic-keys' set to '0', that means turn off the statistics function diff --git a/include/pika_binlog.h b/include/pika_binlog.h index 43615ae0b4..3f32d3c992 100644 --- a/include/pika_binlog.h +++ b/include/pika_binlog.h @@ -61,6 +61,8 @@ class Binlog : public pstd::noncopyable { * Set Producer pro_num and pro_offset with lock */ pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0); + // Force sync data to disk + pstd::Status Sync(); // Need to hold Lock(); pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index); diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index bc4c28db6a..40afbf7866 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -8,6 +8,7 @@ #include #include +#include #include "acl.h" #include "include/pika_command.h" @@ -52,6 +53,13 @@ class PikaClientConn : public net::RedisConn { bool cache_miss_in_rtc_; }; + struct ParallelTask { + std::vector> resps; + std::vector> futures; + std::atomic completed_count{0}; + size_t total_tasks{0}; + }; + struct TxnStateBitMask { public: static constexpr uint8_t Start = 0; @@ -72,6 +80,7 @@ class PikaClientConn : public net::RedisConn { void BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc); int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; } static void DoBackgroundTask(void* arg); + static void ParallelExecRedisCmd(void* arg); bool IsPubSub() { return is_pubsub_; } void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; } diff --git a/include/pika_conf.h b/include/pika_conf.h index 80d5abe8f0..4c01d3830a 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -749,6 +749,8 @@ class PikaConf : public pstd::BaseConf { rsync_timeout_ms_.store(value); } + int binlog_fsync_interval() const; + void SetProtoMaxBulkLen(const int64_t value) { std::lock_guard l(rwlock_); TryPushDiffCommands("proto-max-bulk-len", std::to_string(value)); @@ -1065,6 +1067,9 @@ class PikaConf : public pstd::BaseConf { //Internal used metrics Persisted by pika.conf std::unordered_set internal_used_unfinished_full_sync_; + + // Binlog fsync interval + int binlog_fsync_interval_; }; #endif diff --git a/include/pika_consensus.h b/include/pika_consensus.h index 78e20eb3ab..7ce7a3cf9a 100644 --- a/include/pika_consensus.h +++ b/include/pika_consensus.h @@ -257,7 +257,7 @@ class ConsensusCoordinator { pstd::Status AppendSlaveEntries(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute); pstd::Status CommitAppLog(const LogOffset& master_committed_id); pstd::Status UpdateCommittedID(); - pstd::Status ApplyBinlog(const std::shared_ptr& cmd_ptr); + pstd::Status ApplyBinlog(const std::vector& logs); pstd::Status ProcessCoordination(); LogOffset GetCommittedId() { @@ -276,7 +276,10 @@ class ConsensusCoordinator { std::lock_guard l(committed_id_rwlock_); committed_id_ = offset; context_->UpdateAppliedIndex(committed_id_); + committed_id_cv_.notify_all(); } + pstd::Mutex* GetCommittedIdMu() { return &committed_id_mu_; } + pstd::CondVar* GetCommittedIdCv() { return &committed_id_cv_; } private: pstd::Status PersistAppendBinlog(const std::shared_ptr& cmd_ptr, LogOffset& cur_offset); @@ -285,10 +288,13 @@ class ConsensusCoordinator { std::shared_mutex is_consistency_rwlock_; bool is_consistency_ = false; std::shared_mutex committed_id_rwlock_; + pstd::Mutex committed_id_mu_; + pstd::CondVar committed_id_cv_; LogOffset committed_id_ = LogOffset(); std::shared_mutex prepared_id__rwlock_; LogOffset prepared_id_ = LogOffset(); std::shared_ptr logs_; + int binlog_fsync_counter_ = 0; }; #endif // INCLUDE_PIKA_CONSENSUS_H_ diff --git a/include/pika_repl_bgworker.h b/include/pika_repl_bgworker.h index dd62622fb9..bfb1b0185c 100644 --- a/include/pika_repl_bgworker.h +++ b/include/pika_repl_bgworker.h @@ -42,6 +42,7 @@ class PikaReplBgWorker { net::RedisParser redis_parser_; std::string ip_port_; std::string db_name_; + int binlog_fsync_counter_ = 0; private: net::BGThread bg_thread_; diff --git a/include/pika_repl_client.h b/include/pika_repl_client.h index 4faf8285a3..2eaccc5805 100644 --- a/include/pika_repl_client.h +++ b/include/pika_repl_client.h @@ -88,14 +88,22 @@ class PikaReplClient { async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order::memory_order_seq_cst); } - int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) { - int32_t db_index = db_name.back() - '0'; - assert(db_index >= 0 && db_index <= 7); - return async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst); - } + int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name); + void SignalAsyncWriteDBTaskEnd(const std::string& db_name); + void WaitForAsyncWriteDBTaskEnd(const std::string& db_name); + + // unfinished_async_write_db_tasks related + pstd::Mutex unfinished_async_write_db_tasks_mu_; + std::unordered_map unfinished_async_write_db_tasks_; + pstd::CondVar async_write_db_tasks_cond_; + + // db_write_block_fds_ related + pstd::Mutex db_write_block_fds_mu_; + std::set db_write_block_fds_; private: - size_t GetBinlogWorkerIndexByDBName(const std::string &db_name); + size_t GetBinlogWorkerIndexByDBName(const std::string& db_name); + size_t GetDBWorkerIndexByDBName(const std::string& db_name); size_t GetHashIndexByKey(const std::string& key); void UpdateNextAvail() { next_avail_ = (next_avail_ + 1) % static_cast(write_binlog_workers_.size()); } diff --git a/include/pika_rm.h b/include/pika_rm.h index 709d5722cc..16ca019d9e 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -207,6 +207,8 @@ class PikaReplicaManager { const std::shared_ptr& res, const std::shared_ptr& conn, void* res_private_data); void ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const std::string& db_name); + void SignalAsyncWriteDBTaskEnd(const std::string& db_name); + void WaitForAsyncWriteDBTaskEnd(const std::string& db_name); void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name); void ReplServerRemoveClientConn(int fd); void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd); @@ -243,8 +245,15 @@ class PikaReplicaManager { pstd::Mutex write_queue_mu_; + // db_name -> a queue of write task + using DBWriteTaskQueue = std::map>; + // ip:port -> a map of DBWriteTaskQueue + using SlaveWriteTaskQueue = std::map; + // every host owns a queue, the key is "ip + port" - std::unordered_map>> write_queues_; + SlaveWriteTaskQueue write_queues_; + + // client for replica std::unique_ptr pika_repl_client_; std::unique_ptr pika_repl_server_; }; diff --git a/src/net/src/pb_conn.cc b/src/net/src/pb_conn.cc index 5185e8f51d..15e4024da0 100644 --- a/src/net/src/pb_conn.cc +++ b/src/net/src/pb_conn.cc @@ -153,7 +153,7 @@ WriteStatus PbConn::SendReply() { if (item_len - write_buf_.item_pos_ != 0) { return kWriteHalf; } - LOG(ERROR) << "write item success"; + //LOG(ERROR) << "write item success"; } return kWriteAll; } diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index 187d63d8ad..e051108317 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -469,3 +469,10 @@ Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index) { return Status::OK(); } + +Status Binlog::Sync() { + if (queue_) { + return queue_->Sync(); + } + return Status::Corruption("Logger not initialized"); +} \ No newline at end of file diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index a6cd5ec62f..72788e22ad 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -19,6 +19,7 @@ #include "net/src/dispatch_thread.h" #include "net/src/worker_thread.h" #include "src/pstd/include/scope_record_lock.h" +#include #include "rocksdb/perf_context.h" #include "rocksdb/iostats_context.h" @@ -55,6 +56,8 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st c_ptr->SetConn(shared_from_this()); c_ptr->SetResp(resp_ptr); + LOG(INFO) << "PikaClientConn::DoCmd command: " << c_ptr->name() << ", keys: " << c_ptr->current_key().size(); + // Check authed if (AuthRequired()) { // the user is not authed, need to do auth if (!(c_ptr->flag() & kCmdFlagsNoAuth)) { @@ -357,14 +360,64 @@ void PikaClientConn::DoBackgroundTask(void* arg) { } void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc) { - resp_num.store(static_cast(argvs.size())); - for (const auto& argv : argvs) { - std::shared_ptr resp_ptr = std::make_shared(); - resp_array.push_back(resp_ptr); - ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc); + if (argvs.empty()) { + return; } - time_stat_->process_done_ts_ = pstd::NowMicros(); - TryWriteResp(); + if (argvs.size() > 1) { + auto task = std::make_shared(); + task->total_tasks = argvs.size(); + task->resps.resize(argvs.size()); + + for (size_t i = 0; i < argvs.size(); ++i) { + task->resps[i] = std::make_shared(); + std::promise promise; + task->futures.push_back(promise.get_future()); + + g_pika_server->ScheduleClientPool(&PikaClientConn::ParallelExecRedisCmd, new std::tuple(shared_from_this(), argvs[i], task, i, std::move(promise), cache_miss_in_rtc), false, false); + } + + for (auto& f : task->futures) { + f.get(); + } + + for (const auto& resp : task->resps) { + WriteResp(*resp); + } + if (write_completed_cb_) { + write_completed_cb_(); + write_completed_cb_ = nullptr; + } + NotifyEpoll(true); + } else { + resp_num.store(static_cast(argvs.size())); + for (const auto& argv : argvs) { + std::shared_ptr resp_ptr = std::make_shared(); + resp_array.push_back(resp_ptr); + ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc); + } + time_stat_->process_done_ts_ = pstd::NowMicros(); + TryWriteResp(); + } +} + +void PikaClientConn::ParallelExecRedisCmd(void* arg) { + auto* task_args = static_cast, net::RedisCmdArgsType, std::shared_ptr, size_t, std::promise, bool>*>(arg); + auto [conn, argv, task, index, promise, cache_miss_in_rtc] = std::move(*task_args); + delete task_args; + + std::string opt = argv[0]; + pstd::StringToLower(opt); + if (opt == kClusterPrefix) { + if (argv.size() >= 2) { + opt += argv[1]; + pstd::StringToLower(opt); + } + } + + std::shared_ptr cmd_ptr = conn->DoCmd(argv, opt, task->resps[index], cache_miss_in_rtc); + *(task->resps[index]) = std::move(cmd_ptr->res().message()); + + promise.set_value(); } bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt) { diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 94071eac7f..02c1b5d71e 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -528,6 +528,10 @@ int PikaConf::Load() { if (binlog_file_size_ < 1024 || static_cast(binlog_file_size_) > (1024LL * 1024 * 1024)) { binlog_file_size_ = 100 * 1024 * 1024; // 100M } + GetConfInt("binlog-fsync-interval", &binlog_fsync_interval_); + if (binlog_fsync_interval_ < 0) { + binlog_fsync_interval_ = 0; + } GetConfStr("pidfile", &pidfile_); // db sync @@ -912,3 +916,5 @@ std::vector PikaConf::compression_per_level() { } return types; } + +int PikaConf::binlog_fsync_interval() const { return binlog_fsync_interval_; } \ No newline at end of file diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index ef1960d589..3f9066bb66 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -4,6 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. #include +#include +#include #include "include/pika_consensus.h" @@ -67,8 +69,8 @@ void Context::UpdateAppliedIndex(const LogOffset& offset) { LogOffset cur_offset; // TODO: 暂时注释掉这一行,因为applied_win_没有push调用,只有update,窗口永远对不上 //applied_win_.Update(SyncWinItem(offset), SyncWinItem(offset), &cur_offset); - if (cur_offset > applied_index_) { - applied_index_ = cur_offset; + if (offset > applied_index_) { + applied_index_ = offset; StableSave(); } } @@ -382,12 +384,12 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const std::lock_guard l(slave_ptr->slave_mu); slave_ptr->acked_offset = end; sync_pros_.AddMatchIndex(ip, port, slave_ptr->acked_offset); - LOG(INFO) << "PacificA slave ip: " << ip << ", port :" << port << ",slave acked_offset " - << slave_ptr->acked_offset.ToString(); + // LOG(INFO) << "PacificA slave ip: " << ip << ", port :" << port << ",slave acked_offset " + // << slave_ptr->acked_offset.ToString(); if (slave_ptr->slave_state != kSlaveBinlogSync && slave_ptr->acked_offset >= slave_ptr->target_offset) { slave_ptr->slave_state = kSlaveBinlogSync; - LOG(INFO) << "PacificA change slave_state kSlaveBinlogSync acked_offset: " << slave_ptr->acked_offset.ToString() - << ", target_offset: " << slave_ptr->target_offset.ToString(); + // LOG(INFO) << "PacificA change slave_state kSlaveBinlogSync acked_offset: " << slave_ptr->acked_offset.ToString() + // << ", target_offset: " << slave_ptr->target_offset.ToString(); } } } else { @@ -813,7 +815,7 @@ bool ConsensusCoordinator::GetISConsistency() { bool ConsensusCoordinator::checkFinished(const LogOffset& offset) { //TODO: 暂时加了读写锁,后期考虑替换为原子变量 - std::lock_guard l(committed_id_rwlock_); + std::lock_guard l(committed_id_mu_); if (offset <= committed_id_) { return true; } @@ -823,26 +825,36 @@ bool ConsensusCoordinator::checkFinished(const LogOffset& offset) { //// pacificA private: Status ConsensusCoordinator::PersistAppendBinlog(const std::shared_ptr& cmd_ptr, LogOffset& cur_offset) { + std::lock_guard l(order_mu_); std::string content = cmd_ptr->ToRedisProtocol(); std::string binlog = std::string(); LogOffset offset = LogOffset(); Status s = stable_logger_->Logger()->Put(content, &offset, binlog); - LOG(INFO) << "PacificA binlog_offset :" << offset.ToString(); + //LOG(INFO) << "PacificA binlog_offset :" << offset.ToString(); cur_offset = offset; if (!s.ok()) { - std::string db_name = cmd_ptr->db_name().empty() ? g_pika_conf->default_db() : cmd_ptr->db_name(); - std::shared_ptr db = g_pika_server->GetDB(db_name); - if (db) { - db->SetBinlogIoError(); - } + // std::string db_name = cmd_ptr->db_name().empty() ? g_pika_conf->default_db() : cmd_ptr->db_name(); + // std::shared_ptr db = g_pika_server->GetDB(db_name); + // if (db) { + // db->SetBinlogIoError(); + // } return s; } + + if (++binlog_fsync_counter_ % g_pika_conf->binlog_fsync_interval() == 0) { + s = stable_logger_->Logger()->Sync(); + if (!s.ok()) { + LOG(WARNING) << "Failed to sync binlog to disk on master: " << s.ToString(); + return s; + } + } + // If successful, append the log entry to the logs // TODO: 这里logs_的appendlog操作和上边的stable_logger_->Logger()->Put不是原子的,可能导致offset大的先被追加到logs_中, // 多线程写入的时候窗口会对不上,最终主从断开连接。需要加逻辑保证原子性 logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog)); - + //LOG(INFO) << "After AppendLog: logs_->Size()=" << logs_->Size() << ", logs_->LastOffset()=" << logs_->LastOffset().ToString(); SetPreparedId(cur_offset); return stable_logger_->Logger()->IsOpened(); @@ -860,6 +872,11 @@ Status ConsensusCoordinator::AppendEntries(const std::shared_ptr& cmd_ptr, // make sure stable log and mem log consistent Status s = PersistAppendBinlog(cmd_ptr, cur_logoffset); if (!s.ok()) { + std::string db_name = cmd_ptr->db_name().empty() ? g_pika_conf->default_db() : cmd_ptr->db_name(); + std::shared_ptr db = g_pika_server->GetDB(db_name); + if (db) { + db->SetBinlogIoError(); + } return s; } @@ -867,6 +884,11 @@ Status ConsensusCoordinator::AppendEntries(const std::shared_ptr& cmd_ptr, return Status::OK(); } Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { + BinlogOffset b_offset(attribute.filenum(), attribute.offset()); + LogicOffset l_offset(attribute.term_id(), attribute.logic_id()); + LogOffset log_offset(b_offset, l_offset); + //LOG(INFO) << "Received binlog from master: " << log_offset.ToString() << " for db: " << db_name_; + LogOffset last_index = logs_->LastOffset(); if (attribute.logic_id() < last_index.l_offset.index) { LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id " << attribute.logic_id() @@ -885,17 +907,23 @@ Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr& cmd_ * @brief Commit logs up to the given offset and update the committed ID. */ Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) { + //LOG(INFO) << "Slave CommitAppLog for db " << db_name_ << ", master_committed_id: " << master_committed_id.ToString(); int index = logs_->FindOffset(logs_->FirstOffset()); int log_size = logs_->Size(); // Cache log size + std::vector logs_to_apply; for (int i = index; i < log_size; ++i) { Log::LogItem log = logs_->At(i); if (master_committed_id >= log.offset) { - LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString() - << ", ApplyLog: " << log.offset.ToString(); - ApplyBinlog(log.cmd_ptr); + logs_to_apply.push_back(log); + } else { + break; } } + if (!logs_to_apply.empty()) { + ApplyBinlog(logs_to_apply); + } + logs_->TruncateFrom(master_committed_id); // Truncate logs SetCommittedId(master_committed_id); // Update committed ID return Status::OK(); @@ -923,7 +951,7 @@ Status ConsensusCoordinator::UpdateCommittedID() { return Status::Error("slave_prepared_id < master_committedId"); } SetCommittedId(slave_prepared_id); - LOG(INFO) << "PacificA update CommittedID: " << GetCommittedId().ToString(); + //LOG(INFO) << "PacificA update CommittedID: " << GetCommittedId().ToString(); return Status::OK(); } Status ConsensusCoordinator::ProcessCoordination() { @@ -946,48 +974,63 @@ Status ConsensusCoordinator::ProcessCoordination() { return Status::OK(); } // Execute the operation of writing to DB -Status ConsensusCoordinator::ApplyBinlog(const std::shared_ptr& cmd_ptr) { - auto opt = cmd_ptr->argv()[0]; - if (pstd::StringToLower(opt) != kCmdNameFlushdb) { - InternalApplyFollower(cmd_ptr); - } else { - int32_t wait_ms = 250; - while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) { - // TODO: 暂时去掉了sleep的逻辑,考虑使用条件变量唤醒 - //std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms)); - wait_ms *= 2; - wait_ms = wait_ms < 3000 ? wait_ms : 3000; +Status ConsensusCoordinator::ApplyBinlog(const std::vector& logs) { + for (const auto& log : logs) { + const auto& cmd_ptr = log.cmd_ptr; + auto opt = cmd_ptr->argv()[0]; + LOG(INFO) << "Slave ApplyBinlog for db " << db_name_ << ", command: " << opt; + if (pstd::StringToLower(opt) != kCmdNameFlushdb) { + PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); + } else { + // int32_t wait_ms = 250; + // while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) { + // // TODO: 暂时去掉了sleep的逻辑,考虑使用条件变量唤醒 + // //std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms)); + // wait_ms *= 2; + // wait_ms = wait_ms < 3000 ? wait_ms : 3000; + // } + g_pika_rm->WaitForAsyncWriteDBTaskEnd(db_name_); + PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); } - PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); } - return Status::OK(); } Status ConsensusCoordinator::SendBinlog(std::shared_ptr slave_ptr, std::string db_name) { std::vector tasks; + if (!g_pika_server->IsConsistency()) { + return Status::OK(); + } + std::string ip = slave_ptr->Ip(); + int port = slave_ptr->Port(); + int32_t session_id = slave_ptr->SessionId(); - // Check if there are new log entries that need to be sent to the slave - if (logs_->LastOffset() >= slave_ptr->acked_offset) { - // Find the index of the log entry corresponding to the slave's acknowledged offset - int index = logs_->FindOffset(slave_ptr->acked_offset); - if (index < logs_->Size()) { - for (int i = index; i < logs_->Size(); ++i) { - const Log::LogItem& item = logs_->At(i); - - slave_ptr->SetLastSendTime(pstd::NowMicros()); + LogOffset last_sent = slave_ptr->sent_offset; + if (logs_->LastOffset() > last_sent) { + int send_start_index = logs_->FindOffset(last_sent); + if (send_start_index < 0) { + LOG(WARNING) << "Binlog offset not found, maybe purged. last_sent: " << last_sent.ToString(); + return Status::Corruption("cant find the file_num"); + } - RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId()); - WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId()); - tasks.emplace_back(std::move(task)); + if (send_start_index < logs_->Size() && logs_->At(send_start_index).offset == last_sent) { + send_start_index++; + } - slave_ptr->sent_offset = item.offset; - } + if (send_start_index < logs_->Size()) { + LogOffset prev_offset = send_start_index > 0 ? logs_->At(send_start_index - 1).offset : LogOffset(); + const auto& item = logs_->At(send_start_index); + RmNode rm_node(ip, port, db_name, session_id); + WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), prev_offset, GetCommittedId()); + tasks.emplace_back(std::move(task)); } } if (!tasks.empty()) { - g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks); + g_pika_rm->ProduceWriteQueue(ip, port, db_name, tasks); + slave_ptr->sent_offset = tasks.back().binlog_chip_.offset_; + // LOG(INFO) << "SendBinlog tasks to queue, slave: " << ip << ":" << port << " tasks: " << tasks.size() + // << " new sent_offset: " << slave_ptr->sent_offset.ToString(); } return Status::OK(); } diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 5340533160..6b43e8adc1 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -5,12 +5,17 @@ #include "include/pika_repl_bgworker.h" +#include + +#include +#include #include #include "include/pika_cmd_table_manager.h" #include "include/pika_conf.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "pstd/include/pstd_string.h" #include "pstd/include/pstd_defer.h" #include "src/pstd/include/scope_record_lock.h" #include "include/pika_conf.h" @@ -53,6 +58,9 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { PikaReplBgWorker* worker = task_arg->worker; worker->ip_port_ = conn->ip_port(); + // LOG(INFO) << "HandleBGWorkerWriteBinlog: Received binlog from master " << worker->ip_port_ + // << ", index size: " << index->size(); + DEFER { delete index; delete task_arg; @@ -94,8 +102,10 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { if (only_keepalive) { ack_start = LogOffset(); + ack_end = LogOffset(); } else { ack_start = pb_begin; + ack_end = pb_end; } // because DispatchBinlogRes() have been order them. @@ -115,6 +125,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { return; } + int processed_count = 0; for (int i : *index) { const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync(i); // if pika are not current a slave or DB not in @@ -136,6 +147,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { if(db->GetISConsistency()){ const InnerMessage::BinlogOffset& committed_id = binlog_res.committed_id(); LogOffset master_committed_id(BinlogOffset(committed_id.filenum(),committed_id.offset()),LogicOffset(committed_id.term(),committed_id.index())); + //LOG(INFO) << "Processing committed_id from master: " << master_committed_id.ToString(); Status s= db->CommitAppLog(master_committed_id); if(!s.ok()){ return; @@ -145,13 +157,16 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { if (binlog_res.binlog().empty()) { continue; } - if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_res.binlog(), &worker->binlog_item_)) { + + const std::string& binlog_str = binlog_res.binlog(); + + if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_str, &worker->binlog_item_)) { LOG(WARNING) << "Binlog item decode failed"; slave_db->SetReplState(ReplState::kTryConnect); return; } - const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN; - int redis_parser_len = static_cast(binlog_res.binlog().size()) - BINLOG_ENCODE_LEN; + const char* redis_parser_start = binlog_str.data() + BINLOG_ENCODE_LEN; + int redis_parser_len = static_cast(binlog_str.size()) - BINLOG_ENCODE_LEN; int processed_len = 0; net::RedisParserStatus ret = worker->redis_parser_.ProcessInputBuffer(redis_parser_start, redis_parser_len, &processed_len); @@ -159,16 +174,15 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { LOG(WARNING) << "Redis parser failed"; slave_db->SetReplState(ReplState::kTryConnect); return; - } - db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_)); - if (!db) { - LOG(WARNING) << "DB " << worker->db_name_ << " Not Found"; - return; - } + } + processed_count++; } + + //LOG(INFO) << "Successfully processed " << processed_count << " binlog entries"; if (only_keepalive) { ack_end = LogOffset(); + LOG(INFO) << "Sending keepalive ACK to master"; } else { LogOffset productor_status; // Reply Ack to master immediately @@ -177,6 +191,17 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { &productor_status.l_offset.term, &productor_status.l_offset.index); ack_end = productor_status; ack_end.l_offset.term = pb_end.l_offset.term; + + //Force flush to ensure data persistence + if (++(worker->binlog_fsync_counter_) % g_pika_conf->binlog_fsync_interval() == 0) { + Status s = logger->Sync(); + if (!s.ok()) { + LOG(WARNING) << "Failed to sync binlog to disk: " << s.ToString(); + return; + } + } + // LOG(INFO) << "Synced binlog to disk, sending ACK to master from " + // << ack_start.ToString() << " to " << ack_end.ToString(); } g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end); @@ -226,8 +251,8 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red } void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { - std::unique_ptr task_arg(static_cast(arg)); - const std::shared_ptr c_ptr = task_arg->cmd_ptr; + std::unique_ptr> cmd_ptr_ptr(static_cast*>(arg)); + const std::shared_ptr c_ptr = *cmd_ptr_ptr; WriteDBInSyncWay(c_ptr); } diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc index 80b9b4b7bb..80dc7f1ced 100644 --- a/src/pika_repl_client.cc +++ b/src/pika_repl_client.cc @@ -119,18 +119,53 @@ void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_name, } void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const std::string& db_name) { + std::lock_guard lock(unfinished_async_write_db_tasks_mu_); const PikaCmdArgsType& argv = cmd_ptr->argv(); std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0]; size_t index = GetHashIndexByKey(dispatch_key); - auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr); + auto task_arg = new std::shared_ptr(cmd_ptr); - IncrAsyncWriteDBTaskCount(db_name, 1); - std::function task_finish_call_back = [this, db_name]() { this->DecrAsyncWriteDBTaskCount(db_name, 1); }; + unfinished_async_write_db_tasks_[db_name]++; + LOG(INFO) << "Scheduling WriteDB task for db " << db_name << ", command: " << argv[0] << ". Unfinished tasks: " + << unfinished_async_write_db_tasks_[db_name]; + std::function task_finish_call_back = [this, db_name]() { this->SignalAsyncWriteDBTaskEnd(db_name); }; write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast(task_arg), task_finish_call_back); } +int32_t PikaReplClient::GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) { + std::lock_guard lock(unfinished_async_write_db_tasks_mu_); + if (unfinished_async_write_db_tasks_.find(db_name) == unfinished_async_write_db_tasks_.end()) { + return 0; + } + return unfinished_async_write_db_tasks_.at(db_name); +} + +void PikaReplClient::SignalAsyncWriteDBTaskEnd(const std::string& db_name) { + std::lock_guard lock(unfinished_async_write_db_tasks_mu_); + if (unfinished_async_write_db_tasks_.find(db_name) != unfinished_async_write_db_tasks_.end()) { + unfinished_async_write_db_tasks_[db_name]--; + LOG(INFO) << "Finished WriteDB task for db " << db_name << ". Unfinished tasks: " << unfinished_async_write_db_tasks_[db_name]; + if (unfinished_async_write_db_tasks_[db_name] == 0) { + LOG(INFO) << "All WriteDB tasks finished for db " << db_name << ". Notifying waiting threads."; + async_write_db_tasks_cond_.notify_all(); + } + } +} + +void PikaReplClient::WaitForAsyncWriteDBTaskEnd(const std::string& db_name) { + std::unique_lock lock(unfinished_async_write_db_tasks_mu_); + if (unfinished_async_write_db_tasks_.count(db_name) && unfinished_async_write_db_tasks_[db_name] > 0) { + LOG(INFO) << "Waiting for " << unfinished_async_write_db_tasks_[db_name] + << " async write DB tasks to end for db " << db_name; + } + while (unfinished_async_write_db_tasks_.count(db_name) && unfinished_async_write_db_tasks_[db_name] > 0) { + async_write_db_tasks_cond_.wait(lock); + } + LOG(INFO) << "Finished waiting for async write DB tasks for db " << db_name; +} + size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) { char db_num_c = db_name.back(); int32_t db_num = db_num_c - '0'; @@ -144,6 +179,10 @@ size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) return db_num % write_binlog_workers_.size(); } +size_t PikaReplClient::GetDBWorkerIndexByDBName(const std::string& db_name) { + return std::hash()(db_name) % write_db_workers_.size(); +} + size_t PikaReplClient::GetHashIndexByKey(const std::string& key) { size_t hash_base = write_db_workers_.size(); return (str_hash(key) % hash_base); diff --git a/src/pika_repl_server.cc b/src/pika_repl_server.cc index c8f1c9f9dc..bf392a3ecd 100644 --- a/src/pika_repl_server.cc +++ b/src/pika_repl_server.cc @@ -6,6 +6,7 @@ #include "include/pika_repl_server.h" #include +#include #include "include/pika_conf.h" #include "include/pika_rm.h" @@ -53,6 +54,7 @@ int PikaReplServer::Stop() { pstd::Status PikaReplServer::SendSlaveBinlogChips(const std::string& ip, int port, const std::vector& tasks) { + //LOG(INFO) << "SendSlaveBinlogChips to " << ip << ":" << port << " with " << tasks.size() << " tasks"; InnerMessage::InnerResponse response; BuildBinlogSyncResp(tasks, &response); @@ -88,26 +90,50 @@ void PikaReplServer::BuildBinlogOffset(const LogOffset& offset, InnerMessage::Bi } void PikaReplServer::BuildBinlogSyncResp(const std::vector& tasks, InnerMessage::InnerResponse* response) { + //LOG(INFO) << "BuildBinlogSyncResp with " << tasks.size() << " tasks"; response->set_code(InnerMessage::kOk); response->set_type(InnerMessage::Type::kBinlogSync); + // Unpack the batch package and serialize it item by item for (const auto& task : tasks) { + const std::string& binlog = task.binlog_chip_.binlog_; + // if (binlog.size() >= 8 && *reinterpret_cast(binlog.data()) == htonl(PIKA_BATCH_MAGIC)) { + // // This is a batch + // size_t offset = sizeof(uint32_t); + // while (offset < binlog.size()) { + // uint32_t len = ntohl(*reinterpret_cast(binlog.data() + offset)); + // offset += sizeof(uint32_t); + + // InnerMessage::InnerResponse::BinlogSync* binlog_sync = response->add_binlog_sync(); + // binlog_sync->set_session_id(task.rm_node_.SessionId()); + // InnerMessage::Slot* db = binlog_sync->mutable_slot(); + // db->set_db_name(task.rm_node_.DBName()); + // db->set_slot_id(0); + + // // We use the offset of the last item in the batch for the whole batch + // InnerMessage::BinlogOffset* boffset = binlog_sync->mutable_binlog_offset(); + // BuildBinlogOffset(task.binlog_chip_.offset_, boffset); + // if (g_pika_server->IsConsistency()) { + // InnerMessage::BinlogOffset* committed_id = binlog_sync->mutable_committed_id(); + // BuildBinlogOffset(task.committed_id_, committed_id); + // } + // binlog_sync->set_binlog(binlog.data() + offset, len); + // offset += len; + // } + // } else { + // This is a single log InnerMessage::InnerResponse::BinlogSync* binlog_sync = response->add_binlog_sync(); binlog_sync->set_session_id(task.rm_node_.SessionId()); InnerMessage::Slot* db = binlog_sync->mutable_slot(); db->set_db_name(task.rm_node_.DBName()); - /* - * Since the slot field is written in protobuffer, - * slot_id is set to the default value 0 for compatibility - * with older versions, but slot_id is not used - */ db->set_slot_id(0); InnerMessage::BinlogOffset* boffset = binlog_sync->mutable_binlog_offset(); BuildBinlogOffset(task.binlog_chip_.offset_, boffset); - if(g_pika_server->IsConsistency()){ + if (g_pika_server->IsConsistency()) { InnerMessage::BinlogOffset* committed_id = binlog_sync->mutable_committed_id(); BuildBinlogOffset(task.committed_id_, committed_id); } - binlog_sync->set_binlog(task.binlog_chip_.binlog_); + binlog_sync->set_binlog(binlog); + //} } } diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 091c85a0de..f55a4c62d4 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -21,6 +21,7 @@ PikaReplServerConn::PikaReplServerConn(int fd, const std::string& ip_port, net:: PikaReplServerConn::~PikaReplServerConn() = default; void PikaReplServerConn::HandleMetaSyncRequest(void* arg) { + //LOG(INFO) << "ReplServer BG thread handle MetaSync Request"; std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; @@ -100,6 +101,7 @@ void PikaReplServerConn::HandleMetaSyncRequest(void* arg) { } void PikaReplServerConn::HandleTrySyncRequest(void* arg) { + //LOG(INFO) << "ReplServer BG thread handle TrySync Request"; std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; @@ -275,6 +277,7 @@ bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr& } void PikaReplServerConn::HandleDBSyncRequest(void* arg) { + //LOG(INFO) << "ReplServer BG thread handle DBSync Request"; std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; @@ -356,6 +359,7 @@ void PikaReplServerConn::HandleDBSyncRequest(void* arg) { } void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { + //LOG(INFO) << "ReplServer BG thread handle BinlogSync Request"; std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; @@ -435,6 +439,7 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { } void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) { + //LOG(INFO) << "ReplServer BG thread handle RemoveSlaveNode Request"; std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 9c777339ab..66e9551803 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -186,6 +186,9 @@ Status SyncMasterDB::ReadBinlogFileToWq(const std::shared_ptr& slave_ } if (!tasks.empty()) { + LOG(INFO) << "Batch sending " << tasks.size() << " logs to slave " << slave_ptr->Ip() << ":" << slave_ptr->Port() + << ", first offset: " << tasks.front().binlog_chip_.offset_.ToString() + << ", last offset: " << tasks.back().binlog_chip_.offset_.ToString(); g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_info_.db_name_, tasks); } return Status::OK(); @@ -229,25 +232,24 @@ Status SyncMasterDB::GetSlaveState(const std::string& ip, int port, SlaveState* } Status SyncMasterDB::WakeUpSlaveBinlogSync() { + //LOG(INFO) << "Master DB (" << db_info_.db_name_ << ") WakeUpSlaveBinlogSync"; std::unordered_map> slaves = GetAllSlaveNodes(); std::vector> to_del; for (auto& slave_iter : slaves) { std::shared_ptr slave_ptr = slave_iter.second; std::lock_guard l(slave_ptr->slave_mu); - if (slave_ptr->sent_offset == slave_ptr->acked_offset) { - Status s; - if (coordinator_.GetISConsistency()) { - if(slave_ptr->slave_state == SlaveState::kSlaveBinlogSync||slave_ptr->slave_state == SlaveState::KCandidate){ - s = coordinator_.SendBinlog(slave_ptr, db_info_.db_name_); - } - } else { - s = ReadBinlogFileToWq(slave_ptr); - } - if (!s.ok()) { - to_del.push_back(slave_ptr); - LOG(WARNING) << "WakeUpSlaveBinlogSync failed, marking for deletion: " - << slave_ptr->ToStringStatus() << " - " << s.ToString(); + Status s; + if (coordinator_.GetISConsistency()) { + if(slave_ptr->slave_state == SlaveState::kSlaveBinlogSync||slave_ptr->slave_state == SlaveState::KCandidate){ + s = coordinator_.SendBinlog(slave_ptr, db_info_.db_name_); } + } else { + s = ReadBinlogFileToWq(slave_ptr); + } + if (!s.ok()) { + to_del.push_back(slave_ptr); + LOG(WARNING) << "WakeUpSlaveBinlogSync failed, marking for deletion: " + << slave_ptr->ToStringStatus() << " - " << s.ToString(); } } @@ -428,7 +430,11 @@ Status SyncMasterDB::ProcessCoordination(){ return coordinator_.ProcessCoordination(); } Status SyncMasterDB::UpdateCommittedID(){ - return coordinator_.UpdateCommittedID(); + Status s = coordinator_.UpdateCommittedID(); + if (s.ok()) { + coordinator_.GetCommittedIdCv()->notify_all(); + } + return s; } Status SyncMasterDB::Truncate(const LogOffset& offset){ return coordinator_.Truncate(offset); @@ -476,26 +482,39 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr& cmd_ptr) { if (!coordinator_.GetISConsistency()) { return coordinator_.ProposeLog(cmd_ptr); } + //LOG(INFO) << "Master DB (" << db_info_.db_name_ << ") ConsensusProposeLog"; - auto start = std::chrono::steady_clock::now(); + //auto start = std::chrono::steady_clock::now(); LogOffset offset; Status s = coordinator_.AppendEntries(cmd_ptr, offset); // Append the log entry to the coordinator + // g_pika_rm->WakeUpBinlogSync(); if (!s.ok()) { return s; } // Wait for consensus to be achieved within 10 seconds - while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count() < 10) { - // Check if consensus has been achieved for the given log offset - if (checkFinished(offset)) { - return Status::OK(); + // while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count() < 10) { + // // Check if consensus has been achieved for the given log offset + // if (checkFinished(offset)) { + // return Status::OK(); + // } + // // TODO: 这里暂时注掉了sleep等待,50ms耗时过长,影响写入链路,后期需要改成条件变量唤醒方式 + // //std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // } + + // Wait for consensus to be achieved using condition variable + pstd::Mutex* mu = coordinator_.GetCommittedIdMu(); + pstd::CondVar* cv = coordinator_.GetCommittedIdCv(); + std::unique_lock lock(*mu); + + auto timeout = std::chrono::seconds(10); + while (offset > coordinator_.GetCommittedId()) { + if (cv->wait_for(lock, timeout) == std::cv_status::timeout) { + return Status::Timeout("No consistency achieved within 10 seconds"); } - // TODO: 这里暂时注掉了sleep等待,50ms耗时过长,影响写入链路,后期需要改成条件变量唤醒方式 - //std::this_thread::sleep_for(std::chrono::milliseconds(50)); } - - return Status::Timeout("No consistency achieved within 10 seconds"); + return Status::OK(); } @@ -696,6 +715,7 @@ void PikaReplicaManager::InitDB() { void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, std::string db_name, const std::vector& tasks) { + //LOG(INFO) << "ProduceWriteQueue for " << ip << ":" << port << " db " << db_name << " task_num:" << tasks.size(); std::lock_guard l(write_queue_mu_); std::string index = ip + ":" + std::to_string(port); for (auto& task : tasks) { @@ -704,65 +724,75 @@ void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, std: } int PikaReplicaManager::ConsumeWriteQueue() { - std::unordered_map>> to_send_map; + // A list of sending jobs to be executed outside the lock. + // Each job is a tuple of (ip, port, tasks_to_send). + std::vector>> all_sends; int counter = 0; + + // === Start of Critical Section === + // Scope for the lock_guard. We prepare all batches here. { std::lock_guard l(write_queue_mu_); - for (auto& iter : write_queues_) { - const std::string& ip_port = iter.first; - std::unordered_map>& p_map = iter.second; - for (auto& db_queue : p_map) { - std::queue& queue = db_queue.second; - for (int i = 0; i < kBinlogSendPacketNum; ++i) { - if (queue.empty()) { - break; - } - size_t batch_index = queue.size() > kBinlogSendBatchNum ? kBinlogSendBatchNum : queue.size(); - std::vector to_send; - size_t batch_size = 0; - for (size_t i = 0; i < batch_index; ++i) { - WriteTask& task = queue.front(); - batch_size += task.binlog_chip_.binlog_.size(); - // make sure SerializeToString will not over 2G - if (batch_size > PIKA_MAX_CONN_RBUF_HB) { - break; - } - to_send.push_back(task); - queue.pop(); - counter++; - } - if (!to_send.empty()) { - to_send_map[ip_port].push_back(std::move(to_send)); - } + auto slave_iter = write_queues_.begin(); + while (slave_iter != write_queues_.end()) { + std::string ip; + int port = 0; + if (!pstd::ParseIpPortString(slave_iter->first, ip, port)) { + LOG(WARNING) << "Parse ip_port error " << slave_iter->first; + slave_iter = write_queues_.erase(slave_iter); + continue; + } + + auto& p_map = slave_iter->second; + auto db_iter = p_map.begin(); + while (db_iter != p_map.end()) { + auto& queue = db_iter->second; + if (queue.empty()) { + db_iter = p_map.erase(db_iter); + continue; + } + + std::vector to_send; + while (!queue.empty()) { + to_send.push_back(queue.front()); + queue.pop(); + } + + if (!to_send.empty()) { + all_sends.emplace_back(ip, port, std::move(to_send)); + } + + if (queue.empty()) { + db_iter = p_map.erase(db_iter); + } else { + ++db_iter; } } - } - } - std::vector to_delete; - for (auto& iter : to_send_map) { - std::string ip; - int port = 0; - if (!pstd::ParseIpPortString(iter.first, ip, port)) { - LOG(WARNING) << "Parse ip_port error " << iter.first; - continue; - } - for (auto& to_send : iter.second) { - Status s = pika_repl_server_->SendSlaveBinlogChips(ip, port, to_send); - if (!s.ok()) { - LOG(WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString(); - to_delete.push_back(iter.first); - continue; + if (p_map.empty()) { + slave_iter = write_queues_.erase(slave_iter); + } else { + ++slave_iter; } } } + // === End of Critical Section === - if (!to_delete.empty()) { - std::lock_guard l(write_queue_mu_); - for (auto& del_queue : to_delete) { - write_queues_.erase(del_queue); + // Now, execute all the prepared network IO jobs outside the lock. + for (auto& send_job : all_sends) { + std::string& ip = std::get<0>(send_job); + int port = std::get<1>(send_job); + std::vector& to_send = std::get<2>(send_job); + + counter += to_send.size(); + Status s = pika_repl_server_->SendSlaveBinlogChips(ip, port, to_send); + if (!s.ok()) { + LOG(WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString(); + // Drop the slave connection and any remaining items in its queue on failure. + DropItemInWriteQueue(ip, port); } } + return counter; } @@ -802,6 +832,14 @@ void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, db_name); } +void PikaReplicaManager::SignalAsyncWriteDBTaskEnd(const std::string& db_name) { + pika_repl_client_->SignalAsyncWriteDBTaskEnd(db_name); +} + +void PikaReplicaManager::WaitForAsyncWriteDBTaskEnd(const std::string& db_name) { + pika_repl_client_->WaitForAsyncWriteDBTaskEnd(db_name); +} + void PikaReplicaManager::ReplServerRemoveClientConn(int fd) { pika_repl_server_->RemoveClientConn(fd); } void PikaReplicaManager::ReplServerUpdateClientConnMap(const std::string& ip_port, int fd) { @@ -825,10 +863,10 @@ Status PikaReplicaManager::UpdateSyncBinlogStatus(const RmNode& slave, const Log return s; } } - s = db->SyncBinlogToWq(slave.Ip(), slave.Port()); - if (!s.ok()) { - return s; - } + // s = db->SyncBinlogToWq(slave.Ip(), slave.Port()); + // if (!s.ok()) { + // return s; + // } return Status::OK(); } diff --git a/src/pika_server.cc b/src/pika_server.cc index a7d50b1e71..b67d6d8ae0 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -213,6 +213,7 @@ void PikaServer::Start() { LOG(INFO) << "Pika Server going to start"; rsync_server_->Start(); while (!exit_) { + //LOG(INFO) << "Pika Server start a new round of timing tasks"; DoTimingTask(); // wake up every 5 seconds if (!exit_ && exit_mutex_.try_lock_for(std::chrono::seconds(5))) { @@ -790,13 +791,16 @@ void PikaServer::SetFirstMetaSync(bool v) { void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd) { if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) { + //LOG(INFO) << "Schedule task to slow cmd thread pool"; pika_slow_cmd_thread_pool_->Schedule(func, arg); return; } if (is_admin_cmd) { + //LOG(INFO) << "Schedule task to admin cmd thread pool"; pika_admin_cmd_thread_pool_->Schedule(func, arg); return; } + //LOG(INFO) << "Schedule task to client processor thread pool"; pika_client_processor_->SchedulePool(func, arg); } @@ -1094,11 +1098,20 @@ std::unordered_map PikaServer::ServerExecCountDB() { std::unordered_map PikaServer::ServerAllDBStat() { return statistic_.AllDBStat(); } -int PikaServer::SendToPeer() { return g_pika_rm->ConsumeWriteQueue(); } +int PikaServer::SendToPeer() { + //LOG(INFO) << "Pika auxiliary thread SendToPeer"; + return g_pika_rm->ConsumeWriteQueue(); +} -void PikaServer::SignalAuxiliary() { pika_auxiliary_thread_->cv_.notify_one(); } +void PikaServer::SignalAuxiliary() { + //LOG(INFO) << "Signal auxiliary thread to work"; + pika_auxiliary_thread_->cv_.notify_one(); +} -Status PikaServer::TriggerSendBinlogSync() { return g_pika_rm->WakeUpBinlogSync(); } +Status PikaServer::TriggerSendBinlogSync() { + //LOG(INFO) << "Pika auxiliary thread TriggerSendBinlogSync"; + return g_pika_rm->WakeUpBinlogSync(); +} int PikaServer::PubSubNumPat() { return pika_pubsub_thread_->PubSubNumPat(); } diff --git a/tests/integration/clean_start.sh b/tests/integration/clean_start.sh new file mode 100644 index 0000000000..b04ff1fca8 --- /dev/null +++ b/tests/integration/clean_start.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +# 切换到项目根目录 +cd /home/pika/caiyu/pikiwidb || exit + +# 定义清理端口函数 +clean_ports() { + echo "Checking and cleaning ports..." + + sudo killall -9 pika + + # 等待端口完全释放 + sleep 1 +} + +# 编译项目 +echo "Building project..." +sudo ./build.sh +if [ $? -ne 0 ]; then + echo "Build failed!" + exit 1 +fi +echo "Build successful." + +# 清理测试目录 +echo "Cleaning up test directory..." +sudo rm -rf ./output/pacifica_test/ +echo "Cleanup completed." + +# 清理占用端口的进程 +clean_ports + +# 切换到输出目录 +cd output || exit + +# 启动主从服务器 +echo "Starting master and slave servers..." +sudo ../tests/integration/start_master_and_slave.sh +echo "Waiting for servers to fully initialize..." +sleep 10 + +# 设置主从强一致性关系 +echo "Setting up strong consistency replication..." +redis-cli -p 9302 slaveof 127.0.0.1 9301 strong +if [ $? -ne 0 ]; then + echo "Failed to set slaveof." + exit 1 +fi +echo "Replication setup successful." + +# 等待主从复制完成 +echo "Waiting for replication to be fully established..." +sleep 1 + +# 执行 benchmark +echo "Running benchmark..." +redis-benchmark -p 9301 -t set -n 100000 -c 20 --threads 20 +echo "Benchmark finished." + +# 打印日志信息 +echo -e "\n==== 主节点 INFO 日志 ====" +tail -n 150 ./pacifica_test/master/log/pika.INFO + +echo -e "\n==== 主节点 WARNING 日志 ====" +tail -n 150 ./pacifica_test/master/log/pika.WARNING + +echo -e "\n==== 从节点 INFO 日志 ====" +tail -n 150 ./pacifica_test/slave1/log/pika.INFO + +echo -e "\n==== 从节点 WARNING 日志 ====" +tail -n 150 ./pacifica_test/slave1/log/pika.WARNING \ No newline at end of file diff --git a/tests/integration/start_master_and_slave.sh b/tests/integration/start_master_and_slave.sh index d3d0f1257d..5517ade141 100755 --- a/tests/integration/start_master_and_slave.sh +++ b/tests/integration/start_master_and_slave.sh @@ -1,135 +1,135 @@ -#!/bin/bash -# This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing. -# it's used to start pika master and slave, running path: build -cp ../conf/pika.conf ./pika_single.conf -cp ../conf/pika.conf ./pika_master.conf -cp ../conf/pika.conf ./pika_slave.conf -cp ../conf/pika.conf ./pika_rename.conf -cp ../conf/pika.conf ./pika_master_rename.conf -cp ../conf/pika.conf ./pika_slave_rename.conf -cp ../conf/pika.conf ./pika_acl_both_password.conf -cp ../conf/pika.conf ./pika_acl_only_admin_password.conf -cp ../conf/pika.conf ./pika_has_other_acl_user.conf -# Create folders for storing data on the primary and secondary nodes -mkdir master_data -mkdir slave_data -# Example Change the location for storing data on primary and secondary nodes in the configuration file -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_single.conf - -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9241|' \ - -e 's|log-path : ./log/|log-path : ./master_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./master_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_master.conf - -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9231|' \ - -e 's|log-path : ./log/|log-path : ./slave_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./slave_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_slave.conf - -sed -i.bak \ - -e 's|# rename-command : FLUSHALL 360flushall|rename-command : FLUSHALL 360flushall|' \ - -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9251|' \ - -e 's|log-path : ./log/|log-path : ./rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_rename.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth :|masterauth : requirepass|' \ - -e 's|# userpass :|userpass : userpass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9261|' \ - -e 's|log-path : ./log/|log-path : ./acl1_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl1_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl1_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl1_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl1_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_acl_both_password.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth :|masterauth : requirepass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9271|' \ - -e 's|log-path : ./log/|log-path : ./acl2_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl2_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl2_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl2_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl2_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_acl_only_admin_password.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth :|masterauth : requirepass|' \ - -e 's|# userpass :|userpass : userpass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9281|' \ - -e 's|log-path : ./log/|log-path : ./acl3_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl3_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl3_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl3_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl3_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_has_other_acl_user.conf -echo -e '\nuser : limit on >limitpass ~* +@all &*' >> ./pika_has_other_acl_user.conf - -sed -i '' \ - -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|port : 9221|port : 9291|' \ - -e 's|log-path : ./log/|log-path : ./master_rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./master_rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./master_rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./master_rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_master_rename.conf - -sed -i '' \ - -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|port : 9221|port : 9301|' \ - -e 's|log-path : ./log/|log-path : ./slave_rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./slave_rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./slave_rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./slave_rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_slave_rename.conf - -# Start three nodes -./pika -c ./pika_single.conf -./pika -c ./pika_master.conf -./pika -c ./pika_slave.conf -./pika -c ./pika_rename.conf -./pika -c ./pika_acl_both_password.conf -./pika -c ./pika_acl_only_admin_password.conf -./pika -c ./pika_has_other_acl_user.conf -./pika -c ./pika_master_rename.conf -./pika -c ./pika_slave_rename.conf -#ensure both master and slave are ready -sleep 10 +# #!/bin/bash +# # This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing. +# # it's used to start pika master and slave, running path: build +# cp ../conf/pika.conf ./pika_single.conf +# cp ../conf/pika.conf ./pika_master.conf +# cp ../conf/pika.conf ./pika_slave.conf +# cp ../conf/pika.conf ./pika_rename.conf +# cp ../conf/pika.conf ./pika_master_rename.conf +# cp ../conf/pika.conf ./pika_slave_rename.conf +# cp ../conf/pika.conf ./pika_acl_both_password.conf +# cp ../conf/pika.conf ./pika_acl_only_admin_password.conf +# cp ../conf/pika.conf ./pika_has_other_acl_user.conf +# # Create folders for storing data on the primary and secondary nodes +# mkdir master_data +# mkdir slave_data +# # Example Change the location for storing data on primary and secondary nodes in the configuration file +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_single.conf + +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9241|' \ +# -e 's|log-path : ./log/|log-path : ./master_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./master_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_master.conf + +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9231|' \ +# -e 's|log-path : ./log/|log-path : ./slave_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./slave_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_slave.conf + +# sed -i.bak \ +# -e 's|# rename-command : FLUSHALL 360flushall|rename-command : FLUSHALL 360flushall|' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9251|' \ +# -e 's|log-path : ./log/|log-path : ./rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_rename.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userpass :|userpass : userpass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9261|' \ +# -e 's|log-path : ./log/|log-path : ./acl1_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl1_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl1_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl1_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl1_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_acl_both_password.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9271|' \ +# -e 's|log-path : ./log/|log-path : ./acl2_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl2_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl2_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl2_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl2_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_acl_only_admin_password.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userpass :|userpass : userpass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9281|' \ +# -e 's|log-path : ./log/|log-path : ./acl3_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl3_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl3_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl3_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl3_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_has_other_acl_user.conf +# echo -e '\nuser : limit on >limitpass ~* +@all &*' >> ./pika_has_other_acl_user.conf + +# sed -i '' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|port : 9221|port : 9291|' \ +# -e 's|log-path : ./log/|log-path : ./master_rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./master_rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./master_rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./master_rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_master_rename.conf + +# sed -i '' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|port : 9221|port : 9301|' \ +# -e 's|log-path : ./log/|log-path : ./slave_rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./slave_rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./slave_rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./slave_rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_slave_rename.conf + +# # Start three nodes +# ./pika -c ./pika_single.conf +# ./pika -c ./pika_master.conf +# ./pika -c ./pika_slave.conf +# ./pika -c ./pika_rename.conf +# ./pika -c ./pika_acl_both_password.conf +# ./pika -c ./pika_acl_only_admin_password.conf +# ./pika -c ./pika_has_other_acl_user.conf +# ./pika -c ./pika_master_rename.conf +# ./pika -c ./pika_slave_rename.conf +# #ensure both master and slave are ready +# sleep 10 # 创建PacificA一致性测试的数据目录 mkdir -p pacifica_test/master