Skip to content

Commit f55b36d

Browse files
author
wuxianrong
committed
The data backup and recovery functions have been added
1 parent 3e973c3 commit f55b36d

File tree

13 files changed

+806
-21
lines changed

13 files changed

+806
-21
lines changed

.github/workflows/pika.yml

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,7 @@ jobs:
3434
- name: Install Deps
3535
run: |
3636
sudo apt-get update
37-
sudo apt-get install -y gcc-11 g++-11 libssl-dev autoconf libprotobuf-dev protobuf-compiler clang-tidy
38-
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-11 100
39-
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-11 100
40-
gcc --version
41-
g++ --version
37+
sudo apt-get install -y autoconf libprotobuf-dev protobuf-compiler clang-tidy
4238
4339
- name: Configure CMake
4440
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
@@ -163,7 +159,7 @@ jobs:
163159
- name: Install deps
164160
run: |
165161
dnf update -y
166-
dnf install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar g++ tar epel-release gcc-c++ libstdc++-devel openssl-devel gcc-toolset-11
162+
dnf install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar g++ tar epel-release gcc-c++ libstdc++-devel gcc-toolset-13
167163
168164
- name: Set up Go
169165
uses: actions/setup-go@v5
@@ -177,7 +173,7 @@ jobs:
177173

178174
- name: Configure CMake
179175
run: |
180-
source /opt/rh/gcc-toolset-11/enable
176+
source /opt/rh/gcc-toolset-13/enable
181177
cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address .
182178
183179
- uses: actions/cache@v3

include/pika_db.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
9494
std::shared_ptr<storage::Storage> storage() const;
9595
void GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid);
9696
void BgSaveDB();
97+
pstd::Status CreateCheckpoint(const std::string& checkpoint_dir);
98+
pstd::Status LoadDBFromCheckpoint(const std::string& checkpoint_dir);
9799
void SetBinlogIoError();
98100
void SetBinlogIoErrorrelieve();
99101
bool IsBinlogIoError();

include/pika_server.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ enum TaskType {
7171
kCompactRangeSets,
7272
kCompactRangeZSets,
7373
kCompactRangeList,
74+
kLoadDBFromCheckpoint,
75+
kCreateCheckpoint,
7476
};
7577

