Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ write-binlog : yes
# Supported Units [K|M|G], binlog-file-size default unit is in [bytes] and the default value is 100M.
binlog-file-size : 104857600

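# Number of binlog entries written between forced fsyncs of the binlog to disk.
# A value of 1 fsyncs after every entry; higher values improve write performance at the cost of
# durability (entries written since the last fsync may be lost on a crash).
# Default: 100 (the value below overrides it to 1, i.e. fsync on every entry)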
binlog-fsync-interval : 1

# Automatically triggers a small compaction according to statistics
# Use the cache to store up to 'max-cache-statistic-keys' keys
# If 'max-cache-statistic-keys' set to '0', that means turn off the statistics function
Expand Down
2 changes: 2 additions & 0 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class Binlog : public pstd::noncopyable {
* Set Producer pro_num and pro_offset with lock
*/
pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0);
// Force sync data to disk
pstd::Status Sync();
// Need to hold Lock();
pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);

Expand Down
9 changes: 9 additions & 0 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <bitset>
#include <utility>
#include <future>

#include "acl.h"
#include "include/pika_command.h"
Expand Down Expand Up @@ -52,6 +53,13 @@ class PikaClientConn : public net::RedisConn {
bool cache_miss_in_rtc_;
};

// Shared bookkeeping for one batch of commands that is fanned out as
// independent tasks (see BatchExecRedisCmd / ParallelExecRedisCmd).
struct ParallelTask {
// One response buffer per command, indexed by the command's position in the batch.
std::vector<std::shared_ptr<std::string>> resps;
// One future per dispatched command; the dispatcher waits on each of them
// before writing the responses back.
std::vector<std::future<void>> futures;
// NOTE(review): never read or updated in the visible code — confirm it is still needed.
std::atomic<size_t> completed_count{0};
// Number of commands in the batch (set to argvs.size() by the dispatcher).
size_t total_tasks{0};
};

struct TxnStateBitMask {
public:
static constexpr uint8_t Start = 0;
Expand All @@ -72,6 +80,7 @@ class PikaClientConn : public net::RedisConn {
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);
static void ParallelExecRedisCmd(void* arg);

bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
Expand Down
5 changes: 5 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,8 @@ class PikaConf : public pstd::BaseConf {
rsync_timeout_ms_.store(value);
}

int binlog_fsync_interval() const;

void SetProtoMaxBulkLen(const int64_t value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("proto-max-bulk-len", std::to_string(value));
Expand Down Expand Up @@ -1065,6 +1067,9 @@ class PikaConf : public pstd::BaseConf {

//Internal used metrics Persisted by pika.conf
std::unordered_set<std::string> internal_used_unfinished_full_sync_;

// Number of binlog entries between forced fsyncs ("binlog-fsync-interval").
// Initialised to the default documented in pika.conf so the value is
// well-defined even if the key is missing and the loader leaves the field
// untouched; Load() clamps negative values to 0.
int binlog_fsync_interval_ = 100;
};

#endif
8 changes: 7 additions & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class ConsensusCoordinator {
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
pstd::Status UpdateCommittedID();
pstd::Status ApplyBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ApplyBinlog(const std::vector<Log::LogItem>& logs);
pstd::Status ProcessCoordination();

LogOffset GetCommittedId() {
Expand All @@ -276,7 +276,10 @@ class ConsensusCoordinator {
std::lock_guard l(committed_id_rwlock_);
committed_id_ = offset;
context_->UpdateAppliedIndex(committed_id_);
committed_id_cv_.notify_all();
}
// Accessors handing out the mutex / condition-variable pair that callers use
// to wait for committed-id updates. committed_id_cv_ is notified (notify_all)
// right after committed_id_ is assigned.
// NOTE(review): that notification is issued while holding committed_id_rwlock_,
// not committed_id_mu_ — waiters should re-check their predicate and confirm a
// wake-up cannot be missed.
pstd::Mutex* GetCommittedIdMu() { return &committed_id_mu_; }
pstd::CondVar* GetCommittedIdCv() { return &committed_id_cv_; }

private:
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
Expand All @@ -285,10 +288,13 @@ class ConsensusCoordinator {
std::shared_mutex is_consistency_rwlock_;
bool is_consistency_ = false;
std::shared_mutex committed_id_rwlock_;
pstd::Mutex committed_id_mu_;
pstd::CondVar committed_id_cv_;
LogOffset committed_id_ = LogOffset();
std::shared_mutex prepared_id__rwlock_;
LogOffset prepared_id_ = LogOffset();
std::shared_ptr<Log> logs_;
int binlog_fsync_counter_ = 0;
};

#endif // INCLUDE_PIKA_CONSENSUS_H_
1 change: 1 addition & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class PikaReplBgWorker {
net::RedisParser redis_parser_;
std::string ip_port_;
std::string db_name_;
int binlog_fsync_counter_ = 0;

private:
net::BGThread bg_thread_;
Expand Down
20 changes: 14 additions & 6 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,22 @@ class PikaReplClient {
async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order::memory_order_seq_cst);
}

int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
int32_t db_index = db_name.back() - '0';
assert(db_index >= 0 && db_index <= 7);
return async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
}
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name);
void SignalAsyncWriteDBTaskEnd(const std::string& db_name);
void WaitForAsyncWriteDBTaskEnd(const std::string& db_name);

// unfinished_async_write_db_tasks related
pstd::Mutex unfinished_async_write_db_tasks_mu_;
std::unordered_map<std::string, int32_t> unfinished_async_write_db_tasks_;
pstd::CondVar async_write_db_tasks_cond_;

// db_write_block_fds_ related
pstd::Mutex db_write_block_fds_mu_;
std::set<int> db_write_block_fds_;

private:
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
size_t GetBinlogWorkerIndexByDBName(const std::string& db_name);
size_t GetDBWorkerIndexByDBName(const std::string& db_name);
size_t GetHashIndexByKey(const std::string& key);
void UpdateNextAvail() { next_avail_ = (next_avail_ + 1) % static_cast<int32_t>(write_binlog_workers_.size()); }

Expand Down
11 changes: 10 additions & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ class PikaReplicaManager {
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name);
void SignalAsyncWriteDBTaskEnd(const std::string& db_name);
void WaitForAsyncWriteDBTaskEnd(const std::string& db_name);
void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name);
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);
Expand Down Expand Up @@ -243,8 +245,15 @@ class PikaReplicaManager {

pstd::Mutex write_queue_mu_;

// db_name -> a queue of write task
using DBWriteTaskQueue = std::map<std::string, std::queue<WriteTask>>;
// ip:port -> a map of DBWriteTaskQueue
using SlaveWriteTaskQueue = std::map<std::string, DBWriteTaskQueue>;

// every host owns a queue, the key is "ip + port"
std::unordered_map<std::string, std::unordered_map<std::string, std::queue<WriteTask>>> write_queues_;
SlaveWriteTaskQueue write_queues_;

// client for replica
std::unique_ptr<PikaReplClient> pika_repl_client_;
std::unique_ptr<PikaReplServer> pika_repl_server_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/pb_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ WriteStatus PbConn::SendReply() {
if (item_len - write_buf_.item_pos_ != 0) {
return kWriteHalf;
}
LOG(ERROR) << "write item success";
//LOG(ERROR) << "write item success";
}
return kWriteAll;
}
Expand Down
7 changes: 7 additions & 0 deletions src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,10 @@ Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index) {

return Status::OK();
}

// Forces binlog data to disk by delegating to the underlying queue_.
// Returns the status reported by queue_->Sync(), or Status::Corruption when
// queue_ has not been set up.
Status Binlog::Sync() {
  if (!queue_) {
    return Status::Corruption("Logger not initialized");
  }
  return queue_->Sync();
}
67 changes: 60 additions & 7 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "net/src/dispatch_thread.h"
#include "net/src/worker_thread.h"
#include "src/pstd/include/scope_record_lock.h"
#include <future>

#include "rocksdb/perf_context.h"
#include "rocksdb/iostats_context.h"
Expand Down Expand Up @@ -55,6 +56,8 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->SetConn(shared_from_this());
c_ptr->SetResp(resp_ptr);

LOG(INFO) << "PikaClientConn::DoCmd command: " << c_ptr->name() << ", keys: " << c_ptr->current_key().size();

// Check authed
if (AuthRequired()) { // the user is not authed, need to do auth
if (!(c_ptr->flag() & kCmdFlagsNoAuth)) {
Expand Down Expand Up @@ -357,14 +360,64 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
}

// Executes every command of one client batch and queues the replies.
//
// argvs holds the commands parsed from the connection in one read (more than
// one entry only when the client pipelines); cache_miss_in_rtc is forwarded
// unchanged to ExecRedisCmd. An empty batch is a no-op.
//
// Commands are run one after another on the calling thread. Fanning them out
// through ScheduleClientPool and blocking on futures here was dropped for the
// reasons raised in review:
//   * the sub-tasks were queued onto the client pool, which this function may
//     itself be running on; once every worker is blocked waiting on its
//     futures nothing is left to run the sub-tasks and the server hangs;
//   * pipelined commands must take effect, and be answered, in the order they
//     were sent, which concurrent execution does not guarantee;
//   * the fan-out path never recorded process_done_ts_.
void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc) {
  if (argvs.empty()) {
    return;
  }

  // TryWriteResp() relies on resp_num / resp_array describing the whole batch.
  resp_num.store(static_cast<int32_t>(argvs.size()));
  for (const auto& argv : argvs) {
    auto resp_ptr = std::make_shared<std::string>();
    resp_array.push_back(resp_ptr);
    ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc);
  }
  time_stat_->process_done_ts_ = pstd::NowMicros();
  TryWriteResp();
}

void PikaClientConn::ParallelExecRedisCmd(void* arg) {
auto* task_args = static_cast<std::tuple<std::shared_ptr<PikaClientConn>, net::RedisCmdArgsType, std::shared_ptr<ParallelTask>, size_t, std::promise<void>, bool>*>(arg);
auto [conn, argv, task, index, promise, cache_miss_in_rtc] = std::move(*task_args);
delete task_args;

std::string opt = argv[0];
pstd::StringToLower(opt);
if (opt == kClusterPrefix) {
if (argv.size() >= 2) {
opt += argv[1];
pstd::StringToLower(opt);
}
}

std::shared_ptr<Cmd> cmd_ptr = conn->DoCmd(argv, opt, task->resps[index], cache_miss_in_rtc);
*(task->resps[index]) = std::move(cmd_ptr->res().message());

promise.set_value();
}

bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt) {
Expand Down
6 changes: 6 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,10 @@ int PikaConf::Load() {
if (binlog_file_size_ < 1024 || static_cast<int64_t>(binlog_file_size_) > (1024LL * 1024 * 1024)) {
binlog_file_size_ = 100 * 1024 * 1024; // 100M
}
GetConfInt("binlog-fsync-interval", &binlog_fsync_interval_);
if (binlog_fsync_interval_ < 0) {
binlog_fsync_interval_ = 0;
}
GetConfStr("pidfile", &pidfile_);

// db sync
Expand Down Expand Up @@ -912,3 +916,5 @@ std::vector<rocksdb::CompressionType> PikaConf::compression_per_level() {
}
return types;
}

// Returns the binlog fsync interval as loaded from "binlog-fsync-interval".
int PikaConf::binlog_fsync_interval() const {
  return binlog_fsync_interval_;
}
Loading