66#define PIKA_CONSENSUS_H_
77
88#include < utility>
9+ #include < condition_variable>
10+ #include < unordered_map>
911
1012#include " include/pika_binlog_transverter.h"
1113#include " include/pika_client_conn.h"
@@ -170,7 +172,7 @@ class ConsensusCoordinator {
170172
171173 SyncProgress& SyncPros () { return sync_pros_; }
172174 std::shared_ptr<StableLog> StableLogger () { return stable_logger_; }
173- std::shared_ptr<MemLog> MemLogger () { return mem_logger_; }
175+ std::shared_ptr<MemLog> MemLogger () { return mem_logger_; } void SetStableLogger (std::shared_ptr<StableLog> logger) { stable_logger_ = logger; }
174176
175177 LogOffset committed_index () {
176178 std::lock_guard lock (index_mu_);
@@ -186,28 +188,9 @@ class ConsensusCoordinator {
186188 };
187189 static int InitCmd (net::RedisParser* parser, const net::RedisCmdArgsType& argv);
188190
189- std::string ToStringStatus () {
190- std::stringstream tmp_stream;
191- {
192- std::lock_guard lock (index_mu_);
193- tmp_stream << " Committed_index: " << committed_index_.ToString () << " \r\n " ;
194- }
195- tmp_stream << " Context: "
196- << " \r\n "
197- << context_->ToString ();
198- {
199- std::shared_lock lock (term_rwlock_);
200- tmp_stream << " Term: " << term_ << " \r\n " ;
201- }
202- tmp_stream << " Mem_logger size: " << mem_logger_->Size () << " last offset "
203- << mem_logger_->last_offset ().ToString () << " \r\n " ;
204- tmp_stream << " Stable_logger first offset " << stable_logger_->first_offset ().ToString () << " \r\n " ;
205- LogOffset log_status;
206- stable_logger_->Logger ()->GetProducerStatus (&(log_status.b_offset .filenum ), &(log_status.b_offset .offset ),
207- &(log_status.l_offset .term ), &(log_status.l_offset .index ));
208- tmp_stream << " Physical Binlog Status: " << log_status.ToString () << " \r\n " ;
209- return tmp_stream.str ();
210- }
191+ std::string ToStringStatus ();
192+ void NotifyLogCommitted (const LogOffset& offset);
193+ bool WaitLogCommitted (const LogOffset& offset, std::chrono::milliseconds timeout);
211194
212195 private:
213196 pstd::Status TruncateTo (const LogOffset& offset);
@@ -274,21 +257,28 @@ class ConsensusCoordinator {
274257 }
275258 void SetCommittedId (const LogOffset& offset) {
276259 std::lock_guard l (committed_id_rwlock_);
277- committed_id_ = offset;
278- context_->UpdateAppliedIndex (committed_id_);
260+ if (offset > committed_id_) {
261+ committed_id_ = offset;
262+ context_->UpdateAppliedIndex (committed_id_);
263+ // 通知所有等待的线程
264+ log_commit_cv_.notify_all ();
265+ }
279266 }
280267
281268 private:
282269 pstd::Status PersistAppendBinlog (const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
283270
284- private:
271+ std::shared_ptr<Log> logs_;
285272 std::shared_mutex is_consistency_rwlock_;
286273 bool is_consistency_ = false ;
287274 std::shared_mutex committed_id_rwlock_;
288275 LogOffset committed_id_ = LogOffset();
289276 std::shared_mutex prepared_id__rwlock_;
290277 LogOffset prepared_id_ = LogOffset();
291- std::shared_ptr<Log> logs_;
278+ // 条件变量,用于通知日志已提交
279+ std::condition_variable_any log_commit_cv_;
280+ // 用于跟踪等待特定日志提交的线程
281+ std::mutex waiting_threads_mu_;
292282};
293283
294284#endif // INCLUDE_PIKA_CONSENSUS_H_
0 commit comments