Skip to content

Commit d6a50a4

Browse files
buzhimingyonghuwangshao1
authored andcommitted
feat: Add PacificA data replication consistency scheme (#2994)
* finish cmd set consistency append log * trySync * testing v1 * add annotate * add persistcontext * v1 finished * delete test code * fix bug and adjust logic * adjust code * adjust code * adjust code * add PacificA consistency test cases and fix bug * add PacificA consistency test cases and fix bug * test: Adjust PacificA consistency test * delete code --------- Co-authored-by: wangshao1 <[email protected]>
1 parent 7306a46 commit d6a50a4

25 files changed

+1335
-63
lines changed

include/pika_admin.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class SlaveofCmd : public Cmd {
3535
private:
3636
std::string master_ip_;
3737
int64_t master_port_ = -1;
38+
bool is_consistency_cmd_ = false;
3839
bool is_none_ = false;
3940
void DoInitial() override;
4041
void Clear() override {

include/pika_binlog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class Binlog : public pstd::noncopyable {
5454
void Unlock() { mutex_.unlock(); }
5555

5656
pstd::Status Put(const std::string& item);
57+
pstd::Status Put(const std::string& item, LogOffset *cur_logoffset,std::string& binlog);
5758
pstd::Status IsOpened();
5859
pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
5960
/*
@@ -80,7 +81,7 @@ class Binlog : public pstd::noncopyable {
8081
void Close();
8182

8283
private:
83-
pstd::Status Put(const char* item, int len);
84+
pstd::Status Put(const char* item, int len,LogOffset *cur_logoffset = nullptr, bool is_consistency = false);
8485
pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);
8586
static pstd::Status AppendPadding(pstd::WritableFile* file, uint64_t* len);
8687
void InitLogFile();

include/pika_command.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,9 @@ class CmdRes {
339339
kInvalidTransaction,
340340
kTxnQueued,
341341
kTxnAbort,
342+
kMultiKey,
343+
kNoExists,
344+
kConsistencyTimeout, // consistency time out
342345
};
343346

344347
CmdRes() = default;
@@ -433,6 +436,14 @@ class CmdRes {
433436
result.append(message_);
434437
result.append(kNewLine);
435438
break;
439+
case kMultiKey:
440+
result = "-WRONGTYPE Operation against a key holding the wrong kind of value";
441+
result.append(kNewLine);
442+
break;
443+
case kNoExists:
444+
return message_;
445+
case kConsistencyTimeout:
446+
return "-ERR consistency timeout\r\n";
436447
default:
437448
break;
438449
}

include/pika_consensus.h

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class Context : public pstd::noncopyable {
2525
void Reset(const LogOffset& offset);
2626

2727
std::shared_mutex rwlock_;
28-
LogOffset applied_index_;
28+
LogOffset applied_index_ = LogOffset();
2929
SyncWindow applied_win_;
3030

3131
std::string ToString() {
@@ -52,11 +52,28 @@ class SyncProgress {
5252
pstd::Status Update(const std::string& ip, int port, const LogOffset& start, const LogOffset& end,
5353
LogOffset* committed_index);
5454
int SlaveSize();
55+
int SlaveBinlogStateSize() {
56+
std::shared_lock l(rwlock_);
57+
return slave_binlog_state_size;
58+
}
59+
void AddSlaveBinlogStateSize() {
60+
std::lock_guard l(rwlock_);
61+
slave_binlog_state_size++;
62+
}
63+
void SubSlaveBinlogStateSize() {
64+
std::lock_guard l(rwlock_);
65+
slave_binlog_state_size--;
66+
}
67+
void AddMatchIndex(const std::string& ip, int port, const LogOffset& offset) {
68+
std::lock_guard l(rwlock_);
69+
match_index_[ip + std::to_string(port)] = offset;
70+
}
5571

5672
private:
5773
std::shared_mutex rwlock_;
5874
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves_;
5975
std::unordered_map<std::string, LogOffset> match_index_;
76+
int slave_binlog_state_size = 0;
6077
};
6178

6279
class MemLog {
@@ -100,6 +117,34 @@ class MemLog {
100117
LogOffset last_offset_;
101118
};
102119

120+
class Log {
121+
public:
122+
struct LogItem {
123+
LogItem(const LogOffset& _offset, std::shared_ptr<Cmd> _cmd_ptr, std::string _binlog)
124+
: offset(_offset), cmd_ptr(std::move(_cmd_ptr)), binlog_(std::move(_binlog)) {}
125+
LogOffset offset;
126+
std::shared_ptr<Cmd> cmd_ptr;
127+
std::string binlog_;
128+
};
129+
130+
Log();
131+
int Size();
132+
void AppendLog(const LogItem& item);
133+
LogOffset LastOffset();
134+
LogOffset FirstOffset();
135+
LogItem At(int index);
136+
int FindOffset(const LogOffset& send_offset);
137+
pstd::Status Truncate(const LogOffset& offset);
138+
pstd::Status TruncateFrom(const LogOffset& offset);
139+
140+
private:
141+
int FindLogIndex(const LogOffset& offset);
142+
std::shared_mutex logs_mutex_;
143+
std::vector<LogItem> logs_;
144+
LogOffset last_index_ = LogOffset();
145+
LogOffset first_index_ = LogOffset();
146+
};
147+
103148
class ConsensusCoordinator {
104149
public:
105150
ConsensusCoordinator(const std::string& db_name);
@@ -199,5 +244,51 @@ class ConsensusCoordinator {
199244
SyncProgress sync_pros_;
200245
std::shared_ptr<StableLog> stable_logger_;
201246
std::shared_ptr<MemLog> mem_logger_;
247+
248+
// pacificA
249+
public:
250+
void InitContext() { context_->Init(); }
251+
bool checkFinished(const LogOffset& offset);
252+
pstd::Status AppendEntries(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_logoffset);
253+
void SetConsistency(bool is_consistency);
254+
bool GetISConsistency();
255+
pstd::Status SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name);
256+
pstd::Status Truncate(const LogOffset& offset);
257+
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
258+
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
259+
pstd::Status UpdateCommittedID();
260+
pstd::Status ApplyBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
261+
pstd::Status ProcessCoordination();
262+
263+
LogOffset GetCommittedId() {
264+
std::lock_guard l(committed_id_rwlock_);
265+
return committed_id_;
266+
}
267+
LogOffset GetPreparedId() {
268+
std::lock_guard l(prepared_id__rwlock_);
269+
return prepared_id_;
270+
}
271+
void SetPreparedId(const LogOffset& offset) {
272+
std::lock_guard l(prepared_id__rwlock_);
273+
prepared_id_ = offset;
274+
}
275+
void SetCommittedId(const LogOffset& offset) {
276+
std::lock_guard l(committed_id_rwlock_);
277+
committed_id_ = offset;
278+
context_->UpdateAppliedIndex(committed_id_);
279+
}
280+
281+
private:
282+
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
283+
284+
private:
285+
std::shared_mutex is_consistency_rwlock_;
286+
bool is_consistency_ = false;
287+
std::shared_mutex committed_id_rwlock_;
288+
LogOffset committed_id_ = LogOffset();
289+
std::shared_mutex prepared_id__rwlock_;
290+
LogOffset prepared_id_ = LogOffset();
291+
std::shared_ptr<Log> logs_;
202292
};
293+
203294
#endif // INCLUDE_PIKA_CONSENSUS_H_

include/pika_define.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,11 @@ enum SlaveState {
160160
kSlaveNotSync = 0,
161161
kSlaveDbSync = 1,
162162
kSlaveBinlogSync = 2,
163+
KCandidate = 3,
163164
};
164165

165166
// debug only
166-
const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync"};
167+
const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync", "Candidate"};
167168

168169
enum BinlogSyncState {
169170
kNotSync = 0,
@@ -274,9 +275,12 @@ class RmNode : public Node {
274275
struct WriteTask {
275276
struct RmNode rm_node_;
276277
struct BinlogChip binlog_chip_;
278+
LogOffset committed_id_ = LogOffset();
277279
LogOffset prev_offset_;
278280
WriteTask(const RmNode& rm_node, const BinlogChip& binlog_chip, const LogOffset& prev_offset)
279281
: rm_node_(rm_node), binlog_chip_(binlog_chip), prev_offset_(prev_offset) {}
282+
WriteTask(const RmNode& rm_node, const BinlogChip& binlog_chip, const LogOffset& prev_offset, const LogOffset& committed_id)
283+
: rm_node_(rm_node), binlog_chip_(binlog_chip), prev_offset_(prev_offset), committed_id_(committed_id) {}
280284
};
281285

282286
// slowlog define

include/pika_rm.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class SyncMasterDB : public SyncDB {
7171
pstd::Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
7272
pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
7373
LogOffset ConsensusCommittedIndex();
74+
7475
LogOffset ConsensusLastIndex();
7576

7677
std::shared_ptr<StableLog> StableLogger() { return coordinator_.StableLogger(); }
@@ -92,6 +93,27 @@ class SyncMasterDB : public SyncDB {
9293
pstd::Mutex session_mu_;
9394
int32_t session_id_ = 0;
9495
ConsensusCoordinator coordinator_;
96+
97+
//pacificA public:
98+
public:
99+
void InitContext(){
100+
coordinator_.InitContext();
101+
}
102+
bool checkFinished(const LogOffset& offset);
103+
void SetConsistency(bool is_consistenct);
104+
bool GetISConsistency();
105+
pstd::Status ProcessCoordination();
106+
void SetPreparedId(const LogOffset& offset);
107+
void SetCommittedId(const LogOffset& offset);
108+
LogOffset GetPreparedId();
109+
LogOffset GetCommittedId();
110+
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
111+
pstd::Status AppendCandidateBinlog(const std::string& ip, int port, const LogOffset& offset);
112+
pstd::Status UpdateCommittedID();
113+
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
114+
pstd::Status Truncate(const LogOffset& offset);
115+
116+
95117
};
96118

97119
class SyncSlaveDB : public SyncDB {
@@ -191,6 +213,8 @@ class PikaReplicaManager {
191213

192214
std::shared_mutex& GetDBLock() { return dbs_rw_; }
193215

216+
void BuildBinlogOffset(const LogOffset& offset, InnerMessage::BinlogOffset* boffset);
217+
194218
void DBLock() {
195219
dbs_rw_.lock();
196220
}

include/pika_server.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class PikaServer : public pstd::noncopyable {
9696
std::string master_ip();
9797
int master_port();
9898
int role();
99+
int last_role();
99100
bool leader_protected_mode();
100101
void CheckLeaderProtectedMode();
101102
bool readonly(const std::string& table);
@@ -158,6 +159,10 @@ class PikaServer : public pstd::noncopyable {
158159
bool TryAddSlave(const std::string& ip, int64_t port, int fd, const std::vector<DBStruct>& table_structs);
159160
pstd::Mutex slave_mutex_; // protect slaves_;
160161
std::vector<SlaveItem> slaves_;
162+
int slave_size() {
163+
std::lock_guard l(slave_mutex_);
164+
return slaves_.size();
165+
}
161166

162167
/**
163168
* Sotsmgrt use
@@ -169,7 +174,7 @@ class PikaServer : public pstd::noncopyable {
169174
*/
170175
void SyncError();
171176
void RemoveMaster();
172-
bool SetMaster(std::string& master_ip, int master_port);
177+
bool SetMaster(std::string& master_ip, int master_port, bool is_consistency = false);
173178

174179
/*
175180
* Slave State Machine
@@ -182,6 +187,8 @@ class PikaServer : public pstd::noncopyable {
182187
void UpdateMetaSyncTimestamp();
183188
void UpdateMetaSyncTimestampWithoutLock();
184189
bool IsFirstMetaSync();
190+
bool IsConsistency();
191+
void SetConsistency(bool is_consistency);
185192
void SetFirstMetaSync(bool v);
186193

187194
/*
@@ -516,6 +523,7 @@ class PikaServer : public pstd::noncopyable {
516523
exec_stat_map.insert(std::make_pair(cmd_name, 0));
517524
}
518525
}
526+
519527
private:
520528
/*
521529
* TimingTask use
@@ -573,7 +581,9 @@ class PikaServer : public pstd::noncopyable {
573581
std::string master_ip_;
574582
int master_port_ = 0;
575583
int repl_state_ = PIKA_REPL_NO_CONNECT;
584+
bool is_consistency_ = false;
576585
int role_ = PIKA_ROLE_SINGLE;
586+
int last_role_ = PIKA_ROLE_SINGLE;
577587
int last_meta_sync_timestamp_ = 0;
578588
bool first_meta_sync_ = false;
579589
bool force_full_sync_ = false;

include/pika_slave_node.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ class SlaveNode : public RmNode {
6767

6868
BinlogSyncState b_state{kNotSync};
6969
SyncWindow sync_win;
70-
LogOffset sent_offset;
71-
LogOffset acked_offset;
70+
LogOffset sent_offset = LogOffset();
71+
LogOffset acked_offset = LogOffset();
72+
LogOffset target_offset = LogOffset();
7273

7374
std::string ToStringStatus();
7475

src/pika_admin.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ static AuthResult AuthenticateUser(const std::string& cmdName, const std::string
9595
* slaveof no one
9696
* slaveof ip port
9797
* slaveof ip port force
98+
* slaveof ip port strong
9899
*/
99100
void SlaveofCmd::DoInitial() {
100101
if (!CheckArg(argv_.size())) {
@@ -111,6 +112,7 @@ void SlaveofCmd::DoInitial() {
111112
is_none_ = true;
112113
return;
113114
}
115+
114116
// self is master of A , want to slaveof B
115117
if ((g_pika_server->role() & PIKA_ROLE_MASTER) != 0) {
116118
res_.SetRes(CmdRes::kErrOther, "already master of others, invalid usage");
@@ -132,12 +134,15 @@ void SlaveofCmd::DoInitial() {
132134
if (argv_.size() == 4) {
133135
if (strcasecmp(argv_[3].data(), "force") == 0) {
134136
g_pika_server->SetForceFullSync(true);
137+
} else if (strcasecmp(argv_[3].data(), "strong") == 0) {
138+
is_consistency_cmd_ = true; // 设置 is_consistency 为 true
135139
} else {
136140
res_.SetRes(CmdRes::kWrongNum, kCmdNameSlaveof);
137141
}
138142
}
139143
}
140144

145+
141146
void SlaveofCmd::Do() {
142147
// Check if we are already connected to the specified master
143148
if ((master_ip_ == "127.0.0.1" || g_pika_server->master_ip() == master_ip_) &&
@@ -159,7 +164,7 @@ void SlaveofCmd::Do() {
159164
* the data synchronization was successful, but only changes the status of the
160165
* slaveof executor to slave */
161166

162-
bool sm_ret = g_pika_server->SetMaster(master_ip_, static_cast<int32_t>(master_port_));
167+
bool sm_ret = g_pika_server->SetMaster(master_ip_, static_cast<int32_t>(master_port_),is_consistency_cmd_);
163168

164169
if (sm_ret) {
165170
res_.SetRes(CmdRes::kOk);

0 commit comments

Comments
 (0)