7678
struct TaskArg {

src/pika_db.cc

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,75 @@ void DB::BgSaveDB() {
6767
g_pika_server->BGSaveTaskSchedule(&DoBgSave, static_cast<void*>(bg_task_arg));
6868
}
6969

70+
pstd::Status DB::CreateCheckpoint(const std::string& checkpoint_dir) {
71+
std::string checkpoint_sub_path = checkpoint_dir;
72+
if (!checkpoint_sub_path.empty() && checkpoint_sub_path.back() != '/') {
73+
checkpoint_sub_path.push_back('/');
74+
}
75+
checkpoint_sub_path += db_name_;
76+
77+
if (!pstd::FileExists(checkpoint_sub_path)) {
78+
if (pstd::CreatePath(checkpoint_sub_path, 0755) != 0) {
79+
return Status::IOError("Failed to create checkpoint path", checkpoint_sub_path);
80+
}
81+
}
82+
83+
std::shared_lock guard(dbs_rw_);
84+
auto tasks = storage_->CreateCheckpoint(checkpoint_sub_path);
85+
for (auto& task : tasks) {
86+
auto status = task.get();
87+
if (!status.ok()) {
88+
return Status::Corruption("Create checkpoint failed: " + status.ToString());
89+
}
90+
}
91+
return Status::OK();
92+
}
93+
94+
pstd::Status DB::LoadDBFromCheckpoint(const std::string& checkpoint_dir) {
95+
std::string checkpoint_sub_path = checkpoint_dir;
96+
if (!checkpoint_sub_path.empty() && checkpoint_sub_path.back() != '/') {
97+
checkpoint_sub_path.push_back('/');
98+
}
99+
checkpoint_sub_path += db_name_;
100+
101+
if (!pstd::FileExists(checkpoint_sub_path)) {
102+
return Status::NotFound("Checkpoint dir does not exist: " + checkpoint_sub_path);
103+
}
104+
105+
std::lock_guard<std::shared_mutex> guard(dbs_rw_);
106+
opened_ = false;
107+
108+
auto old_storage = storage_;
109+
storage_.reset();
110+
if (old_storage) {
111+
old_storage->Close();
112+
}
113+
114+
storage_ = std::make_shared<storage::Storage>();
115+
auto checkpoint_tasks = storage_->LoadCheckpoint(checkpoint_sub_path, db_path_);
116+
for (auto& task : checkpoint_tasks) {
117+
auto status = task.get();
118+
if (!status.ok()) {
119+
storage_.reset();
120+
return Status::Corruption("Load checkpoint failed: " + status.ToString());
121+
}
122+
}
123+
124+
storage::StorageOptions storage_options = g_pika_server->storage_options();
125+
auto open_status = storage_->Open(storage_options, db_path_);
126+
if (!open_status.ok()) {
127+
storage_.reset();
128+
return Status::Corruption("Storage open failed: " + open_status.ToString());
129+
}
130+
131+
if (!g_pika_conf->raft_enabled()) {
132+
storage_->DisableWal(false);
133+
}
134+
135+
opened_ = true;
136+
return Status::OK();
137+
}
138+
70139
void DB::SetBinlogIoError() { return binlog_io_error_.store(true); }
71140
void DB::SetBinlogIoErrorrelieve() { return binlog_io_error_.store(false); }
72141
bool DB::IsBinlogIoError() { return binlog_io_error_.load(); }

src/pika_server.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,37 @@ Status PikaServer::DoSameThingSpecificDB(const std::set<std::string>& dbs, const
497497
case TaskType::kCompactRangeList:
498498
db_item.second->CompactRange(storage::DataType::kLists, arg.argv[0], arg.argv[1]);
499499
break;
500+
case TaskType::kLoadDBFromCheckpoint: {
501+
// arg.argv[0] should contain checkpoint_path
502+
if (arg.argv.empty()) {
503+
LOG(ERROR) << "LoadDBFromCheckpoint requires checkpoint_path argument";
504+
return Status::InvalidArgument("Missing checkpoint_path");
505+
}
506+
std::string checkpoint_path = arg.argv[0];
507+
auto s = db_item.second->LoadDBFromCheckpoint(checkpoint_path);
508+
if (!s.ok()) {
509+
LOG(ERROR) << "Failed to load DB from checkpoint: " << s.ToString();
510+
return s;
511+
}
512+
LOG(INFO) << "Successfully loaded DB " << db_item.first << " from checkpoint: " << checkpoint_path;
513+
break;
514+
}
515+
case TaskType::kCreateCheckpoint: {
516+
// arg.argv[0] should contain checkpoint_path
517+
if (arg.argv.empty()) {
518+
LOG(ERROR) << "CreateCheckpoint requires checkpoint_path argument";
519+
return Status::InvalidArgument("Missing checkpoint_path");
520+
}
521+
std::string checkpoint_path = arg.argv[0];
522+
auto s = db_item.second->CreateCheckpoint(checkpoint_path);
523+
if (!s.ok()) {
524+
LOG(ERROR) << "Failed to create checkpoint: " << s.ToString();
525+
return s;
526+
}
527+
LOG(INFO) << "Successfully created checkpoint for DB " << db_item.first << " at: " << checkpoint_path;
528+
break;
529+
}
530+
500531
default:
501532
break;
502533
}

src/praft/include/praft/praft.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "braft/raft.h"
1919
#include "braft/storage.h"
2020
#include "braft/util.h"
21+
#include "braft/file_system_adaptor.h"
2122
#include "pstd/include/pstd_mutex.h"
2223
#include "pstd/include/pstd_status.h"
2324
#include "rocksdb/status.h"
@@ -31,6 +32,7 @@ namespace storage {
3132
class Storage;
3233
}
3334

35+
// Forward declarations
3436
namespace pikiwidb {
3537
class Binlog;
3638
}
@@ -74,6 +76,8 @@ class PikaStateMachine : public braft::StateMachine {
7476
PikaStateMachine();
7577
~PikaStateMachine() override = default;
7678

79+
void SetLeaderTerm(std::atomic<int64_t>* leader_term);
80+
7781
// Apply committed log entry
7882
void on_apply(braft::Iterator& iter) override;
7983

@@ -97,7 +101,8 @@ class PikaStateMachine : public braft::StateMachine {
97101
void on_stop_following(const ::braft::LeaderChangeContext& ctx) override;
98102

99103
private:
100-
104+
std::atomic<bool> is_node_first_start_up_{true}; // 标记节点是否首次启动
105+
std::atomic<int64_t>* leader_term_{nullptr};
101106
};
102107

103108
// Raft node wrapper
@@ -130,6 +135,11 @@ class PikaRaftNode {
130135
// Get cluster status information
131136
void GetStatus(std::string* status_str);
132137

138+
void GetLeaderLeaseStatus(braft::LeaderLeaseStatus* status) const;
139+
140+
// Trigger a snapshot creation
141+
pstd::Status DoSnapshot(int64_t self_snapshot_index = 0, bool is_sync = true);
142+
133143
braft::Node* GetRaftNode() { return node_.get(); }
134144

135145
private:
@@ -144,6 +154,10 @@ class PikaRaftNode {
144154
std::string raft_log_uri_;
145155
std::string raft_meta_uri_;
146156
std::string raft_snapshot_uri_;
157+
158+
// Snapshot adaptor
159+
scoped_refptr<braft::FileSystemAdaptor> snapshot_adaptor_;
160+
std::atomic<int64_t> leader_term_{-1};
147161
};
148162

149163
// Raft cluster manager
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
3+
* This source code is licensed under the BSD-style license found in the
4+
* LICENSE file in the root directory of this source tree. An additional grant
5+
* of patent rights can be found in the PATENTS file in the same directory.
6+
*/
7+
8+
#pragma once
9+
10+
#include <string>
11+
12+
#include "braft/file_system_adaptor.h"
13+
#include "braft/macros.h"
14+
#include "braft/snapshot.h"
15+
16+
#define PRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta"
17+
#define PRAFT_SNAPSHOT_PATH "snapshot/snapshot_"
18+
#define IS_RDONLY 0x01
19+
20+
// 自定义文件系统适配器,用于Braft快照生成
21+
class PPosixFileSystemAdaptor : public braft::PosixFileSystemAdaptor {
22+
public:
23+
PPosixFileSystemAdaptor() {}
24+
~PPosixFileSystemAdaptor() {}
25+
26+
braft::FileAdaptor* open(const std::string& path, int oflag, const ::google::protobuf::Message* file_meta,
27+
butil::File::Error* e) override;
28+
29+
void AddAllFiles(const std::string& dir, braft::LocalSnapshotMetaTable* snapshot_meta_memtable,
30+
const std::string& base_path);
31+
32+
private:
33+
braft::raft_mutex_t mutex_;
34+
};

0 commit comments

Comments
 (0)