diff --git a/conf/pika.conf b/conf/pika.conf index 97d171d419..6486574996 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -153,6 +153,25 @@ replication-num : 0 # The default value of consensus-level is 0, which means this feature is not enabled. consensus-level : 0 +# Batch processing configuration (used by both command collection and consensus mechanism) +# The maximum number of items in a batch (both command collection and consensus) +# Default: 100 +batch-size : 100 + +# Batch processing configuration (used by both command collection and consensus mechanism) +# The maximum waiting batch for (both command collection and consensus) +# Default: 5 +batch-max-wait-time : 5 + +# The timeout in milliseconds for waiting for a batch ACK from a slave. +# Default: 500 +replication-ack-timeout : 500 + +# Enable command batch processing for better performance +# When enabled, write commands will be collected and processed in batches +# Default: no +command-batch-enabled : yes + # The Prefix of dump file's name. # All the files that generated by command "bgsave" will be name with this prefix. dump-prefix : diff --git a/include/pika_binlog.h b/include/pika_binlog.h index 43615ae0b4..778eb08b4f 100644 --- a/include/pika_binlog.h +++ b/include/pika_binlog.h @@ -61,6 +61,10 @@ class Binlog : public pstd::noncopyable { * Set Producer pro_num and pro_offset with lock */ pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0); + + // Force sync data to disk + pstd::Status Sync(); + // Need to hold Lock(); pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index); diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index bc4c28db6a..de01ce5034 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -113,7 +113,7 @@ class PikaClientConn : public net::RedisConn { std::vector> resp_array; std::shared_ptr time_stat_; - + void TryWriteResp(); private: net::ServerThread* const server_thread_; std::string current_db_; @@ -134,7 +134,7 @@ class PikaClientConn : public net::RedisConn { void ProcessMonitor(const PikaCmdArgsType& argv); void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); - void TryWriteResp(); + // void TryWriteResp(); }; struct ClientInfo { diff --git a/include/pika_command_collector.h b/include/pika_command_collector.h new file mode 100644 index 0000000000..fe74bcf6ab --- /dev/null +++ b/include/pika_command_collector.h @@ -0,0 +1,91 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_COMMAND_COLLECTOR_H_ +#define PIKA_COMMAND_COLLECTOR_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "include/pika_command.h" +#include "include/pika_define.h" +#include "pstd/include/pstd_status.h" + +#include "include/pika_consensus.h" + +/** + * @brief PikaCommandCollector is used to collect write commands and process them in batches + * + * Main functions: + * 1. Collect write commands and process them in optimized batches after reaching the threshold + * 2. Handle the conflict of the same key (the later command will overwrite the earlier command) + * 3. Send commands in batches to the consensus coordinator with batch-level synchronization + * 4. Support asynchronous callback notification of command processing results + * 5. Track performance metrics for batch processing + +*/ +class PikaCommandCollector { + public: + // Callback function type after command processing is completed + using CommandCallback = std::function; + + /** + * @brief constructor + * @param coordinator consensus coordinator reference + * @param batch_max_wait_time maximum wait time in milliseconds + */ + // Constructor with raw pointer (original) + PikaCommandCollector(ConsensusCoordinator* coordinator, int batch_max_wait_time = 5); + + // Constructor with shared_ptr (for compatibility with make_shared calls) + PikaCommandCollector(std::shared_ptr coordinator, int batch_max_wait_time = 5); + + ~PikaCommandCollector(); + + /** + * @brief Add command to collector + * @param cmd_ptr command pointer + * @param callback callback function after processing is completed + * @return whether the addition was successful + */ + bool AddCommand(std::shared_ptr cmd_ptr, CommandCallback callback); + + /** + * @brief Set the batch max wait time + * @param batch_max_wait_time maximum wait time in milliseconds + */ + void SetBatchMaxWaitTime(int batch_max_wait_time); + + /** + * @brief Get batch processing statistics + * @return Pair of (total_processed_commands, total_batches) + */ + std::pair GetBatchStats() const; + + private: + //Consensus coordinator reference + ConsensusCoordinator* coordinator_; + + // Batch processing configuration + std::atomic batch_max_wait_time_; + + // Batch statistics + std::atomic total_processed_{0}; + std::atomic total_batches_{0}; +}; + +#endif // PIKA_COMMAND_COLLECTOR_H_ \ No newline at end of file diff --git a/include/pika_command_queue.h b/include/pika_command_queue.h new file mode 100644 index 0000000000..62574fcd2d --- /dev/null +++ b/include/pika_command_queue.h @@ -0,0 +1,99 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_COMMAND_QUEUE_H_ +#define PIKA_COMMAND_QUEUE_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "pstd/include/pstd_mutex.h" +#include "pstd/include/env.h" +#include "include/pika_command.h" +#include "include/pika_define.h" +#include "pstd/include/env.h" +#include "include/pika_define.h" + +// Callback function type for command completion notification +using CommandCallback = std::function; + +// Structure representing a batch of commands +struct CommandBatch { + std::vector> commands; + std::vector callbacks; + uint64_t batch_id; + uint64_t create_time; + std::string db_name; + std::vector binlog_offsets; // Binlog offsets for each command + + CommandBatch(const std::vector>& cmds, + const std::vector& cbs, + const std::string& db) + : commands(cmds), callbacks(cbs), db_name(db) { + static std::atomic next_id{1}; + batch_id = next_id.fetch_add(1); + create_time = pstd::NowMicros(); + } + + bool Empty() const { + return commands.empty(); + } + + size_t Size() const { + return commands.size(); + } +}; + +// New structure to group multiple CommandBatches for RocksDB processing +struct BatchGroup { + std::vector> batches; + LogOffset end_offset; // Only store the final offset of the last batch + BatchGroup() = default; + BatchGroup(const std::vector>& batches, + const LogOffset& final_offset) + : batches(batches), end_offset(final_offset) {} + bool Empty() const { return batches.empty(); } + size_t BatchCount() const { return batches.size(); } +}; + +// Thread-safe command queue for batched command processing +class CommandQueue { +public: + explicit CommandQueue(size_t max_size); + ~CommandQueue(); + + // Enqueue a command batch (blocking if queue is full) + bool EnqueueBatch(std::shared_ptr batch); + + // Dequeue a command batch (blocking if queue is empty) + std::shared_ptr DequeueBatch(); + + // Dequeue all available batches (non-blocking) + std::vector> DequeueAllBatches(); + + // Get current queue size + size_t Size() const; + + // Check if queue is empty + bool Empty() const; + + // Shutdown the queue + void Shutdown(); + +private: + std::queue> cmd_queue_; + mutable std::mutex queue_mutex_; + std::condition_variable queue_cv_; + size_t max_size_; + std::atomic shutdown_{false}; +}; + +#endif // PIKA_COMMAND_QUEUE_H_ \ No newline at end of file diff --git a/include/pika_conf.h b/include/pika_conf.h index 80d5abe8f0..d9cd9a91bd 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -69,6 +69,21 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return sync_thread_num_; } + + bool command_batch_enabled() { + std::shared_lock l(rwlock_); + return command_batch_enabled_; + } + + int batch_size() { + std::shared_lock l(rwlock_); + return batch_size_; + } + + int batch_max_wait_time() { + std::shared_lock l(rwlock_); + return batch_max_wait_time_; + } int sync_binlog_thread_num() { std::shared_lock l(rwlock_); return sync_binlog_thread_num_; @@ -350,6 +365,16 @@ class PikaConf : public pstd::BaseConf { int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); } int consensus_level() { return consensus_level_.load(); } int replication_num() { return replication_num_.load(); } + int replication_ack_timeout() { + std::shared_lock l(rwlock_); + return replication_ack_timeout_; + } + + // Function to set replication acknowledgment timeout (used by batch system) + void SetReplicationAckTimeout(int timeout) { + std::lock_guard l(rwlock_); + replication_ack_timeout_ = timeout; + } int rate_limiter_mode() { std::shared_lock l(rwlock_); return rate_limiter_mode_; @@ -436,7 +461,6 @@ class PikaConf : public pstd::BaseConf { bool is_admin_cmd(const std::string& cmd) { return admin_cmd_set_.find(cmd) != admin_cmd_set_.end(); } - // Immutable config items, we don't use lock. bool daemonize() { return daemonize_; } bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; } @@ -462,6 +486,23 @@ class PikaConf : public pstd::BaseConf { std::lock_guard l(rwlock_); thread_num_ = value; } + + void SetCommandBatchEnabled(const bool value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("command-batch-enabled", value ? "yes" : "no"); + command_batch_enabled_ = value; + } + + void SetCommandBatchSize(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("batch-size", std::to_string(value)); + batch_size_ = value; + } + void SetCommandBatchMaxWaitTime(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("batch-max-wait-time", std::to_string(value)); + batch_max_wait_time_ = value; + } void SetTimeout(const int value) { std::lock_guard l(rwlock_); TryPushDiffCommands("timeout", std::to_string(value)); @@ -665,6 +706,17 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("max-conn-rbuf-size", std::to_string(value)); max_conn_rbuf_size_.store(value); } + void SetConsensusBatchSize(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("batch-size", std::to_string(value)); + batch_size_ = value; + } + // This method is used by config update system + void UpdateReplicationAckTimeout(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("replication-ack-timeout", std::to_string(value)); + replication_ack_timeout_ = value; + } void SetMaxCacheFiles(const int& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("max-cache-files", std::to_string(value)); @@ -929,6 +981,12 @@ class PikaConf : public pstd::BaseConf { std::string server_id_; std::string run_id_; std::string replication_id_; + + // 命令批处理相关配置 + bool command_batch_enabled_ = true; + int batch_size_ = 100; + int batch_max_wait_time_ = 5; + int replication_ack_timeout_ = 5000; std::string requirepass_; std::string masterauth_; std::string userpass_; @@ -1047,7 +1105,7 @@ class PikaConf : public pstd::BaseConf { int throttle_bytes_per_second_ = 200 << 20; // 200MB/s int max_rsync_parallel_num_ = kMaxRsyncParallelNum; std::atomic_int64_t rsync_timeout_ms_ = 1000; - + /* kUninitialized = 0, // unknown setting kDisable = 1, // disable perf stats diff --git a/include/pika_consensus.h b/include/pika_consensus.h index 78e20eb3ab..f13a147054 100644 --- a/include/pika_consensus.h +++ b/include/pika_consensus.h @@ -244,6 +244,10 @@ class ConsensusCoordinator { SyncProgress sync_pros_; std::shared_ptr stable_logger_; std::shared_ptr mem_logger_; + + // Make db_name accessible to external classes + public: + const std::string& db_name() const { return db_name_; } // pacificA public: @@ -258,6 +262,8 @@ class ConsensusCoordinator { pstd::Status CommitAppLog(const LogOffset& master_committed_id); pstd::Status UpdateCommittedID(); pstd::Status ApplyBinlog(const std::shared_ptr& cmd_ptr); + pstd::Status BatchApplyBinlogs(const std::vector& logs_to_apply); + void BatchInternalApplyFollower(const std::vector>& cmd_ptrs); pstd::Status ProcessCoordination(); LogOffset GetCommittedId() { @@ -273,9 +279,16 @@ class ConsensusCoordinator { prepared_id_ = offset; } void SetCommittedId(const LogOffset& offset) { - std::lock_guard l(committed_id_rwlock_); - committed_id_ = offset; - context_->UpdateAppliedIndex(committed_id_); + { + std::lock_guard l(committed_id_rwlock_); + if (committed_id_ >= offset) { + return; + } + committed_id_ = offset; + context_->UpdateAppliedIndex(committed_id_); + } + notification_counter_.fetch_add(1); + LOG(INFO) << "SetCommittedId: Updated to " << offset.ToString(); } private: @@ -286,6 +299,7 @@ class ConsensusCoordinator { bool is_consistency_ = false; std::shared_mutex committed_id_rwlock_; LogOffset committed_id_ = LogOffset(); + std::atomic notification_counter_{0}; std::shared_mutex prepared_id__rwlock_; LogOffset prepared_id_ = LogOffset(); std::shared_ptr logs_; diff --git a/include/pika_define.h b/include/pika_define.h index c09d0d7c38..6568685a33 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -140,7 +140,9 @@ struct LogOffset { bool operator<=(const LogOffset& other) const { return b_offset <= other.b_offset; } bool operator>=(const LogOffset& other) const { return b_offset >= other.b_offset; } bool operator>(const LogOffset& other) const { return b_offset > other.b_offset; } + bool operator!=(const LogOffset& other) const { return b_offset != other.b_offset; } std::string ToString() const { return b_offset.ToString() + " " + l_offset.ToString(); } + bool IsValid() const { return b_offset.filenum > 0 || b_offset.offset > 0; } BinlogOffset b_offset; LogicOffset l_offset; }; @@ -178,10 +180,18 @@ const std::string BinlogSyncStateMsg[] = {"NotSync", "ReadFromCache", "ReadFromF struct BinlogChip { LogOffset offset_; std::string binlog_; - BinlogChip(const LogOffset& offset, std::string binlog) : offset_(offset), binlog_(std::move(binlog)) {} + bool is_batch_ = false; + + BinlogChip(const LogOffset& offset, std::string binlog) + : offset_(offset), binlog_(std::move(binlog)), is_batch_(false) {} + + BinlogChip(const LogOffset& offset, std::string binlog, bool is_batch) + : offset_(offset), binlog_(std::move(binlog)), is_batch_(is_batch) {} + BinlogChip(const BinlogChip& binlog_chip) { offset_ = binlog_chip.offset_; binlog_ = binlog_chip.binlog_; + is_batch_ = binlog_chip.is_batch_; } }; diff --git a/include/pika_kv.h b/include/pika_kv.h index 82939d29d9..8e5e5a6f32 100644 --- a/include/pika_kv.h +++ b/include/pika_kv.h @@ -24,6 +24,7 @@ class SetCmd : public Cmd { res.push_back(key_); return res; } + ~SetCmd() {} void Do() override; void DoUpdateCache() override; void DoThroughDB() override; diff --git a/include/pika_rm.h b/include/pika_rm.h index 709d5722cc..7db1bbf322 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -12,6 +12,10 @@ #include #include #include +#include +#include +#include +#include #include "pstd/include/pstd_status.h" @@ -22,6 +26,8 @@ #include "include/pika_slave_node.h" #include "include/pika_stable_log.h" #include "include/rsync_client.h" +#include "include/pika_command_collector.h" +#include "include/pika_command_queue.h" #define kBinlogSendPacketNum 40 #define kBinlogSendBatchNum 100 @@ -83,16 +89,22 @@ class SyncMasterDB : public SyncDB { return coordinator_.StableLogger()->Logger(); } + std::shared_ptr GetSlaveNode(const std::string& ip, int port); + // Make coordinator_ accessible to StableLog class + ConsensusCoordinator& GetCoordinator() { return coordinator_; } + std::shared_ptr GetCommandCollector(); + private: // invoker need to hold slave_mu_ pstd::Status ReadBinlogFileToWq(const std::shared_ptr& slave_ptr); - std::shared_ptr GetSlaveNode(const std::string& ip, int port); + //std::shared_ptr GetSlaveNode(const std::string& ip, int port); std::unordered_map> GetAllSlaveNodes(); pstd::Mutex session_mu_; int32_t session_id_ = 0; ConsensusCoordinator coordinator_; + std::shared_ptr command_collector_; //pacificA public: public: @@ -112,8 +124,7 @@ class SyncMasterDB : public SyncDB { pstd::Status UpdateCommittedID(); pstd::Status CommitAppLog(const LogOffset& master_committed_id); pstd::Status Truncate(const LogOffset& offset); - - + // pstd::Status WaitForSlaveAcks(const LogOffset& target_offset, int timeout_ms); }; class SyncSlaveDB : public SyncDB { @@ -233,6 +244,14 @@ class PikaReplicaManager { return pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name); } + // Command Queue related methods + void EnqueueCommandBatch(std::shared_ptr batch); + std::shared_ptr DequeueCommandBatch(); + size_t GetCommandQueueSize() const; + bool IsCommandQueueEmpty() const; + // CommittedID notification for RocksDB thread + void NotifyCommittedID(const LogOffset& committed_id); + private: void InitDB(); pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip); @@ -243,10 +262,59 @@ class PikaReplicaManager { pstd::Mutex write_queue_mu_; + // db_name -> a queue of write task + using DBWriteTaskQueue = std::map>; + // ip:port -> a map of DBWriteTaskQueue + using SlaveWriteTaskQueue = std::map; + // every host owns a queue, the key is "ip + port" - std::unordered_map>> write_queues_; + SlaveWriteTaskQueue write_queues_; + + // client for replica std::unique_ptr pika_repl_client_; std::unique_ptr pika_repl_server_; + + // Condition variable for signaling when the write queue has new items + pstd::CondVar write_queue_cv_; + + std::shared_mutex is_consistency_rwlock_; + bool is_consistency_ = true; + std::shared_mutex committed_id_rwlock_; + + // Command queue for collected batches + std::unique_ptr command_queue_; + + // Background thread for processing command queue + std::unique_ptr command_queue_thread_; + std::atomic command_queue_running_{false}; + std::mutex command_queue_mutex_; + std::condition_variable command_queue_cv_; + + // RocksDB background thread for Put operations and client responses + std::unique_ptr rocksdb_back_thread_; + std::atomic rocksdb_thread_running_{false}; + std::mutex rocksdb_thread_mutex_; + std::condition_variable rocksdb_thread_cv_; + + // Pending batch groups waiting for CommittedID + std::queue> pending_batch_groups_; + std::mutex pending_batch_groups_mutex_; + + // Last committed ID for RocksDB thread processing + LogOffset last_committed_id_; + std::mutex last_committed_id_mutex_; + + // Background thread processing methods + void StartCommandQueueThread(); + void StopCommandQueueThread(); + void CommandQueueLoop(); + void ProcessCommandBatches(const std::vector>& batches); + + // RocksDB background thread methods + void StartRocksDBThread(); + void StopRocksDBThread(); + void RocksDBThreadLoop(); + size_t ProcessCommittedBatchGroups(const LogOffset& committed_id); }; #endif // PIKA_RM_H diff --git a/include/pika_server.h b/include/pika_server.h index 41a8c9b346..f67e569645 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -581,7 +581,7 @@ class PikaServer : public pstd::noncopyable { std::string master_ip_; int master_port_ = 0; int repl_state_ = PIKA_REPL_NO_CONNECT; - bool is_consistency_ = false; + bool is_consistency_ = true; int role_ = PIKA_ROLE_SINGLE; int last_role_ = PIKA_ROLE_SINGLE; int last_meta_sync_timestamp_ = 0; diff --git a/src/net/src/pb_conn.cc b/src/net/src/pb_conn.cc index 5185e8f51d..15e4024da0 100644 --- a/src/net/src/pb_conn.cc +++ b/src/net/src/pb_conn.cc @@ -153,7 +153,7 @@ WriteStatus PbConn::SendReply() { if (item_len - write_buf_.item_pos_ != 0) { return kWriteHalf; } - LOG(ERROR) << "write item success"; + //LOG(ERROR) << "write item success"; } return kWriteAll; } diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index 187d63d8ad..e051108317 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -469,3 +469,10 @@ Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index) { return Status::OK(); } + +Status Binlog::Sync() { + if (queue_) { + return queue_->Sync(); + } + return Status::Corruption("Logger not initialized"); +} \ No newline at end of file diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index a6cd5ec62f..59f196a189 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -16,9 +16,11 @@ #include "include/pika_define.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_command_collector.h" #include "net/src/dispatch_thread.h" #include "net/src/worker_thread.h" #include "src/pstd/include/scope_record_lock.h" +#include #include "rocksdb/perf_context.h" #include "rocksdb/iostats_context.h" @@ -217,8 +219,46 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st // Perform some operations rocksdb::get_perf_context()->Reset(); - // Process Command - c_ptr->Execute(); + + // Process Command - route write commands through CommandCollector for batching + if (c_ptr->is_write() && g_pika_conf->command_batch_enabled()) { + // Get the appropriate SyncMasterDB for command batching + auto sync_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(c_ptr->db_name())); + if (sync_db) { + auto command_collector = sync_db->GetCommandCollector(); + if (command_collector) { + // Create callback to handle command completion + auto callback = [this, c_ptr](const LogOffset& offset, pstd::Status status) { + LOG(INFO) << "Command completed"; + auto pc = dynamic_cast(c_ptr->GetConn().get()); + if (pc) { + auto resp_ptr = c_ptr->GetResp(); + if (resp_ptr) { + *resp_ptr = std::move(c_ptr->res().message()); + } + pc->resp_num--; + pc->TryWriteResp(); + } + }; + + // Add command to collector for batch processing + bool added = command_collector->AddCommand(c_ptr, callback); + if (!added) { + LOG(WARNING) << "Failed to add command " << c_ptr->name() << " to CommandCollector, executing directly"; + c_ptr->Execute(); + } + } else { + LOG(WARNING) << "CommandCollector not available, executing command directly"; + c_ptr->Execute(); + } + } else { + LOG(WARNING) << "SyncMasterDB not found for " << c_ptr->db_name() << ", executing command directly"; + c_ptr->Execute(); + } + } else { + // Non-write commands or batching disabled - execute directly + c_ptr->Execute(); + } time_stat_->process_done_ts_ = pstd::NowMicros(); auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); @@ -553,8 +593,13 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc); - *resp_ptr = std::move(cmd_ptr->res().message()); - resp_num--; + // *resp_ptr = std::move(cmd_ptr->res().message()); + // resp_num--; + if (opt == kCmdNameSet) { + } else { + *resp_ptr = std::move(cmd_ptr->res().message()); + resp_num--; + } } std::queue> PikaClientConn::GetTxnCmdQue() { return txn_cmd_que_; } diff --git a/src/pika_command.cc b/src/pika_command.cc index fa27505844..d72f5c355b 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -952,35 +952,36 @@ bool Cmd::DoReadCommandInCache() { void Cmd::DoBinlog() { - if (res().ok() && is_write() && g_pika_conf->write_binlog()) { - std::shared_ptr conn_ptr = GetConn(); - std::shared_ptr resp_ptr = GetResp(); - // Consider that dummy cmd appended by system, both conn and resp are null. - if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) { - if (!conn_ptr) { - LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " conn empty."; - } - if (!resp_ptr) { - LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " resp empty."; - } - res().SetRes(CmdRes::kErrOther); - return; - } - - Status s = sync_db_->ConsensusProposeLog(shared_from_this()); - if (!s.ok()) { - if(g_pika_server->IsConsistency()&&s.IsTimeout()){ - res().SetRes(CmdRes::kConsistencyTimeout, "Timeout waiting for consistency"); - LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Slave node consistency timeout" - << s.ToString(); - }else{ - LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device " - << s.ToString(); - res().SetRes(CmdRes::kErrOther, s.ToString()); - } - return; - } - } +// if (res().ok() && is_write() && g_pika_conf->write_binlog()) { +// std::shared_ptr conn_ptr = GetConn(); +// std::shared_ptr resp_ptr = GetResp(); +// // Consider that dummy cmd appended by system, both conn and resp are null. +// if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) { +// if (!conn_ptr) { +// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " conn empty."; +// } +// if (!resp_ptr) { +// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " resp empty."; +// } +// res().SetRes(CmdRes::kErrOther); +// return; +// } + +// Status s = sync_db_->ConsensusProposeLog(shared_from_this()); +// if (!s.ok()) { +// if(g_pika_server->IsConsistency()&&s.IsTimeout()){ +// res().SetRes(CmdRes::kConsistencyTimeout, "Timeout waiting for consistency"); +// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Slave node consistency timeout" +// << s.ToString(); +// }else{ +// LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device " +// << s.ToString(); +// res().SetRes(CmdRes::kErrOther, s.ToString()); +// } +// return; +// } +// } + return; } #define PIKA_STAGE_DURATION_OUTPUT(duration) \ diff --git a/src/pika_command_collector.cc b/src/pika_command_collector.cc new file mode 100644 index 0000000000..8e64f814cd --- /dev/null +++ b/src/pika_command_collector.cc @@ -0,0 +1,64 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include +#include "include/pika_command_collector.h" +#include "include/pika_rm.h" +#include "include/pika_conf.h" + +extern std::unique_ptr g_pika_conf; +extern std::unique_ptr g_pika_rm; + +PikaCommandCollector::PikaCommandCollector(ConsensusCoordinator* coordinator, int batch_max_wait_time) + : coordinator_(coordinator), batch_max_wait_time_(batch_max_wait_time) { + LOG(INFO) << "PikaCommandCollector started for DB: " << coordinator_->db_name() + << " with batch_max_wait_time: " << batch_max_wait_time << "ms"; +} + +PikaCommandCollector::PikaCommandCollector(std::shared_ptr coordinator, int batch_max_wait_time) + : coordinator_(coordinator.get()), batch_max_wait_time_(batch_max_wait_time) { + LOG(INFO) << "PikaCommandCollector started for DB: " << coordinator_->db_name() + << " with batch_max_wait_time: " << batch_max_wait_time << "ms"; +} + +PikaCommandCollector::~PikaCommandCollector() { + LOG(INFO) << "PikaCommandCollector stopped, processed " << total_processed_.load() + << " commands, " << total_batches_.load() << " batches"; +} + +bool PikaCommandCollector::AddCommand(std::shared_ptr cmd_ptr, CommandCallback callback) { + if (!cmd_ptr || !cmd_ptr->is_write()) { + LOG(WARNING) << "Attempt to add non-write command to CommandCollector"; + return false; + } + + // Create a single-command batch directly + std::vector> commands = {cmd_ptr}; + std::vector callbacks = {std::move(callback)}; + + std::string db_name = cmd_ptr->db_name().empty() ? g_pika_conf->default_db() : cmd_ptr->db_name(); + auto command_batch = std::make_shared(commands, callbacks, db_name); + + // Enqueue the batch directly to PikaReplicaManager + g_pika_rm->EnqueueCommandBatch(command_batch); + + // Update statistics + total_processed_.fetch_add(1); + total_batches_.fetch_add(1); + + LOG(INFO) << "Added single command " << cmd_ptr->name() << " to CommandQueue"; + return true; +} + +void PikaCommandCollector::SetBatchMaxWaitTime(int batch_max_wait_time) { + batch_max_wait_time_.store(batch_max_wait_time); + LOG(INFO) << "BatchMaxWaitTime set to " << batch_max_wait_time << "ms"; +} + +std::pair PikaCommandCollector::GetBatchStats() const { + return {total_processed_.load(), total_batches_.load()}; +} \ No newline at end of file diff --git a/src/pika_command_queue.cc b/src/pika_command_queue.cc new file mode 100644 index 0000000000..f9013f65c9 --- /dev/null +++ b/src/pika_command_queue.cc @@ -0,0 +1,107 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/pika_command_queue.h" +#include + +CommandQueue::CommandQueue(size_t max_size) : max_size_(max_size) { + LOG(INFO) << "CommandQueue created with max_size: " << max_size_; +} + +CommandQueue::~CommandQueue() { + Shutdown(); + LOG(INFO) << "CommandQueue destroyed"; +} + +bool CommandQueue::EnqueueBatch(std::shared_ptr batch) { + if (!batch || batch->Empty()) { + LOG(WARNING) << "Attempt to enqueue empty or null batch"; + return false; + } + + std::lock_guard lock(queue_mutex_); + + if (shutdown_.load()) { + LOG(WARNING) << "Cannot enqueue batch: queue is shutdown"; + return false; + } + + if (cmd_queue_.size() >= max_size_) { + LOG(WARNING) << "Command queue is full (size: " << cmd_queue_.size() + << ", max: " << max_size_ << "), dropping batch"; + return false; + } + + cmd_queue_.push(batch); + + LOG(INFO) << "Enqueued command batch with " << batch->Size() + << " commands, queue size: " << cmd_queue_.size(); + + queue_cv_.notify_one(); + return true; +} + +std::shared_ptr CommandQueue::DequeueBatch() { + std::unique_lock lock(queue_mutex_); + + while (cmd_queue_.empty() && !shutdown_.load()) { + queue_cv_.wait(lock); + } + + if (shutdown_.load() && cmd_queue_.empty()) { + return nullptr; + } + + auto batch = cmd_queue_.front(); + cmd_queue_.pop(); + + LOG(INFO) << "Dequeued command batch with " << batch->Size() + << " commands, remaining queue size: " << cmd_queue_.size(); + + return batch; +} + +std::vector> CommandQueue::DequeueAllBatches() { + std::vector> batches; + std::lock_guard lock(queue_mutex_); + + if (shutdown_.load()) { + return batches; + } + + // Take all available batches + while (!cmd_queue_.empty()) { + batches.push_back(cmd_queue_.front()); + cmd_queue_.pop(); + } + + if (!batches.empty()) { + size_t total_commands = 0; + for (const auto& batch : batches) { + total_commands += batch->Size(); + } + LOG(INFO) << "Dequeued all batches: " << batches.size() + << " batches with " << total_commands << " total commands"; + } + + return batches; +} + +size_t CommandQueue::Size() const { + std::lock_guard lock(queue_mutex_); + return cmd_queue_.size(); +} + +bool CommandQueue::Empty() const { + std::lock_guard lock(queue_mutex_); + return cmd_queue_.empty(); +} + +void CommandQueue::Shutdown() { + std::lock_guard lock(queue_mutex_); + shutdown_.store(true); + queue_cv_.notify_all(); + LOG(INFO) << "CommandQueue shutdown, remaining batches: " << cmd_queue_.size(); +} diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 94071eac7f..a4a7178cb7 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -412,6 +412,20 @@ int PikaConf::Load() { max_cache_statistic_keys_ = 0; } + // 命令批处理相关配置 + std::string command_batch_enabled; + GetConfStr("command-batch-enabled", &command_batch_enabled); + command_batch_enabled_ = (command_batch_enabled == "yes"); + + GetConfInt("batch-size", &batch_size_); + if (batch_size_ <= 0) { + batch_size_ = 100; + } + + GetConfInt("batch_max_wait_time", &batch_max_wait_time_); + if (batch_max_wait_time_ <= 0) { + batch_max_wait_time_ = 5; + } // disable_auto_compactions GetConfBool("disable_auto_compactions", &disable_auto_compactions_); @@ -707,9 +721,13 @@ int PikaConf::Load() { rsync_timeout_ms_.store(tmp_rsync_timeout_ms); } - return ret; -} - + GetConfInt("replication-ack-timeout", &replication_ack_timeout_); + if (replication_ack_timeout_ <= 0) { + replication_ack_timeout_ = 5000; + } + return ret; + } + void PikaConf::TryPushDiffCommands(const std::string& command, const std::string& value) { if (!CheckConfExist(command)) { diff_commands_[command] = value; @@ -770,6 +788,9 @@ int PikaConf::ConfigRewrite() { SetConfStr("run-id", run_id_); SetConfStr("replication-id", replication_id_); SetConfInt("max-cache-statistic-keys", max_cache_statistic_keys_); + SetConfStr("command-batch-enabled", command_batch_enabled_ ? "yes" : "no"); + SetConfInt("batch-size", batch_size_); + SetConfInt("batch-max-wait-time", batch_max_wait_time_); SetConfInt("small-compaction-threshold", small_compaction_threshold_); SetConfInt("small-compaction-duration-threshold", small_compaction_duration_threshold_); SetConfInt("max-client-response-size", static_cast(max_client_response_size_)); @@ -790,6 +811,7 @@ int PikaConf::ConfigRewrite() { SetConfInt("replication-num", replication_num_.load()); SetConfStr("slow-cmd-list", pstd::Set2String(slow_cmd_set_, ',')); SetConfInt("max-conn-rbuf-size", max_conn_rbuf_size_.load()); + SetConfInt("replication-ack-timeout", replication_ack_timeout_); // options for storage engine SetConfInt("max-cache-files", max_cache_files_); SetConfInt("max-background-compactions", max_background_compactions_); diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index ef1960d589..9900c5a41f 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -12,6 +12,7 @@ #include "include/pika_conf.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_repl_bgworker.h" using pstd::Status; @@ -841,9 +842,16 @@ Status ConsensusCoordinator::PersistAppendBinlog(const std::shared_ptr& cmd // If successful, append the log entry to the logs // TODO: 这里logs_的appendlog操作和上边的stable_logger_->Logger()->Put不是原子的,可能导致offset大的先被追加到logs_中, // 多线程写入的时候窗口会对不上,最终主从断开连接。需要加逻辑保证原子性 - logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog)); - - SetPreparedId(cur_offset); + //LOG(INFO) << "PersistAppendBinlog: About to append to logs_, current size=" << logs_->Size() + // << ", offset=" << cur_offset.ToString() << ", cmd=" << cmd_ptr->name(); + // logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog)); + //LOG(INFO) << "PersistAppendBinlog: Successfully appended to logs_, new size=" << logs_->Size(); + { + std::lock_guard l(order_mu_); + // Append to logs_ under order lock to maintain ordering + logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog)); + SetPreparedId(cur_offset); + } return stable_logger_->Logger()->IsOpened(); } @@ -863,7 +871,7 @@ Status ConsensusCoordinator::AppendEntries(const std::shared_ptr& cmd_ptr, return s; } - g_pika_server->SignalAuxiliary(); + // g_pika_server->SignalAuxiliary(); return Status::OK(); } Status ConsensusCoordinator::AppendSlaveEntries(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { @@ -917,6 +925,23 @@ Status ConsensusCoordinator::UpdateCommittedID() { } } } + // if (!has_active_slaves) { + // LogOffset master_prepared_id = GetPreparedId(); + // if (master_prepared_id.IsValid() && master_prepared_id >= GetCommittedId()) { + // SetCommittedId(master_prepared_id); + // LOG(INFO) << "PacificA update CommittedID (no active slaves): " << GetCommittedId().ToString() + // << ", Total slaves: " << total_slaves + // << ", kSlaveBinlogSync: " << binlog_sync_slaves + // << ", Other states: " << other_state_slaves; + // } else { + // LOG(INFO) << "PacificA update CommittedID: No active slaves, keeping current CommittedID: " << GetCommittedId().ToString() + // << ", Total slaves: " << total_slaves + // << ", kSlaveBinlogSync: " << binlog_sync_slaves + // << ", Other states: " << other_state_slaves; + // } + // // g_pika_rm->NotifyCommittedID(GetCommittedId()); + // return Status::OK(); + // } if (slave_prepared_id < GetCommittedId()) { LOG(WARNING) << "Error: slave_prepared_id (" << slave_prepared_id.ToString() << ") < master_committedId (" << GetCommittedId().ToString() << ")"; @@ -948,7 +973,9 @@ Status ConsensusCoordinator::ProcessCoordination() { // Execute the operation of writing to DB Status ConsensusCoordinator::ApplyBinlog(const std::shared_ptr& cmd_ptr) { auto opt = cmd_ptr->argv()[0]; + LOG(INFO) << "[ApplyBinlog] Received command: " << opt << " for db: " << db_name_; if (pstd::StringToLower(opt) != kCmdNameFlushdb) { + LOG(INFO) << "[ApplyBinlog] Scheduling async task for " << opt; InternalApplyFollower(cmd_ptr); } else { int32_t wait_ms = 250; @@ -964,6 +991,25 @@ Status ConsensusCoordinator::ApplyBinlog(const std::shared_ptr& cmd_ptr) { return Status::OK(); } +// Batch apply commands to slave database +Status ConsensusCoordinator::BatchApplyBinlogs(const std::vector& logs_to_apply) { + if (logs_to_apply.empty()) { + return Status::OK(); + } + for (const auto& log_item : logs_to_apply) { + PikaReplBgWorker::WriteDBInSyncWay(log_item.cmd_ptr); + } + + return Status::OK(); +} + +void ConsensusCoordinator::BatchInternalApplyFollower(const std::vector>& cmd_ptrs) { + for (const auto& cmd_ptr : cmd_ptrs) { + // g_pika_rm->ScheduleWriteDBTask(cmd_ptr, db_name_); + PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); + } +} + Status ConsensusCoordinator::SendBinlog(std::shared_ptr slave_ptr, std::string db_name) { std::vector tasks; diff --git a/src/pika_kv.cc b/src/pika_kv.cc index 1c1abdd4cf..79dc6cd291 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -15,6 +15,20 @@ extern std::unique_ptr g_pika_conf; /* SET key value [NX] [XX] [EX ] [PX ] */ +// SetCmd::~SetCmd() { +// auto tmp_conn = GetConn(); +// if (!tmp_conn) { +// return; +// } + +// auto pc = dynamic_cast(tmp_conn.get()); +// std::shared_ptr resp_ptr = std::make_shared(); +// *resp_ptr = std::move(res().message()); +// pc->resp_num--; +// pc->resp_array.push_back(resp_ptr); +// pc->TryWriteResp(); +// LOG(INFO) << "SetCmd::~SetCmd() is completed"; +// } void SetCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameSet); diff --git a/src/pika_repl_server.cc b/src/pika_repl_server.cc index c8f1c9f9dc..2617a59aab 100644 --- a/src/pika_repl_server.cc +++ b/src/pika_repl_server.cc @@ -17,7 +17,7 @@ extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; PikaReplServer::PikaReplServer(const std::set& ips, int port, int cron_interval) { - server_tp_ = std::make_unique(PIKA_REPL_SERVER_TP_SIZE, 100000, "PikaReplServer"); + server_tp_ = std::make_unique(1, 100000, "PikaReplServer"); pika_repl_server_thread_ = std::make_unique(ips, port, cron_interval); pika_repl_server_thread_->set_thread_name("PikaReplServer"); } diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 9c777339ab..a411e906bc 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -12,6 +12,7 @@ #include #include +#include #include "net/include/net_cli.h" @@ -25,6 +26,7 @@ using pstd::Status; extern std::unique_ptr g_pika_rm; extern PikaServer* g_pika_server; +extern std::unique_ptr g_pika_conf; /* SyncDB */ @@ -38,7 +40,9 @@ std::string SyncDB::DBName() { /* SyncMasterDB*/ SyncMasterDB::SyncMasterDB(const std::string& db_name) - : SyncDB(db_name), coordinator_(db_name) {} + : SyncDB(db_name), coordinator_(db_name) { + command_collector_ = std::make_shared(&coordinator_, 5); +} int SyncMasterDB::GetNumberOfSlaveNode() { return coordinator_.SyncPros().SlaveSize(); } @@ -245,11 +249,11 @@ Status SyncMasterDB::WakeUpSlaveBinlogSync() { } if (!s.ok()) { to_del.push_back(slave_ptr); - LOG(WARNING) << "WakeUpSlaveBinlogSync failed, marking for deletion: " - << slave_ptr->ToStringStatus() << " - " << s.ToString(); - } + // LOG(WARNING) << "WakeUpSlaveBinlogSync failed, marking for deletion: " + // << slave_ptr->ToStringStatus() << " - " << s.ToString(); } - } + } + } for (const auto& to_del_slave : to_del) { RemoveSlaveNode(to_del_slave->Ip(), to_del_slave->Port()); @@ -424,11 +428,25 @@ LogOffset SyncMasterDB::GetCommittedId(){ Status SyncMasterDB::AppendSlaveEntries(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { return coordinator_.AppendSlaveEntries(cmd_ptr, attribute); } + Status SyncMasterDB::ProcessCoordination(){ return coordinator_.ProcessCoordination(); } Status SyncMasterDB::UpdateCommittedID(){ - return coordinator_.UpdateCommittedID(); + Status s = coordinator_.UpdateCommittedID(); + if (s.ok()) { + // Notify RocksDB thread of new CommittedID + LogOffset committed_id = coordinator_.GetCommittedId(); + if (committed_id.IsValid()) { + extern std::unique_ptr g_pika_rm; + if (g_pika_rm) { + g_pika_rm->NotifyCommittedID(committed_id); + } + } + } else { + LOG(WARNING) << "UpdateCommittedID failed: " << s.ToString(); + } + return s; } Status SyncMasterDB::Truncate(const LogOffset& offset){ return coordinator_.Truncate(offset); @@ -471,13 +489,79 @@ Status SyncMasterDB::AppendCandidateBinlog(const std::string& ip, int port, cons return Status::OK(); } +/* +pstd::Status SyncMasterDB::WaitForSlaveAcks(const LogOffset& target_offset, int timeout_ms) { + // bug: 重发有问题 + // Get slave count + int slave_count = GetNumberOfSlaveNode(); + LOG(INFO) << "WaitForSlaveAcks: Waiting for ACKs from master and " << slave_count << " slave(s) for target " << target_offset.ToString(); + + // If no slaves, return success immediately + if (slave_count == 0) { + LOG(INFO) << "WaitForSlaveAcks: No slaves connected, returning success immediately"; + // Update committed_id + coordinator_.SetCommittedId(target_offset); + return Status::OK(); + } + + g_pika_rm->WakeUpBinlogSync(); + + // Use efficient polling mechanism to avoid creating extra threads + auto start_time = std::chrono::steady_clock::now(); + auto timeout_duration = std::chrono::milliseconds(timeout_ms); + const int POLL_INTERVAL_MS = 10; // 10ms polling interval + + while (true) { + // Check if timeout + auto elapsed = std::chrono::steady_clock::now() - start_time; + if (elapsed >= timeout_duration) { + LOG(WARNING) << "WaitForSlaveAcks: after " << timeout_ms << "ms waiting for target " << target_offset.ToString(); + return Status::Timeout("Strong consistency replication timed out"); + } + + // Check acknowledgment status of each slave node + std::unordered_map> slaves = GetAllSlaveNodes(); + int ack_count = 1; // Master node is already acknowledged + + for (const auto& slave_pair : slaves) { + std::shared_ptr slave = slave_pair.second; + if (slave) { + slave->Lock(); + LogOffset acked_offset = slave->acked_offset; + slave->Unlock(); + + if (acked_offset >= target_offset) { + ack_count++; + } + } + } + + // Check if all nodes have acknowledged (1 master + N slaves) + int expected_acks = 1 + slave_count; + if (ack_count >= expected_acks) { + LOG(INFO) << "WaitForSlaveAcks: All " << expected_acks << " nodes have acknowledged target " << target_offset.ToString(); + // Update committed_id + coordinator_.SetCommittedId(target_offset); + return Status::OK(); + } + + // Sleep briefly then retry + std::this_thread::sleep_for(std::chrono::milliseconds(POLL_INTERVAL_MS)); + + // Periodically trigger binlog sync to ensure slave nodes receive data + if (elapsed.count() % 100 == 0) { // Trigger every 100ms + g_pika_rm->WakeUpBinlogSync(); + } + } +} +*/ + Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr& cmd_ptr) { // If consistency is not required, directly propose the log without waiting for consensus if (!coordinator_.GetISConsistency()) { return coordinator_.ProposeLog(cmd_ptr); } - auto start = std::chrono::steady_clock::now(); LogOffset offset; Status s = coordinator_.AppendEntries(cmd_ptr, offset); // Append the log entry to the coordinator @@ -485,19 +569,22 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr& cmd_ptr) { return s; } - // Wait for consensus to be achieved within 10 seconds - while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count() < 10) { - // Check if consensus has been achieved for the given log offset - if (checkFinished(offset)) { - return Status::OK(); - } - // TODO: 这里暂时注掉了sleep等待,50ms耗时过长,影响写入链路,后期需要改成条件变量唤醒方式 - //std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } + // // Wait for consensus to be achieved within 10 seconds + // while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count() < 10) { + // // Check if consensus has been achieved for the given log offset + // if (checkFinished(offset)) { + // return Status::OK(); + // } + // // TODO: 这里暂时注掉了sleep等待,50ms耗时过长,影响写入链路,后期需要改成条件变量唤醒方式 + // //std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // } - return Status::Timeout("No consistency achieved within 10 seconds"); -} + // return Status::Timeout("No consistency achieved within 10 seconds"); + //LOG(INFO) << "ConsensusProposeLog: Successfully appended cmd " << cmd_ptr->name() + // << " with offset " << offset.ToString() << ", delegating to RocksDBThreadLoop"; + return Status::OK(); +} Status SyncMasterDB::ConsensusProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { return coordinator_.ProcessLeaderLog(cmd_ptr, attribute); @@ -515,6 +602,8 @@ std::unordered_map> SyncMasterDB::GetAll return coordinator_.SyncPros().GetAllSlaveNodes(); } +std::shared_ptr SyncMasterDB::GetCommandCollector() { return command_collector_; } + /* SyncSlaveDB */ SyncSlaveDB::SyncSlaveDB(const std::string& db_name) : SyncDB(db_name) { @@ -646,6 +735,9 @@ PikaReplicaManager::PikaReplicaManager() { int port = g_pika_conf->port() + kPortShiftReplServer; pika_repl_client_ = std::make_unique(3000, 60); pika_repl_server_ = std::make_unique(ips, port, 3000); + command_queue_ = std::make_unique(500); + // Initialize CommittedID tracking + last_committed_id_ = LogOffset(); InitDB(); } @@ -662,9 +754,17 @@ void PikaReplicaManager::Start() { LOG(FATAL) << "Start Repl Server Error: " << ret << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); } + // Start command queue background thread + StartCommandQueueThread(); + // Start RocksDB background thread + StartRocksDBThread(); } void PikaReplicaManager::Stop() { + // Stop RocksDB background thread first + StopRocksDBThread(); + // Stop command queue background thread + StopCommandQueueThread(); pika_repl_client_->Stop(); pika_repl_server_->Stop(); } @@ -698,9 +798,12 @@ void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, std: const std::vector& tasks) { std::lock_guard l(write_queue_mu_); std::string index = ip + ":" + std::to_string(port); + LOG(INFO) << "[Queue] Entering ProduceWriteQueue for " << ip << ":" << port << ", db: " << db_name << ", current queue size: " << write_queues_.size() << ", tasks to add: " << tasks.size(); for (auto& task : tasks) { write_queues_[index][db_name].push(task); } + LOG(INFO) << "[Queue] Added " << tasks.size() << " tasks to queue for " << ip << ":" << port << ", db: " << db_name << ", new queue size: " << write_queues_[index][db_name].size(); + LOG(INFO) << "[Queue] Exiting ProduceWriteQueue"; } int PikaReplicaManager::ConsumeWriteQueue() { @@ -710,7 +813,7 @@ int PikaReplicaManager::ConsumeWriteQueue() { std::lock_guard l(write_queue_mu_); for (auto& iter : write_queues_) { const std::string& ip_port = iter.first; - std::unordered_map>& p_map = iter.second; + std::map>& p_map = iter.second; for (auto& db_queue : p_map) { std::queue& queue = db_queue.second; for (int i = 0; i < kBinlogSendPacketNum; ++i) { @@ -750,9 +853,11 @@ int PikaReplicaManager::ConsumeWriteQueue() { for (auto& to_send : iter.second) { Status s = pika_repl_server_->SendSlaveBinlogChips(ip, port, to_send); if (!s.ok()) { - LOG(WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString(); + LOG(WARNING) << "SendSlaveBinlogChips to " << iter.first << " failed, " << s.ToString(); to_delete.push_back(iter.first); continue; + } else { + LOG(INFO) << "[Queue] SendSlaveBinlogChips to " << iter.first << " success."; } } } @@ -763,6 +868,9 @@ int PikaReplicaManager::ConsumeWriteQueue() { write_queues_.erase(del_queue); } } + // LOG(INFO) << "[Consume] Entering ConsumeWriteQueue, total queues: " << write_queues_.size(); + // LOG(INFO) << "[Consume] Consumed " << counter << " tasks"; + // LOG(INFO) << "[Consume] Exiting ConsumeWriteQueue"; return counter; } @@ -1181,3 +1289,390 @@ void PikaReplicaManager::BuildBinlogOffset(const LogOffset& offset, InnerMessage boffset->set_term(offset.l_offset.term); boffset->set_index(offset.l_offset.index); } + +// Command Queue related implementations +void PikaReplicaManager::EnqueueCommandBatch(std::shared_ptr batch) { + if (command_queue_) { + bool success = command_queue_->EnqueueBatch(batch); + if (success) { + // Notify the command queue thread that new data is available + { + std::lock_guard lock(command_queue_mutex_); + command_queue_cv_.notify_one(); + } + } else { + LOG(ERROR) << "Failed to enqueue command batch with " << batch->Size() << " commands"; + } + } else { + LOG(ERROR) << "Command queue not initialized"; + } +} + +std::shared_ptr PikaReplicaManager::DequeueCommandBatch() { + if (command_queue_) { + return command_queue_->DequeueBatch(); + } else { + LOG(ERROR) << "Command queue not initialized"; + return nullptr; + } +} + +size_t PikaReplicaManager::GetCommandQueueSize() const { + if (command_queue_) { + return command_queue_->Size(); + } + return 0; +} + +bool PikaReplicaManager::IsCommandQueueEmpty() const { + if (command_queue_) { + return command_queue_->Empty(); + } + return true; +} + +void PikaReplicaManager::StartCommandQueueThread() { + if (command_queue_running_.load()) { + LOG(WARNING) << "Command queue thread is already running"; + return; + } + command_queue_running_.store(true); + command_queue_thread_ = std::make_unique(&PikaReplicaManager::CommandQueueLoop, this); + LOG(INFO) << "Command queue background thread started"; +} + +void PikaReplicaManager::StopCommandQueueThread() { + if (!command_queue_running_.load()) { + return; + } + command_queue_running_.store(false); + // Notify the condition variable to wake up the thread + { + std::lock_guard lock(command_queue_mutex_); + command_queue_cv_.notify_one(); + } + + if (command_queue_thread_ && command_queue_thread_->joinable()) { + command_queue_thread_->join(); + } + LOG(INFO) << "Command queue background thread stopped"; +} + +void PikaReplicaManager::CommandQueueLoop() { + LOG(INFO) << "Command queue loop started"; + while (command_queue_running_.load()) { + try { + std::unique_lock lock(command_queue_mutex_); + // Wait for notification or timeout + command_queue_cv_.wait(lock, [this] { + return !command_queue_running_.load() || !command_queue_->Empty(); + }); + + // Check if we should exit + if (!command_queue_running_.load()) { + break; + } + + // Release lock before processing + lock.unlock(); + + // Continuously process batches until queue is empty + bool processed_any_batch = false; + do { + // Non-blocking dequeue all available batches + auto batches = command_queue_->DequeueAllBatches(); + + if (!batches.empty()) { + // Process all batches + ProcessCommandBatches(batches); + processed_any_batch = true; + LOG(INFO) << "Processed " << batches.size() << " batches, checking for more..."; + } else { + processed_any_batch = false; + } + // Continue processing if we processed any batches (there might be more) + } while (processed_any_batch && command_queue_running_.load()); + } catch (const std::exception& e) { + LOG(ERROR) << "Error in command queue loop: " << e.what(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + LOG(INFO) << "Command queue loop ended"; +} + +void PikaReplicaManager::ProcessCommandBatches(const std::vector>& batches) { + if (batches.empty()) { + return; + } + size_t total_commands = 0; + for (const auto& batch : batches) { + total_commands += batch->Size(); + } + LOG(INFO) << "Processing " << batches.size() << " batches with " << total_commands << " total commands"; + + // Merge all commands from all batches for unified batch processing + std::vector> all_commands; + std::map, size_t>> command_to_batch_map; + size_t global_cmd_idx = 0; + for (const auto& batch : batches) { + if (!batch || batch->Empty()) { + continue; + } + for (size_t i = 0; i < batch->commands.size(); ++i) { + all_commands.push_back(batch->commands[i]); + command_to_batch_map[global_cmd_idx] = std::make_pair(batch, i); + global_cmd_idx++; + } + } + + if (all_commands.empty()) { + LOG(WARNING) << "No valid commands found in batches"; + return; + } + + // Use the first batch's database for consensus + std::string db_name = batches[0]->db_name; + std::shared_ptr sync_db = GetSyncMasterDBByName(DBInfo(db_name)); + if (!sync_db) { + LOG(WARNING) << "SyncMasterDB not found for database: " << db_name; + // Call callbacks with error for all batches + for (const auto& batch : batches) { + for (size_t i = 0; i < batch->callbacks.size(); ++i) { + if (batch->callbacks[i]) { + batch->callbacks[i](LogOffset(), pstd::Status::NotFound("Database not found")); + } + } + } + return; + } + + try { + // Process each command individually using AppendEntries + std::vector all_offsets; + all_offsets.resize(all_commands.size()); + + // Process each command using the original single-command logic + for (size_t cmd_idx = 0; cmd_idx < all_commands.size(); ++cmd_idx) { + const auto& cmd_ptr = all_commands[cmd_idx]; + LogOffset cmd_offset; + + pstd::Status s = sync_db->ConsensusProposeLog(cmd_ptr); + if (!s.ok()) { + LOG(ERROR) << "Failed to " << (sync_db->GetISConsistency() ? "append" : "propose") + << " command " << cmd_ptr->name() << ": " << s.ToString(); + // Call callbacks with error for remaining commands + for (size_t error_idx = cmd_idx; error_idx < all_commands.size(); ++error_idx) { + auto& batch_info = command_to_batch_map[error_idx]; + auto batch = batch_info.first; + size_t batch_cmd_idx = batch_info.second; + if (batch_cmd_idx < batch->callbacks.size() && batch->callbacks[batch_cmd_idx]) { + batch->callbacks[batch_cmd_idx](LogOffset(), s); + } + } + return; + } + // Get the prepared ID as the offset + cmd_offset = sync_db->GetPreparedId(); + + all_offsets[cmd_idx] = cmd_offset; + + // Update individual batch offsets + auto& batch_info = command_to_batch_map[cmd_idx]; + auto batch = batch_info.first; + size_t batch_cmd_idx = batch_info.second; + + // Ensure the batch has enough space for offsets + if (batch->binlog_offsets.size() <= batch_cmd_idx) { + batch->binlog_offsets.resize(batch->commands.size()); + } + batch->binlog_offsets[batch_cmd_idx] = cmd_offset; + } + + LOG(INFO) << "Successfully processed " << all_commands.size() << " commands individually"; + + // Create BatchGroup with the last offset as end_offset + LogOffset end_offset = all_offsets.back(); + auto batch_group = std::make_shared(batches, end_offset); + + // Enqueue the BatchGroup + { + std::lock_guard lock(pending_batch_groups_mutex_); + pending_batch_groups_.push(batch_group); + } + LOG(INFO) << "Created BatchGroup with " << batches.size() << " batches and end_offset: " << end_offset.ToString(); + } catch (const std::exception& e) { + LOG(ERROR) << "Exception in ProcessCommandBatches: " << e.what(); + // Call callbacks with error for all batches + for (const auto& batch : batches) { + for (size_t i = 0; i < batch->callbacks.size(); ++i) { + if (batch->callbacks[i]) { + batch->callbacks[i](LogOffset(), pstd::Status::Corruption("Exception in processing")); + } + } + } + } + + // Signal auxiliary thread once after processing all batches + g_pika_server->SignalAuxiliary(); +} + +// RocksDB Background Thread Implementation +void PikaReplicaManager::StartRocksDBThread() { + if (rocksdb_thread_running_.load()) { + LOG(WARNING) << "RocksDB background thread is already running"; + return; + } + rocksdb_thread_running_.store(true); + rocksdb_back_thread_ = std::make_unique(&PikaReplicaManager::RocksDBThreadLoop, this); + LOG(INFO) << "RocksDB background thread started"; +} + +void PikaReplicaManager::StopRocksDBThread() { + if (!rocksdb_thread_running_.load()) { + return; + } + rocksdb_thread_running_.store(false); + // Notify the condition variable to wake up the thread + { + std::lock_guard lock(rocksdb_thread_mutex_); + rocksdb_thread_cv_.notify_one(); + } + if (rocksdb_back_thread_ && rocksdb_back_thread_->joinable()) { + rocksdb_back_thread_->join(); + } + LOG(INFO) << "RocksDB background thread stopped"; +} + +void PikaReplicaManager::RocksDBThreadLoop() { + LOG(INFO) << "RocksDB_back_thread started"; + while (rocksdb_thread_running_.load()) { + try { + std::unique_lock lock(rocksdb_thread_mutex_); + // Wait for CommittedID notification or shutdown signal + rocksdb_thread_cv_.wait(lock, [this] { + bool has_pending; + { + std::lock_guard pending_lock(pending_batch_groups_mutex_); + has_pending = !pending_batch_groups_.empty(); + } + return !rocksdb_thread_running_.load() || has_pending; + }); + // Check if we should exit + if (!rocksdb_thread_running_.load()) { + break; + } + lock.unlock(); + // Get current committed ID + LogOffset committed_id; + { + std::lock_guard id_lock(last_committed_id_mutex_); + committed_id = last_committed_id_; + } + + // Process committed BatchGroups + if (committed_id.IsValid()) { + size_t groups_processed = ProcessCommittedBatchGroups(committed_id); + if (groups_processed > 0) { + LOG(INFO) << "Processed " << groups_processed << " committed BatchGroups"; + } + } + } catch (const std::exception& e) { + LOG(ERROR) << "Error in RocksDB thread loop: " << e.what(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + LOG(INFO) << "RocksDB_back_thread ended"; +} + +size_t PikaReplicaManager::ProcessCommittedBatchGroups(const LogOffset& committed_id) { + std::queue> groups_to_process; + // Get pending BatchGroups and separate committed from uncommitted + { + std::lock_guard lock(pending_batch_groups_mutex_); + while (!pending_batch_groups_.empty()) { + auto batch_group = pending_batch_groups_.front(); + // Check if this BatchGroup is committed by comparing its end_offset with committed_id + bool is_committed = (batch_group->end_offset <= committed_id); + if (is_committed) { + groups_to_process.push(batch_group); + pending_batch_groups_.pop(); + LOG(INFO) << "BatchGroup with end_offset " << batch_group->end_offset.ToString() + << " is committed (committed_id: " << committed_id.ToString() << ")"; + } else { + // First uncommitted BatchGroup found, stop processing + static LogOffset last_uncommitted_offset; + if (last_uncommitted_offset != batch_group->end_offset) { + LOG(INFO) << "BatchGroup with end_offset " << batch_group->end_offset.ToString() + << " is not yet committed (committed_id: " << committed_id.ToString() << ")"; + last_uncommitted_offset = batch_group->end_offset; + } + break; + } + } + } + // Store the number of groups to process for return value + size_t groups_count = groups_to_process.size(); + // Process committed BatchGroups + while (!groups_to_process.empty()) { + auto batch_group = groups_to_process.front(); + groups_to_process.pop(); + //LOG(INFO) << "Processing committed BatchGroup with " << batch_group->BatchCount() + // << " batches for RocksDB Put operations and client callbacks"; + + // Process all batches in this group + for (const auto& batch : batch_group->batches) { + // Execute RocksDB Put operations for each command in the batch + for (size_t i = 0; i < batch->commands.size(); ++i) { + const auto& cmd_ptr = batch->commands[i]; + LOG(INFO) << "[RocksDB] RocksDBThreadLoop" << "Processing BatchGroup with end_offset: " << batch_group->end_offset.ToString(); + try { + // Execute the command (this will perform the RocksDB Put operation) + cmd_ptr->Execute(); + // auto tmp_conn = cmd_ptr->GetConn(); + // auto pc = dynamic_cast(tmp_conn.get()); + // std::shared_ptr resp_ptr = std::make_shared(); + // *resp_ptr = std::move(cmd_ptr->res().message()); + // pc->resp_num--; + // pc->resp_array.push_back(resp_ptr); + // pc->TryWriteResp(); + if (cmd_ptr->res().ok()) { + LOG(INFO) << "Successfully executed RocksDB Put for command: " << cmd_ptr->name(); + // Execute callback with BatchGroup's end_offset (all commands in the group get the same offset) + if (i < batch->callbacks.size() && batch->callbacks[i]) { + LOG(INFO) << "[Callback] RocksDBThreadLoop" + << ", Executing callback for end_offset: " << batch_group->end_offset.ToString(); + batch->callbacks[i](batch_group->end_offset, pstd::Status::OK()); + } + } else { + LOG(WARNING) << "Command " << cmd_ptr->name() << " execution failed: " << cmd_ptr->res().message(); + // Execute callback with command error, still use BatchGroup's end_offset + if (i < batch->callbacks.size() && batch->callbacks[i]) { + batch->callbacks[i](batch_group->end_offset, pstd::Status::Corruption("Command execution failed: " + cmd_ptr->res().message())); + } + } + } catch (const std::exception& e) { + LOG(ERROR) << "Exception during command execution: " << e.what(); + // Execute callback with exception error, still use BatchGroup's end_offset + if (i < batch->callbacks.size() && batch->callbacks[i]) { + batch->callbacks[i](batch_group->end_offset, pstd::Status::Corruption("Exception during execution: " + std::string(e.what()))); + } + } + } + } + } + + return groups_count; +} + +void PikaReplicaManager::NotifyCommittedID(const LogOffset& committed_id) { + { + std::lock_guard lock(last_committed_id_mutex_); + last_committed_id_ = committed_id; + } + // Notify RocksDB thread + { + std::lock_guard lock(rocksdb_thread_mutex_); + rocksdb_thread_cv_.notify_one(); + } + LOG(INFO) << "Notified RocksDB thread of new CommittedID: " << committed_id.ToString(); +} diff --git a/src/pika_server.cc b/src/pika_server.cc index a7d50b1e71..74f439060a 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -24,6 +24,7 @@ #include "include/pika_monotonic_time.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_command_collector.h" using pstd::Status; extern PikaServer* g_pika_server; @@ -169,6 +170,14 @@ void PikaServer::Start() { } */ + if (g_pika_conf->command_batch_enabled()) { + auto master_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(g_pika_conf->default_db())); + if (master_db) { + auto command_collector = master_db->GetCommandCollector(); + LOG(INFO) << "Command batch enabled, command collector accessed successfully"; + } + } + ret = pika_client_processor_->Start(); if (ret != net::kSuccess) { dbs_.clear(); @@ -1096,9 +1105,17 @@ std::unordered_map PikaServer::ServerAllDBStat() { re int PikaServer::SendToPeer() { return g_pika_rm->ConsumeWriteQueue(); } -void PikaServer::SignalAuxiliary() { pika_auxiliary_thread_->cv_.notify_one(); } +void PikaServer::SignalAuxiliary() { + // LOG(INFO) << "[Signal] SignalAuxiliary called, notifying auxiliary thread"; + pika_auxiliary_thread_->cv_.notify_one(); +} -Status PikaServer::TriggerSendBinlogSync() { return g_pika_rm->WakeUpBinlogSync(); } +Status PikaServer::TriggerSendBinlogSync() { + // LOG(INFO) << "[Trigger] TriggerSendBinlogSync called, calling WakeUpBinlogSync"; + Status s = g_pika_rm->WakeUpBinlogSync(); + // LOG(INFO) << "[Trigger] WakeUpBinlogSync result: " << s.ToString(); + return s; +} int PikaServer::PubSubNumPat() { return pika_pubsub_thread_->PubSubNumPat(); } diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 3066a62759..0c4aa52cae 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -14,6 +14,7 @@ Redis::Redis(Storage* const s, const DataType& type) lock_mgr_(std::make_shared(1000, 0, std::make_shared())), small_compaction_threshold_(5000), small_compaction_duration_threshold_(10000) { + default_write_options_.disableWAL = true; statistics_store_ = std::make_unique>(); scan_cursors_store_ = std::make_unique>(); scan_cursors_store_->SetCapacity(5000); diff --git a/tests/integration/clean_start.sh b/tests/integration/clean_start.sh new file mode 100644 index 0000000000..f527ccedfc --- /dev/null +++ b/tests/integration/clean_start.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +cd /home/pika/caiyu/pikiwidb || exit + +clean_ports() { + echo "Checking and cleaning ports..." + + sudo killall -9 pika + + sleep 1 +} + +echo "Building project..." +sudo ./build.sh +if [ $? -ne 0 ]; then + echo "Build failed!" + exit 1 +fi +echo "Build successful." + +echo "Cleaning up test directory..." +sudo rm -rf ./output/pacifica_test/ +echo "Cleanup completed." + +clean_ports + +cd output || exit + +echo "Starting master and slave servers..." +sudo ../tests/integration/start_master_and_slave.sh +sleep 10 + +echo "Setting up strong consistency replication..." + +redis-cli -p 9302 slaveof 127.0.0.1 9301 strong +sleep 1 +echo "Replication setup successful." + +echo "Running benchmark..." + +# redis-cli -p 9301 set key "12313" +redis-benchmark -p 9301 -t set -n 100000 -c 10 --threads 1 +echo "Benchmark finished." + +echo -e "\n==== 主节点 INFO 日志 ====" +tail -n 150 ./pacifica_test/master/log/pika.INFO + +echo -e "\n==== 主节点 WARNING 日志 ====" +tail -n 150 ./pacifica_test/master/log/pika.WARNING + +echo -e "\n==== 从节点 INFO 日志 ====" +tail -n 150 ./pacifica_test/slave1/log/pika.INFO + +echo -e "\n==== 从节点 WARNING 日志 ====" +tail -n 150 ./pacifica_test/slave1/log/pika.WARNING \ No newline at end of file diff --git a/tests/integration/start_master_and_slave.sh b/tests/integration/start_master_and_slave.sh index d3d0f1257d..5517ade141 100755 --- a/tests/integration/start_master_and_slave.sh +++ b/tests/integration/start_master_and_slave.sh @@ -1,135 +1,135 @@ -#!/bin/bash -# This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing. -# it's used to start pika master and slave, running path: build -cp ../conf/pika.conf ./pika_single.conf -cp ../conf/pika.conf ./pika_master.conf -cp ../conf/pika.conf ./pika_slave.conf -cp ../conf/pika.conf ./pika_rename.conf -cp ../conf/pika.conf ./pika_master_rename.conf -cp ../conf/pika.conf ./pika_slave_rename.conf -cp ../conf/pika.conf ./pika_acl_both_password.conf -cp ../conf/pika.conf ./pika_acl_only_admin_password.conf -cp ../conf/pika.conf ./pika_has_other_acl_user.conf -# Create folders for storing data on the primary and secondary nodes -mkdir master_data -mkdir slave_data -# Example Change the location for storing data on primary and secondary nodes in the configuration file -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_single.conf - -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9241|' \ - -e 's|log-path : ./log/|log-path : ./master_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./master_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_master.conf - -sed -i.bak \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9231|' \ - -e 's|log-path : ./log/|log-path : ./slave_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./slave_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_slave.conf - -sed -i.bak \ - -e 's|# rename-command : FLUSHALL 360flushall|rename-command : FLUSHALL 360flushall|' \ - -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|databases : 1|databases : 2|' \ - -e 's|port : 9221|port : 9251|' \ - -e 's|log-path : ./log/|log-path : ./rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_rename.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth :|masterauth : requirepass|' \ - -e 's|# userpass :|userpass : userpass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9261|' \ - -e 's|log-path : ./log/|log-path : ./acl1_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl1_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl1_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl1_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl1_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_acl_both_password.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth :|masterauth : requirepass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9271|' \ - -e 's|log-path : ./log/|log-path : ./acl2_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl2_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl2_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl2_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl2_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_acl_only_admin_password.conf - -sed -i.bak \ - -e 's|requirepass :|requirepass : requirepass|' \ - -e 's|masterauth :|masterauth : requirepass|' \ - -e 's|# userpass :|userpass : userpass|' \ - -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ - -e 's|port : 9221|port : 9281|' \ - -e 's|log-path : ./log/|log-path : ./acl3_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./acl3_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./acl3_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./acl3_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl3_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_has_other_acl_user.conf -echo -e '\nuser : limit on >limitpass ~* +@all &*' >> ./pika_has_other_acl_user.conf - -sed -i '' \ - -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|port : 9221|port : 9291|' \ - -e 's|log-path : ./log/|log-path : ./master_rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./master_rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./master_rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./master_rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_master_rename.conf - -sed -i '' \ - -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ - -e 's|port : 9221|port : 9301|' \ - -e 's|log-path : ./log/|log-path : ./slave_rename_data/log/|' \ - -e 's|db-path : ./db/|db-path : ./slave_rename_data/db/|' \ - -e 's|dump-path : ./dump/|dump-path : ./slave_rename_data/dump/|' \ - -e 's|pidfile : ./pika.pid|pidfile : ./slave_rename_data/pika.pid|' \ - -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' \ - -e 's|timeout : 60|timeout : 500|' ./pika_slave_rename.conf - -# Start three nodes -./pika -c ./pika_single.conf -./pika -c ./pika_master.conf -./pika -c ./pika_slave.conf -./pika -c ./pika_rename.conf -./pika -c ./pika_acl_both_password.conf -./pika -c ./pika_acl_only_admin_password.conf -./pika -c ./pika_has_other_acl_user.conf -./pika -c ./pika_master_rename.conf -./pika -c ./pika_slave_rename.conf -#ensure both master and slave are ready -sleep 10 +# #!/bin/bash +# # This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing. +# # it's used to start pika master and slave, running path: build +# cp ../conf/pika.conf ./pika_single.conf +# cp ../conf/pika.conf ./pika_master.conf +# cp ../conf/pika.conf ./pika_slave.conf +# cp ../conf/pika.conf ./pika_rename.conf +# cp ../conf/pika.conf ./pika_master_rename.conf +# cp ../conf/pika.conf ./pika_slave_rename.conf +# cp ../conf/pika.conf ./pika_acl_both_password.conf +# cp ../conf/pika.conf ./pika_acl_only_admin_password.conf +# cp ../conf/pika.conf ./pika_has_other_acl_user.conf +# # Create folders for storing data on the primary and secondary nodes +# mkdir master_data +# mkdir slave_data +# # Example Change the location for storing data on primary and secondary nodes in the configuration file +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_single.conf + +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9241|' \ +# -e 's|log-path : ./log/|log-path : ./master_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./master_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_master.conf + +# sed -i.bak \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9231|' \ +# -e 's|log-path : ./log/|log-path : ./slave_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./slave_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_slave.conf + +# sed -i.bak \ +# -e 's|# rename-command : FLUSHALL 360flushall|rename-command : FLUSHALL 360flushall|' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|databases : 1|databases : 2|' \ +# -e 's|port : 9221|port : 9251|' \ +# -e 's|log-path : ./log/|log-path : ./rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_rename.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userpass :|userpass : userpass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9261|' \ +# -e 's|log-path : ./log/|log-path : ./acl1_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl1_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl1_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl1_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl1_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_acl_both_password.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9271|' \ +# -e 's|log-path : ./log/|log-path : ./acl2_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl2_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl2_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl2_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl2_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_acl_only_admin_password.conf + +# sed -i.bak \ +# -e 's|requirepass :|requirepass : requirepass|' \ +# -e 's|masterauth :|masterauth : requirepass|' \ +# -e 's|# userpass :|userpass : userpass|' \ +# -e 's|# userblacklist :|userblacklist : flushall,flushdb|' \ +# -e 's|port : 9221|port : 9281|' \ +# -e 's|log-path : ./log/|log-path : ./acl3_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./acl3_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./acl3_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./acl3_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl3_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_has_other_acl_user.conf +# echo -e '\nuser : limit on >limitpass ~* +@all &*' >> ./pika_has_other_acl_user.conf + +# sed -i '' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|port : 9221|port : 9291|' \ +# -e 's|log-path : ./log/|log-path : ./master_rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./master_rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./master_rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./master_rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_master_rename.conf + +# sed -i '' \ +# -e 's|# rename-command : FLUSHDB 360flushdb|rename-command : FLUSHDB 360flushdb|' \ +# -e 's|port : 9221|port : 9301|' \ +# -e 's|log-path : ./log/|log-path : ./slave_rename_data/log/|' \ +# -e 's|db-path : ./db/|db-path : ./slave_rename_data/db/|' \ +# -e 's|dump-path : ./dump/|dump-path : ./slave_rename_data/dump/|' \ +# -e 's|pidfile : ./pika.pid|pidfile : ./slave_rename_data/pika.pid|' \ +# -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_rename_data/dbsync/|' \ +# -e 's|#daemonize : yes|daemonize : yes|' \ +# -e 's|timeout : 60|timeout : 500|' ./pika_slave_rename.conf + +# # Start three nodes +# ./pika -c ./pika_single.conf +# ./pika -c ./pika_master.conf +# ./pika -c ./pika_slave.conf +# ./pika -c ./pika_rename.conf +# ./pika -c ./pika_acl_both_password.conf +# ./pika -c ./pika_acl_only_admin_password.conf +# ./pika -c ./pika_has_other_acl_user.conf +# ./pika -c ./pika_master_rename.conf +# ./pika -c ./pika_slave_rename.conf +# #ensure both master and slave are ready +# sleep 10 # 创建PacificA一致性测试的数据目录 mkdir -p pacifica_test/master