Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ replication-num : 0
# The default value of consensus-level is 0, which means this feature is not enabled.
consensus-level : 0

# Batch processing configuration (used by both command collection and consensus mechanism)
# The maximum number of items in a batch (both command collection and consensus)
# Default: 100
batch-size : 100

# Batch processing configuration (used by both command collection and consensus mechanism)
# The maximum time, in milliseconds, to wait for a batch to fill (both command collection and consensus)
# Default: 5
batch-max-wait-time : 5

# The timeout in milliseconds for waiting for a batch ACK from a slave.
# Default: 500
replication-ack-timeout : 500

# Enable command batch processing for better performance
# When enabled, write commands will be collected and processed in batches
# Default: yes
command-batch-enabled : yes

# The Prefix of dump file's name.
# All the files that generated by command "bgsave" will be name with this prefix.
dump-prefix :
Expand Down
4 changes: 4 additions & 0 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class Binlog : public pstd::noncopyable {
* Set Producer pro_num and pro_offset with lock
*/
pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0);

// Force sync data to disk
pstd::Status Sync();

// Need to hold Lock();
pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);

Expand Down
4 changes: 2 additions & 2 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class PikaClientConn : public net::RedisConn {
std::vector<std::shared_ptr<std::string>> resp_array;

std::shared_ptr<TimeStat> time_stat_;

void TryWriteResp();
private:
net::ServerThread* const server_thread_;
std::string current_db_;
Expand All @@ -134,7 +134,7 @@ class PikaClientConn : public net::RedisConn {
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
void TryWriteResp();
// void TryWriteResp();
};

struct ClientInfo {
Expand Down
91 changes: 91 additions & 0 deletions include/pika_command_collector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_COMMAND_COLLECTOR_H_
#define PIKA_COMMAND_COLLECTOR_H_

#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include <chrono>
#include <optional>

#include "include/pika_command.h"
#include "include/pika_define.h"
#include "pstd/include/pstd_status.h"

#include "include/pika_consensus.h"

/**
* @brief PikaCommandCollector collects write commands so that they can be
*        processed in batches.
*
* Responsibilities as listed by the original author (the implementation is not
* part of this header, so confirm each against the .cc file):
* 1. Collect write commands and process them in optimized batches after reaching the threshold
* 2. Handle the conflict of the same key (the later command will overwrite the earlier command)
* 3. Send commands in batches to the consensus coordinator with batch-level synchronization
* 4. Support asynchronous callback notification of command processing results
* 5. Track performance metrics for batch processing
*
* NOTE(review): the atomic data members make this class non-copyable and
* non-movable.
*/
class PikaCommandCollector {
public:
// Callback type invoked after a command has been processed; it receives a
// LogOffset and the resulting pstd::Status.
using CommandCallback = std::function<void(const LogOffset& offset, pstd::Status status)>;

/**
* @brief Construct a collector from a raw coordinator pointer.
* @param coordinator consensus coordinator; the class only has a raw-pointer
*        member for it (coordinator_), so presumably the caller must keep the
*        coordinator alive for the collector's lifetime - confirm.
* @param batch_max_wait_time maximum wait time in milliseconds (default 5)
*
* NOTE(review): callable with a single argument and not marked `explicit`, so
* it permits implicit conversion from ConsensusCoordinator*.
*/
PikaCommandCollector(ConsensusCoordinator* coordinator, int batch_max_wait_time = 5);

/**
* @brief Construct a collector from a shared_ptr (for compatibility with make_shared calls).
* @param coordinator consensus coordinator
* @param batch_max_wait_time maximum wait time in milliseconds (default 5)
*
* NOTE(review): the only coordinator member is the raw pointer coordinator_,
* so this overload presumably does not extend the coordinator's lifetime -
* confirm in the implementation.
*/
PikaCommandCollector(std::shared_ptr<ConsensusCoordinator> coordinator, int batch_max_wait_time = 5);

~PikaCommandCollector();

/**
* @brief Add command to collector
* @param cmd_ptr command pointer
* @param callback callback function after processing is completed
* @return whether the addition was successful
*/
bool AddCommand(std::shared_ptr<Cmd> cmd_ptr, CommandCallback callback);

/**
* @brief Set the batch max wait time
* @param batch_max_wait_time maximum wait time in milliseconds
*/
void SetBatchMaxWaitTime(int batch_max_wait_time);

/**
* @brief Get batch processing statistics
* @return Pair of (total_processed_commands, total_batches)
*/
std::pair<uint64_t, uint64_t> GetBatchStats() const;

private:
// Consensus coordinator, held as a raw (non-owning) pointer.
ConsensusCoordinator* coordinator_;

// Batch processing configuration: maximum wait time in milliseconds.
std::atomic<int> batch_max_wait_time_;

// Batch statistics, both starting at 0; presumably the values reported by
// GetBatchStats() - confirm in the implementation.
std::atomic<uint64_t> total_processed_{0};
std::atomic<uint64_t> total_batches_{0};
};

#endif // PIKA_COMMAND_COLLECTOR_H_
99 changes: 99 additions & 0 deletions include/pika_command_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_COMMAND_QUEUE_H_
#define PIKA_COMMAND_QUEUE_H_

#include <memory>
#include <queue>
#include <vector>
#include <string>
#include <functional>
#include <atomic>
#include <condition_variable>
#include <mutex>

#include "pstd/include/pstd_mutex.h"
#include "pstd/include/env.h"
#include "include/pika_command.h"
#include "include/pika_define.h"
#include "pstd/include/env.h"
#include "include/pika_define.h"

// Callback function type for command completion notification
using CommandCallback = std::function<void(const LogOffset&, pstd::Status)>;

// A set of commands, together with their completion callbacks, that is handled
// as one unit. Every instance receives a process-wide unique, increasing id and
// a creation timestamp taken from pstd::NowMicros().
struct CommandBatch {
  // Commands in the batch.
  std::vector<std::shared_ptr<Cmd>> commands;
  // Completion callbacks; presumably callbacks[i] belongs to commands[i] - confirm with callers.
  std::vector<CommandCallback> callbacks;
  // Unique id assigned at construction (first batch gets 1).
  uint64_t batch_id;
  // Value of pstd::NowMicros() at construction.
  uint64_t create_time;
  // Name of the database the batch targets.
  std::string db_name;
  // Binlog offsets for each command; left empty by the constructor.
  std::vector<LogOffset> binlog_offsets;

  // Copies the commands, callbacks and db name, then stamps the batch with the
  // next id from a shared atomic counter and the current time.
  CommandBatch(const std::vector<std::shared_ptr<Cmd>>& cmds,
               const std::vector<CommandCallback>& cbs,
               const std::string& db)
      : commands(cmds), callbacks(cbs), db_name(db) {
    static std::atomic<uint64_t> id_source{1};
    // Atomic post-increment yields the previous value, same as fetch_add(1).
    batch_id = id_source++;
    create_time = pstd::NowMicros();
  }

  // True when the batch holds no commands.
  bool Empty() const { return commands.empty(); }

  // Number of commands in the batch.
  size_t Size() const { return commands.size(); }
};

// Bundles several CommandBatch objects so they can be processed together
// (per the original comment: for RocksDB processing).
struct BatchGroup {
  // The grouped batches.
  std::vector<std::shared_ptr<CommandBatch>> batches;
  // Only the final offset of the last batch is stored.
  LogOffset end_offset;

  // Creates an empty group.
  BatchGroup() = default;

  // Copies the given batches and records final_offset as end_offset.
  BatchGroup(const std::vector<std::shared_ptr<CommandBatch>>& batches,
             const LogOffset& final_offset)
      : batches(batches), end_offset(final_offset) {}

  // True when the group contains no batches.
  bool Empty() const { return batches.empty(); }

  // Number of batches in the group.
  size_t BatchCount() const { return batches.size(); }
};

// Thread-safe command queue for batched command processing.
// Holds shared_ptr<CommandBatch> entries in a std::queue together with a mutex
// and a condition variable. Only declarations are visible here; the behaviour
// described on each method comes from the original comments and should be
// confirmed against the implementation file.
class CommandQueue {
public:
// max_size is presumably the capacity stored in max_size_ - confirm.
explicit CommandQueue(size_t max_size);
~CommandQueue();

// Enqueue a command batch (blocking if queue is full)
// NOTE(review): the meaning of the bool result is not visible in this header.
bool EnqueueBatch(std::shared_ptr<CommandBatch> batch);

// Dequeue a command batch (blocking if queue is empty)
std::shared_ptr<CommandBatch> DequeueBatch();

// Dequeue all available batches (non-blocking)
std::vector<std::shared_ptr<CommandBatch>> DequeueAllBatches();

// Get current queue size
size_t Size() const;

// Check if queue is empty
bool Empty() const;

// Shutdown the queue
void Shutdown();

private:
// Pending batches.
cmd_queue_;
// Declared mutable, so the const accessors above are able to lock it.
mutable std::mutex queue_mutex_;
// Condition variable; presumably used together with queue_mutex_ for the
// blocking enqueue/dequeue - confirm.
std::condition_variable queue_cv_;
// Queue capacity limit.
size_t max_size_;
// Shutdown flag, initially false.
std::atomic<bool> shutdown_{false};
};

#endif // PIKA_COMMAND_QUEUE_H_
62 changes: 60 additions & 2 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return sync_thread_num_;
}

// Returns the command-batch-enabled flag, read while holding a shared lock on rwlock_.
bool command_batch_enabled() {
  std::shared_lock guard{rwlock_};
  return command_batch_enabled_;
}

// Returns the configured batch size, read while holding a shared lock on rwlock_.
int batch_size() {
  std::shared_lock guard{rwlock_};
  return batch_size_;
}

// Returns the configured maximum batch wait time, read while holding a shared lock on rwlock_.
int batch_max_wait_time() {
  std::shared_lock guard{rwlock_};
  return batch_max_wait_time_;
}
int sync_binlog_thread_num() {
std::shared_lock l(rwlock_);
return sync_binlog_thread_num_;
Expand Down Expand Up @@ -350,6 +365,16 @@ class PikaConf : public pstd::BaseConf {
// Returns the current max_conn_rbuf_size_ value via load(); rwlock_ is not taken.
int max_conn_rbuf_size() {
  const int current = max_conn_rbuf_size_.load();
  return current;
}
// Returns the current consensus_level_ value via load(); rwlock_ is not taken.
int consensus_level() {
  const int current = consensus_level_.load();
  return current;
}
// Returns the current replication_num_ value via load(); rwlock_ is not taken.
int replication_num() {
  const int current = replication_num_.load();
  return current;
}
// Returns the replication ACK timeout, read while holding a shared lock on rwlock_.
int replication_ack_timeout() {
  std::shared_lock guard{rwlock_};
  return replication_ack_timeout_;
}

// Sets the replication acknowledgment timeout (used by batch system).
// Holds rwlock_ through std::lock_guard for the write. This setter only stores
// the value; it does not call TryPushDiffCommands.
void SetReplicationAckTimeout(int timeout) {
  std::lock_guard guard{rwlock_};
  replication_ack_timeout_ = timeout;
}
int rate_limiter_mode() {
std::shared_lock l(rwlock_);
return rate_limiter_mode_;
Expand Down Expand Up @@ -436,7 +461,6 @@ class PikaConf : public pstd::BaseConf {
// Reports whether cmd is present in admin_cmd_set_.
bool is_admin_cmd(const std::string& cmd) {
  const auto hit = admin_cmd_set_.find(cmd);
  return hit != admin_cmd_set_.end();
}

// Immutable config items, we don't use lock.
// Returns daemonize_ directly, without locking.
bool daemonize() {
  return daemonize_;
}
// Returns rtc_cache_read_enabled_ directly, without locking.
bool rtc_cache_read_enabled() {
  return rtc_cache_read_enabled_;
}
Expand All @@ -462,6 +486,23 @@ class PikaConf : public pstd::BaseConf {
std::lock_guard l(rwlock_);
thread_num_ = value;
}

// Turns command batching on or off.
// While holding rwlock_ via std::lock_guard, passes the key
// "command-batch-enabled" with "yes"/"no" to TryPushDiffCommands and then
// stores the new flag.
void SetCommandBatchEnabled(const bool value) {
  std::lock_guard guard{rwlock_};
  TryPushDiffCommands("command-batch-enabled", value ? "yes" : "no");
  command_batch_enabled_ = value;
}

// Updates the batch size.
// While holding rwlock_ via std::lock_guard, passes the key "batch-size" and
// the decimal text of value to TryPushDiffCommands, then stores value.
void SetCommandBatchSize(const int value) {
  std::lock_guard guard{rwlock_};
  TryPushDiffCommands("batch-size", std::to_string(value));
  batch_size_ = value;
}
// Updates the maximum batch wait time.
// While holding rwlock_ via std::lock_guard, passes the key
// "batch-max-wait-time" and the decimal text of value to TryPushDiffCommands,
// then stores value.
void SetCommandBatchMaxWaitTime(const int value) {
  std::lock_guard guard{rwlock_};
  TryPushDiffCommands("batch-max-wait-time", std::to_string(value));
  batch_max_wait_time_ = value;
}
void SetTimeout(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("timeout", std::to_string(value));
Expand Down Expand Up @@ -665,6 +706,17 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("max-conn-rbuf-size", std::to_string(value));
max_conn_rbuf_size_.store(value);
}
// Updates the batch size from the consensus side.
// Writes the same "batch-size" key and batch_size_ field as SetCommandBatchSize:
// under rwlock_ (std::lock_guard) it passes the key and the decimal text of
// value to TryPushDiffCommands, then stores value.
void SetConsensusBatchSize(const int value) {
  std::lock_guard guard{rwlock_};
  TryPushDiffCommands("batch-size", std::to_string(value));
  batch_size_ = value;
}
// Updates the replication ACK timeout; this method is used by the config update system.
// Under rwlock_ (std::lock_guard) it passes the key "replication-ack-timeout"
// and the decimal text of value to TryPushDiffCommands, then stores value.
void UpdateReplicationAckTimeout(const int value) {
  std::lock_guard guard{rwlock_};
  TryPushDiffCommands("replication-ack-timeout", std::to_string(value));
  replication_ack_timeout_ = value;
}
void SetMaxCacheFiles(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-cache-files", std::to_string(value));
Expand Down Expand Up @@ -929,6 +981,12 @@ class PikaConf : public pstd::BaseConf {
std::string server_id_;
std::string run_id_;
std::string replication_id_;

// Command batch processing configuration.
// In-class defaults: batching enabled, batch size 100, max wait time 5.
bool command_batch_enabled_ = true;
int batch_size_ = 100;
int batch_max_wait_time_ = 5;
// NOTE(review): conf/pika.conf documents and ships 500 for
// replication-ack-timeout, while the in-class default here is 5000 - confirm
// which value is intended.
int replication_ack_timeout_ = 5000;
std::string requirepass_;
std::string masterauth_;
std::string userpass_;
Expand Down Expand Up @@ -1047,7 +1105,7 @@ class PikaConf : public pstd::BaseConf {
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;

/*
kUninitialized = 0, // unknown setting
kDisable = 1, // disable perf stats
Expand Down
Loading