diff --git a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index 4bdc58f320..b0a02b4b45 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -102,6 +102,7 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const void replica_duplicator_manager::sync_duplication(const duplication_entry &ent) { + zauto_lock l(_lock); auto it = ent.progress.find(get_gpid().get_partition_index()); if (it == ent.progress.end()) { // Inconsistent with the meta server. @@ -109,8 +110,6 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent) return; } - zauto_lock l(_lock); - dupid_t dupid = ent.dupid; duplication_status::type next_status = ent.status; @@ -183,7 +182,6 @@ void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree con return; } - zauto_lock l(_lock); remove_all_duplications(); if (confirmed >= 0) { // duplication ongoing // confirmed decree never decreases @@ -197,7 +195,8 @@ void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree con void replica_duplicator_manager::METRIC_FUNC_NAME_SET(dup_pending_mutations)() { - int64_t total = 0; + zauto_lock l(_lock); + uint64_t total = 0; for (const auto &dup : _duplications) { total += dup.second->get_pending_mutations_count(); } @@ -227,6 +226,7 @@ replica_duplicator_manager::get_dup_states() const void replica_duplicator_manager::remove_all_duplications() { + zauto_lock l(_lock); // fast path if (_duplications.empty()) { return; diff --git a/src/task/task.h b/src/task/task.h index 49a5ba05dc..22c16c8036 100644 --- a/src/task/task.h +++ b/src/task/task.h @@ -48,6 +48,7 @@ #include "utils/fmt_logging.h" #include "utils/join_point.h" #include "utils/ports.h" +#include "utils/synchronize.h" #include "utils/utils.h" namespace dsn { @@ -329,6 +330,11 @@ class raw_task : public task : task(code, hash, node), _cb(std::move(cb)) { } + ~raw_task() override + { + ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_lock); + _cb = nullptr; + } void exec() override { @@ -338,10 +344,13 @@ class raw_task : public task } protected: - void clear_non_trivial_on_task_end() override { _cb = nullptr; } - -protected: + void clear_non_trivial_on_task_end() override + { + ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_lock); + _cb = nullptr; + } task_handler _cb; + ::dsn::utils::ex_lock_nr _lock; }; //----------------- timer task -------------------------------------------------------