4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -457,9 +457,9 @@ set(LZ4_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
ExternalProject_Add(zlib
DEPENDS
URL
https://github.com/madler/zlib/releases/download/v1.2.13/zlib-1.2.13.tar.gz
https://github.com/madler/zlib/releases/download/v1.3.1/zlib-1.3.1.tar.gz
URL_HASH
MD5=9b8aa094c4e5765dabf4da391f00d15c
MD5=9855b6d802d7fe5b7bd5b196a2271655
DOWNLOAD_NO_PROGRESS
1
UPDATE_COMMAND
46 changes: 34 additions & 12 deletions conf/pika.conf
@@ -42,7 +42,7 @@ slow-cmd-thread-pool-size : 1
admin-thread-pool-size : 2

# Slow cmd list e.g. hgetall, mset
slow-cmd-list :
slow-cmd-list :

# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed.
# Default commands: info, ping, monitor
@@ -95,7 +95,7 @@ proto-max-bulk-len : 512M
# If <= 0, a proper value is automatically calculated.
# (usually 1/8 of writer-buffer-size, rounded up to a multiple of 4KB)
# Supported Units [K|M|G], arena-block-size default unit is in [bytes].
arena-block-size :
arena-block-size :

# Timeout of Pika's connection, counting down starts When there are no requests
# on a connection (it enters sleep state), when the countdown reaches 0, the connection
@@ -109,12 +109,12 @@ timeout : 60
# [NOTICE] If this admin password is the same as user password (including both being empty),
# in this scenario, users are not subject to the restrictions imposed by the userblacklist.
# PS: "user password" refers to value of the parameter below: userpass.
requirepass :
requirepass :

# Password for replication verify, used for authentication when a slave
# connects to a master to request replication.
# [NOTICE] The value of this parameter must match the "requirepass" setting on the master.
masterauth :
masterauth :

# The [password of user], which is empty by default.
# [NOTICE] If this user password is the same as admin password (including both being empty),
@@ -153,9 +153,28 @@ replication-num : 0
# The default value of consensus-level is 0, which means this feature is not enabled.
consensus-level : 0

# Batch processing configuration (used by both command collection and consensus mechanism)
# The maximum number of items in a batch (both command collection and consensus)
# Default: 100
batch-size : 100

# Batch processing configuration (used by both command collection and consensus mechanism)
# The maximum wait time in milliseconds for a batch (both command collection and consensus)
# Default: 5
batch-max-wait-time : 5

# The timeout in milliseconds for waiting for a batch ACK from a slave.
# Default: 500
replication-ack-timeout : 500

# Enable command batch processing for better performance
# When enabled, write commands will be collected and processed in batches
# Default: no
command-batch-enabled : yes

# The Prefix of dump file's name.
# All the files that generated by command "bgsave" will be name with this prefix.
dump-prefix :
dump-prefix :

# daemonize [yes | no].
#daemonize : yes
@@ -536,13 +555,13 @@ cache-num : 16
# cache-model 0:cache_none 1:cache_read
cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash, bit
cache-type : string, set, zset, list, hash, bit

# Set the maximum number of elements in the cache of the Set, list, Zset data types
cache-value-item-max-size: 1024
cache-value-item-max-size : 1024

# Sets the maximum number of bytes for Key when the String data type is updated in the cache
max-key-size-in-cache: 1048576
max-key-size-in-cache : 1048576

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
@@ -574,10 +593,10 @@ cache-maxmemory : 10737418240
cache-maxmemory-policy : 1

# cache-maxmemory-samples
cache-maxmemory-samples: 5
cache-maxmemory-samples : 5

# cache-lfu-decay-time
cache-lfu-decay-time: 1
cache-lfu-decay-time : 1


# is possible to manage access to Pub/Sub channels with ACL rules as well. The
@@ -631,12 +650,12 @@ cache-lfu-decay-time: 1
# 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election'
# which serves for the scenario of codis-pika cluster reelection
# You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
internal-used-unfinished-full-sync :
internal-used-unfinished-full-sync :

# for wash data from 4.0.0 to 4.0.1
# https://github.com/OpenAtomFoundation/pika/issues/2886
# default value: true
wash-data: true
wash-data : true

# Pika automatic compact compact strategy, a complement to rocksdb compact.
# Trigger the compact background task periodically according to `compact-interval`
@@ -671,3 +690,6 @@ dont-compact-sst-created-in-seconds : 20
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
best-delete-min-ratio : 10
# Generated by ReplicationID CONFIG REWRITE
replication-id : d605afa1b464ddf3e571966482dd934ec7336a4bac49aa0a6b
run-id : 58ec490577bb7defd64f6fe642d7609af67896b1
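
Putting the new replication batching settings together, a pika.conf fragment that turns the feature on might look like the sketch below. The pairing of batch-size with batch-max-wait-time (a batch is dispatched once either limit is hit) is an inference from the comments above and the collector header later in this diff, not something the diff states outright.

# Hypothetical fragment: write-command batching with up to 100 commands per
# batch, a 5 ms wait window, and a 500 ms slave ACK timeout.
command-batch-enabled : yes
batch-size : 100
batch-max-wait-time : 5
replication-ack-timeout : 500
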
5 changes: 5 additions & 0 deletions include/pika_binlog.h
@@ -57,10 +57,15 @@ class Binlog : public pstd::noncopyable {
pstd::Status Put(const std::string& item, LogOffset *cur_logoffset,std::string& binlog);
pstd::Status IsOpened();
pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
pstd::WritableFile* GetQueue() { return queue_.get(); }
/*
* Set Producer pro_num and pro_offset with lock
*/
pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0);

// Force sync data to disk
pstd::Status Sync();

// Need to hold Lock();
pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);

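For orientation, the new Binlog::Sync() declared above ("Force sync data to disk") would plausibly just delegate to the pstd::WritableFile held in queue_, under the binlog's internal lock. A minimal sketch follows; the mutex_ member name and the null check are assumptions, and this is not the PR's actual pika_binlog.cc.

#include <mutex>

#include "include/pika_binlog.h"

// Sketch only: force buffered binlog data onto disk via the write queue.
pstd::Status Binlog::Sync() {
  std::lock_guard<std::mutex> guard(mutex_);  // hypothetical: Binlog's internal mutex
  if (queue_ == nullptr) {
    return pstd::Status::IOError("binlog file is not opened");
  }
  // queue_ is the same pstd::WritableFile exposed by GetQueue().
  return queue_->Sync();
}
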
4 changes: 2 additions & 2 deletions include/pika_client_conn.h
@@ -113,7 +113,7 @@ class PikaClientConn : public net::RedisConn {
std::vector<std::shared_ptr<std::string>> resp_array;

std::shared_ptr<TimeStat> time_stat_;

void TryWriteResp();
private:
net::ServerThread* const server_thread_;
std::string current_db_;
@@ -134,7 +134,7 @@ class PikaClientConn : public net::RedisConn {
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
void TryWriteResp();
// void TryWriteResp();
};

struct ClientInfo {
91 changes: 91 additions & 0 deletions include/pika_command_collector.h
@@ -0,0 +1,91 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_COMMAND_COLLECTOR_H_
#define PIKA_COMMAND_COLLECTOR_H_

#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include <chrono>
#include <optional>

#include "include/pika_command.h"
#include "include/pika_define.h"
#include "pstd/include/pstd_status.h"

#include "include/pika_consensus.h"

/**
* @brief PikaCommandCollector is used to collect write commands and process them in batches
*
* Main functions:
* 1. Collect write commands and process them in optimized batches after reaching the threshold
* 2. Handle conflicts on the same key (a later command overwrites an earlier one)
* 3. Send commands in batches to the consensus coordinator with batch-level synchronization
* 4. Support asynchronous callback notification of command processing results
* 5. Track performance metrics for batch processing
*/
class PikaCommandCollector {
public:
// Callback function type after command processing is completed
using CommandCallback = std::function<void(const LogOffset& offset, pstd::Status status)>;

/**
* @brief constructor
* @param coordinator consensus coordinator reference
* @param batch_max_wait_time maximum wait time in milliseconds
*/
// Constructor with raw pointer (original)
PikaCommandCollector(ConsensusCoordinator* coordinator, int batch_max_wait_time = 5);

// Constructor with shared_ptr (for compatibility with make_shared calls)
PikaCommandCollector(std::shared_ptr<ConsensusCoordinator> coordinator, int batch_max_wait_time = 5);

~PikaCommandCollector();

/**
* @brief Add command to collector
* @param cmd_ptr command pointer
* @param callback callback function after processing is completed
* @return whether the addition was successful
*/
bool AddCommand(std::shared_ptr<Cmd> cmd_ptr, CommandCallback callback);

/**
* @brief Set the batch max wait time
* @param batch_max_wait_time maximum wait time in milliseconds
*/
void SetBatchMaxWaitTime(int batch_max_wait_time);

/**
* @brief Get batch processing statistics
* @return Pair of (total_processed_commands, total_batches)
*/
std::pair<uint64_t, uint64_t> GetBatchStats() const;

private:
// Consensus coordinator (raw pointer)
ConsensusCoordinator* coordinator_;

// Batch processing configuration
std::atomic<int> batch_max_wait_time_;

// Batch statistics
std::atomic<uint64_t> total_processed_{0};
std::atomic<uint64_t> total_batches_{0};
};

#endif // PIKA_COMMAND_COLLECTOR_H_
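
A hedged usage sketch of the collector API declared above: how a write path might queue a command and react in the completion callback. The SubmitWriteCommand helper and everything inside it are illustrative placeholders, not code from the PR.

#include <memory>
#include <utility>

#include "include/pika_command_collector.h"

// Illustrative helper (hypothetical): enqueue one write command for batching.
bool SubmitWriteCommand(PikaCommandCollector* collector, std::shared_ptr<Cmd> cmd_ptr) {
  return collector->AddCommand(
      std::move(cmd_ptr),
      [](const LogOffset& offset, pstd::Status status) {
        // Runs asynchronously once the batch containing this command has been
        // handed to the consensus coordinator; 'offset' is the command's binlog
        // position, 'status' reports whether the batch was accepted.
        (void)offset;
        if (!status.ok()) {
          // e.g. propagate the error back to the waiting client connection.
        }
      });
}
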
99 changes: 99 additions & 0 deletions include/pika_command_queue.h
@@ -0,0 +1,99 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_COMMAND_QUEUE_H_
#define PIKA_COMMAND_QUEUE_H_

#include <memory>
#include <queue>
#include <vector>
#include <string>
#include <functional>
#include <atomic>
#include <condition_variable>
#include <mutex>

#include "pstd/include/pstd_mutex.h"
#include "pstd/include/env.h"
#include "include/pika_command.h"
#include "include/pika_define.h"

// Callback function type for command completion notification
using CommandCallback = std::function<void(const LogOffset&, pstd::Status)>;

// Structure representing a batch of commands
struct CommandBatch {
std::vector<std::shared_ptr<Cmd>> commands;
std::vector<CommandCallback> callbacks;
uint64_t batch_id;
uint64_t create_time;
std::string db_name;
std::vector<LogOffset> binlog_offsets; // Binlog offsets for each command

CommandBatch(const std::vector<std::shared_ptr<Cmd>>& cmds,
const std::vector<CommandCallback>& cbs,
const std::string& db)
: commands(cmds), callbacks(cbs), db_name(db) {
static std::atomic<uint64_t> next_id{1};
batch_id = next_id.fetch_add(1);
create_time = pstd::NowMicros();
}

bool Empty() const {
return commands.empty();
}

size_t Size() const {
return commands.size();
}
};

// New structure to group multiple CommandBatches for RocksDB processing
struct BatchGroup {
std::vector<std::shared_ptr<CommandBatch>> batches;
LogOffset end_offset; // Only store the final offset of the last batch
BatchGroup() = default;
BatchGroup(const std::vector<std::shared_ptr<CommandBatch>>& batches,
const LogOffset& final_offset)
: batches(batches), end_offset(final_offset) {}
bool Empty() const { return batches.empty(); }
size_t BatchCount() const { return batches.size(); }
};

// Thread-safe command queue for batched command processing
class CommandQueue {
public:
explicit CommandQueue(size_t max_size);
~CommandQueue();

// Enqueue a command batch (blocking if queue is full)
bool EnqueueBatch(std::shared_ptr<CommandBatch> batch);

// Dequeue a command batch (blocking if queue is empty)
std::shared_ptr<CommandBatch> DequeueBatch();

// Dequeue all available batches (non-blocking)
std::vector<std::shared_ptr<CommandBatch>> DequeueAllBatches();

// Get current queue size
size_t Size() const;

// Check if queue is empty
bool Empty() const;

// Shutdown the queue
void Shutdown();

private:
std::queue<std::shared_ptr<CommandBatch>> cmd_queue_;
mutable std::mutex queue_mutex_;
std::condition_variable queue_cv_;
size_t max_size_;
std::atomic<bool> shutdown_{false};
};

#endif // PIKA_COMMAND_QUEUE_H_
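
For readers skimming only the headers, here is one plausible shape for the blocking queue operations declared above, built strictly from the members shown (cmd_queue_, queue_mutex_, queue_cv_, max_size_, shutdown_). It is a sketch, not the PR's pika_command_queue.cc.

#include "include/pika_command_queue.h"

// Sketch: bounded, shutdown-aware blocking enqueue/dequeue.
bool CommandQueue::EnqueueBatch(std::shared_ptr<CommandBatch> batch) {
  std::unique_lock<std::mutex> lock(queue_mutex_);
  // Block while the queue is full, unless a shutdown has been requested.
  queue_cv_.wait(lock, [this] { return shutdown_.load() || cmd_queue_.size() < max_size_; });
  if (shutdown_.load()) {
    return false;
  }
  cmd_queue_.push(std::move(batch));
  queue_cv_.notify_all();  // wake a consumer blocked in DequeueBatch()
  return true;
}

std::shared_ptr<CommandBatch> CommandQueue::DequeueBatch() {
  std::unique_lock<std::mutex> lock(queue_mutex_);
  // Block until a batch is available or the queue is shut down.
  queue_cv_.wait(lock, [this] { return shutdown_.load() || !cmd_queue_.empty(); });
  if (cmd_queue_.empty()) {
    return nullptr;  // shut down with nothing left to drain
  }
  auto batch = cmd_queue_.front();
  cmd_queue_.pop();
  queue_cv_.notify_all();  // wake a producer blocked in EnqueueBatch()
  return batch;
}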