2222#define dout_context g_ceph_context
2323#define dout_subsys ceph_subsys_mds_quiesce
2424#undef dout_prefix
25- #define dout_prefix *_dout << " quiesce.mgr <" << __func__ << " > "
25+ #define dout_prefix *_dout << " quiesce.mgr. " << membership.me << " <" << __func__ << " > "
2626
2727#undef dout
2828#define dout (lvl ) \
@@ -54,35 +54,34 @@ static QuiesceTimeInterval time_distance(QuiesceTimePoint lhs, QuiesceTimePoint
5454
5555bool QuiesceDbManager::db_thread_has_work () const
5656{
57- return false
57+ return db_thread_should_exit
5858 || pending_acks.size () > 0
5959 || pending_requests.size () > 0
6060 || pending_db_updates.size () > 0
6161 || (agent_callback.has_value () && agent_callback->if_newer < db_version ())
62- || (! cluster_membership.has_value () || cluster_membership->epoch != membership.epoch );
62+ || (cluster_membership.has_value () && cluster_membership->epoch != membership.epoch );
6363}
6464
6565void * QuiesceDbManager::quiesce_db_thread_main ()
6666{
67- db_thread_enter ();
68-
6967 std::unique_lock ls (submit_mutex);
7068 QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max ();
7169 QuiesceDbVersion last_acked = {0 , 0 };
7270
73- while (true ) {
71+ dout (5 ) << " Entering the main thread" << dendl;
72+ bool keep_working = true ;
73+ while (keep_working) {
7474
7575 auto db_age = db.get_age ();
7676
7777 if (!db_thread_has_work () && next_event_at_age > db_age) {
7878 submit_condition.wait_for (ls, next_event_at_age - db_age);
7979 }
8080
81- if (!membership_upkeep ()) {
82- break ;
83- }
81+ auto [is_member, should_exit] = membership_upkeep ();
82+ keep_working = !should_exit;
8483
85- {
84+ if (is_member) {
8685 decltype (pending_acks) acks (std::move (pending_acks));
8786 decltype (pending_requests) requests (std::move (pending_requests));
8887 decltype (pending_db_updates) db_updates (std::move (pending_db_updates));
@@ -105,6 +104,10 @@ void* QuiesceDbManager::quiesce_db_thread_main()
105104 } else {
106105 next_event_at_age = replica_upkeep (std::move (db_updates));
107106 }
107+ } else {
108+ ls.unlock ();
109+ dout (15 ) << " not a cluster member, keeping idle " << dendl;
110+ next_event_at_age = QuiesceTimeInterval::max ();
108111 }
109112
110113 complete_requests ();
@@ -131,7 +134,7 @@ void* QuiesceDbManager::quiesce_db_thread_main()
131134 }
132135 }
133136
134- if (send_ack) {
137+ if (is_member && send_ack) {
135138 auto db_version = quiesce_map.db_version ;
136139 dout (20 ) << " synchronous agent ack: " << quiesce_map << dendl;
137140 auto rc = membership.send_ack (std::move (quiesce_map));
@@ -148,7 +151,7 @@ void* QuiesceDbManager::quiesce_db_thread_main()
148151
149152 ls.unlock ();
150153
151- db_thread_exit () ;
154+ dout ( 5 ) << " Exiting the main thread " << dendl ;
152155
153156 return 0 ;
154157}
@@ -160,43 +163,36 @@ void QuiesceDbManager::update_membership(const QuiesceClusterMembership& new_mem
160163 bool will_participate = new_membership.members .contains (new_membership.me );
161164 dout (20 ) << " will participate: " << std::boolalpha << will_participate << std::noboolalpha << dendl;
162165
163- if (cluster_membership && !will_participate) {
164- // stop the thread
165- cluster_membership.reset ();
166+ if (will_participate && !quiesce_db_thread.is_started ()) {
167+ // start the thread
168+ dout (5 ) << " starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
169+ db_thread_should_exit = false ;
170+ quiesce_db_thread.create (" quiesce_db_mgr" );
171+ } else {
166172 submit_condition.notify_all ();
167- lock.unlock ();
168- ceph_assert (quiesce_db_thread.is_started ());
169- dout (5 ) << " stopping the db mgr thread at epoch: " << new_membership.epoch << dendl;
170- quiesce_db_thread.join ();
171- } else if (will_participate) {
172- if (!cluster_membership) {
173- // start the thread
174- dout (5 ) << " starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
175- quiesce_db_thread.create (" quiesce_db_mgr" );
176- } else {
177- submit_condition.notify_all ();
178- }
179- if (inject_request) {
180- pending_requests.push_front (inject_request);
181- }
173+ }
174+
175+ if (inject_request) {
176+ pending_requests.push_front (inject_request);
177+ }
178+
179+ if (will_participate) {
182180 cluster_membership = new_membership;
183-
184- std::lock_guard lc (agent_mutex);
185- if (agent_callback) {
186- agent_callback->if_newer = {0 , 0 };
187- }
181+ } else {
182+ cluster_membership.reset ();
188183 }
189184
190- if (!will_participate && inject_request) {
191- inject_request->complete (-EPERM);
185+ std::lock_guard lc (agent_mutex);
186+ if (agent_callback) {
187+ agent_callback->if_newer = {0 , 0 };
192188 }
193189}
194190
195- bool QuiesceDbManager::membership_upkeep ()
191+ std::pair<QuiesceDbManager::IsMemberBool, QuiesceDbManager::ShouldExitBool> QuiesceDbManager::membership_upkeep ()
196192{
197193 if (cluster_membership && cluster_membership->epoch == membership.epoch ) {
198194 // no changes
199- return true ;
195+ return { true , db_thread_should_exit} ;
200196 }
201197
202198 bool was_leader = membership.epoch > 0 && membership.leader == membership.me ;
@@ -206,7 +202,7 @@ bool QuiesceDbManager::membership_upkeep()
206202 << std::boolalpha << was_leader << " ->" << is_leader << std::noboolalpha
207203 << " members:" << cluster_membership->members << dendl;
208204 } else {
209- dout (10 ) << " shutdown ! was_leader: " << was_leader << dendl;
205+ dout (10 ) << " not a member ! was_leader: " << was_leader << dendl;
210206 }
211207
212208 if (is_leader) {
@@ -238,18 +234,24 @@ bool QuiesceDbManager::membership_upkeep()
238234 done_requests[await_ctx.req_ctx ] = EINPROGRESS;
239235 }
240236 awaits.clear ();
241- // reject pending requests
237+ // reject pending requests as not leader
242238 while (!pending_requests.empty ()) {
243- done_requests[pending_requests.front ()] = EPERM ;
239+ done_requests[pending_requests.front ()] = ENOTTY ;
244240 pending_requests.pop_front ();
245241 }
246242 }
247243
248244 if (cluster_membership) {
249245 membership = *cluster_membership;
246+ dout (15 ) << " Updated membership" << dendl;
247+ } else {
248+ membership.epoch = 0 ;
249+ peers.clear ();
250+ awaits.clear ();
251+ db.clear ();
250252 }
251253
252- return cluster_membership.has_value ();
254+ return { cluster_membership.has_value (), db_thread_should_exit } ;
253255}
254256
255257QuiesceTimeInterval QuiesceDbManager::replica_upkeep (decltype (pending_db_updates)&& db_updates)
@@ -291,7 +293,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
291293 if (db.set_version > update.db_version .set_version ) {
292294 dout (3 ) << " got an older version of DB from the leader: " << db.set_version << " > " << update.db_version .set_version << dendl;
293295 dout (3 ) << " discarding the DB" << dendl;
294- db.reset ();
296+ db.clear ();
295297 } else {
296298 for (auto & [qs_id, qs] : update.sets ) {
297299 db.sets .insert_or_assign (qs_id, std::move (qs));
0 commit comments