Skip to content

Commit 2a3faf1

Browse files
committed
mds/quiesce: agent: avoid a race condition with rapid db updates
When new roots begin processing but don't yet make it into the currently tracked set, there is a window for the next update with the same roots to treat them as new. We fix it by simplifying the agent model, getting rid of the intermediate `working` set. Since we never remove or add items into the current roots collection, it's safe to update the current set directly from the pending set. The race was due to the fact that `db_update()` relied on the `current` to deduce new roots into `pending`, while the same new root could have already been seen and posted into the `working` set. This would lead to submitting the same new root twice. Without the `working` set such race isn't possible. Fixes: https://tracker.ceph.com/issues/65545 Signed-off-by: Leonid Usov <[email protected]>
1 parent 7714874 commit 2a3faf1

File tree

4 files changed

+136
-85
lines changed

4 files changed

+136
-85
lines changed

src/mds/MDSRankQuiesce.cc

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -518,16 +518,9 @@ void MDSRank::quiesce_agent_setup() {
518518

519519
if (!inserted) {
520520
dout(3) << "duplicate quiesce request for root '" << it->first << "'" << dendl;
521-
// we must update the request id so that old one can't cancel this request.
522-
it->second.first = req_id;
523-
if (it->second.second) {
524-
it->second.second->complete(-EINTR);
525-
it->second.second = c;
526-
} else {
527-
// if we have no context, it means we've completed it
528-
// since we weren't inserted, we must have successfully quiesced
529-
c->complete(0);
530-
}
521+
// report error for the duplicate request, just as MDCache would do
522+
c->complete(-EINPROGRESS);
523+
return std::nullopt;
531524
} else if (debug_rank && (debug_rank != whoami)) {
532525
// the root was pinned to a different rank
533526
// we should acknowledge the quiesce regardless of the other flags

src/mds/QuiesceAgent.cc

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -87,36 +87,38 @@ bool QuiesceAgent::db_update(QuiesceMap& map)
8787
}
8888

8989
void* QuiesceAgent::agent_thread_main() {
90-
working.clear();
91-
std::unique_lock lock(agent_mutex);
90+
std::unique_lock agent_lock(agent_mutex);
91+
92+
while (!stop_agent_thread) {
93+
TrackedRootsVersion old;
9294

93-
while(!stop_agent_thread) {
9495
if (pending.armed) {
95-
working.roots.swap(pending.roots);
96-
working.db_version = pending.db_version;
97-
} else {
98-
// copy current roots
99-
working.roots = current.roots;
100-
working.db_version = current.db_version;
96+
std::swap(old, current);
97+
current.roots.swap(pending.roots);
98+
current.db_version = pending.db_version;
10199
}
102100

103101
dout(20)
104-
<< "current = " << current.db_version
105-
<< ", working = " << working.db_version
106-
<< ", pending = " << pending.db_version << dendl;
107-
108-
current.armed = false;
109-
working.armed = true;
102+
<< "old = " << old.db_version
103+
<< ", current = " << current.db_version
104+
<< dendl;
110105

111-
// it's safe to clear the pending roots under lock because it shouldn't
106+
// it's safe to clear the pending roots under agent_lock because it shouldn't
112107
// ever hold a last shared ptr to quiesced tracked roots, causing their destructors to run cancel.
113108
pending.clear();
114-
lock.unlock();
109+
current.armed = true;
110+
upkeep_needed = false;
111+
112+
// for somebody waiting for the internal state to progress
113+
agent_cond.notify_all();
114+
agent_lock.unlock();
115+
116+
_agent_thread_will_work();
115117

116-
QuiesceMap ack(working.db_version);
118+
QuiesceMap ack(current.db_version);
117119

118120
// upkeep what we believe is the current state.
119-
for (auto& [root, info] : working.roots) {
121+
for (auto& [root, info] : current.roots) {
120122

121123
info->lock();
122124
bool should_quiesce = info->should_quiesce();
@@ -141,7 +143,7 @@ void* QuiesceAgent::agent_thread_main() {
141143
info->unlock();
142144

143145
// TODO: capturing QuiesceAgent& `this` is potentially dangerous
144-
// the assumption is that since the root pointer is weak
146+
// the assumption is that since the tracked root pointer is weak
145147
// it will have been deleted by the QuiesceAgent shutdown sequence
146148
set_upkeep_needed();
147149
}
@@ -165,37 +167,30 @@ void* QuiesceAgent::agent_thread_main() {
165167
}
166168
}
167169

168-
lock.lock();
170+
_agent_thread_did_work();
169171

170-
bool new_version = current.db_version < working.db_version;
171-
current.roots.swap(working.roots);
172-
current.db_version = working.db_version;
173-
174-
lock.unlock();
175-
176-
// clear the old roots and send the ack outside of the lock
177-
working.roots.clear();
172+
// send the ack and clear the old roots outside of the lock
173+
bool new_version = current.db_version != old.db_version;
178174
if (new_version || !ack.roots.empty()) {
179175
dout(20) << "asyncrhonous ack for " << (new_version ? "a new" : "the current") << " version: " << ack << dendl;
180176
int rc = quiesce_control.agent_ack(std::move(ack));
181177
if (rc != 0) {
182178
dout(3) << "got error: " << rc << " trying to send " << ack << dendl;
183179
}
184180
}
181+
old.clear();
185182
ack.clear();
186183

187-
lock.lock();
188-
189-
// notify that we're done working on this version and all acks (if any) were sent
190-
working.clear();
184+
agent_lock.lock();
191185

186+
current.armed = false;
192187
// a new pending version could be set while we weren't locked
193188
// if that's the case just go for another pass
194189
// otherwise, wait for updates
195-
if (!pending.armed && !current.armed && !stop_agent_thread) {
190+
while (!pending.armed && !current.armed && !upkeep_needed && !stop_agent_thread) {
196191
// for somebody waiting for the thread to idle
197192
agent_cond.notify_all();
198-
agent_cond.wait(lock);
193+
agent_cond.wait(agent_lock);
199194
}
200195
}
201196
agent_cond.notify_all();
@@ -206,13 +201,11 @@ void QuiesceAgent::set_pending_roots(QuiesceDbVersion version, TrackedRoots&& ne
206201
{
207202
std::unique_lock l(agent_mutex);
208203

209-
auto actual_version = std::max(current.db_version, working.db_version);
210-
bool rollback = actual_version > version;
211-
204+
bool rollback = current.db_version > version;
205+
212206
if (rollback) {
213207
dout(5) << "version rollback to " << version
214-
<< ". current = " << current.db_version
215-
<< ", working = " << working.db_version
208+
<< ". current = " << current.db_version
216209
<< ", pending = " << pending.db_version << dendl;
217210
}
218211

@@ -230,10 +223,9 @@ void QuiesceAgent::set_upkeep_needed()
230223

231224
dout(20)
232225
<< "current = " << current.db_version
233-
<< ", working = " << working.db_version
234226
<< ", pending = " << pending.db_version << dendl;
235227

236-
current.armed = true;
228+
upkeep_needed = true;
237229
agent_cond.notify_all();
238230
}
239231

src/mds/QuiesceAgent.h

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class QuiesceAgent {
3333
agent_thread.create("quiesce.agt");
3434
};
3535

36-
~QuiesceAgent() {
36+
virtual ~QuiesceAgent() {
3737
shutdown();
3838
}
3939

@@ -213,20 +213,28 @@ class QuiesceAgent {
213213
operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr);
214214

215215
TrackedRootsVersion current;
216-
TrackedRootsVersion working;
217216
TrackedRootsVersion pending;
218217

219218
std::mutex agent_mutex;
220219
std::condition_variable agent_cond;
221220
bool stop_agent_thread;
221+
bool upkeep_needed;
222222

223223
template<class L>
224224
QuiesceDbVersion await_idle_locked(L &lock) {
225-
agent_cond.wait(lock, [this] {
226-
return !(current.armed || working.armed || pending.armed);
225+
return await_phase_locked(lock, false, false);
226+
}
227+
228+
template <class L>
229+
QuiesceDbVersion await_phase_locked(L& lock, bool pending_armed, bool current_armed)
230+
{
231+
agent_cond.wait(lock, [=, this] {
232+
return ( !upkeep_needed
233+
&& current.armed == current_armed
234+
&& pending.armed == pending_armed);
227235
});
228236

229-
return current.db_version;
237+
return std::max(current.db_version, pending.db_version);
230238
}
231239

232240
void set_pending_roots(QuiesceDbVersion db_version, TrackedRoots&& new_roots);
@@ -249,4 +257,8 @@ class QuiesceAgent {
249257
} agent_thread;
250258

251259
void* agent_thread_main();
260+
261+
virtual void _agent_thread_will_work() { }
262+
virtual void _agent_thread_did_work() { }
263+
252264
};

src/test/mds/TestQuiesceAgent.cc

Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class QuiesceAgentTest : public testing::Test {
5050
QuiesceDbVersion get_latest_version()
5151
{
5252
std::lock_guard l(agent_mutex);
53-
return std::max({current.db_version, working.db_version, pending.db_version});
53+
return std::max(current.db_version, pending.db_version);
5454
}
5555
TrackedRoots& mutable_tracked_roots() {
5656
return current.roots;
@@ -60,6 +60,16 @@ class QuiesceAgentTest : public testing::Test {
6060
std::unique_lock l(agent_mutex);
6161
return await_idle_locked(l);
6262
}
63+
64+
using TRV = TrackedRootsVersion;
65+
std::optional<std::function<void(TRV& pending, TRV& current)>> before_work;
66+
67+
void _agent_thread_will_work() {
68+
auto f = before_work;
69+
if (f) {
70+
(*f)(pending, current);
71+
}
72+
}
6373
};
6474
QuiesceMap latest_ack;
6575
std::unordered_map<QuiesceRoot, QuiescingRoot> quiesce_requests;
@@ -98,19 +108,12 @@ class QuiesceAgentTest : public testing::Test {
98108
auto [it, inserted] = quiesce_requests.try_emplace(r, req_id, c);
99109

100110
if (!inserted) {
101-
// we must update the request id so that old one can't cancel this request.
102-
it->second.first = req_id;
103-
if (it->second.second) {
104-
it->second.second->complete(-EINTR);
105-
it->second.second = c;
106-
} else {
107-
// if we have no context, it means we've completed it
108-
// since we weren't inserted, we must have successfully quiesced
109-
c->complete(0);
110-
}
111+
// it's a conflict that MDCache doesn't deal with
112+
c->complete(-EINPROGRESS);
113+
return req_id;
114+
} else {
115+
return it->second.first;
111116
}
112-
113-
return it->second.first;
114117
};
115118

116119
ci.cancel_request = [this](RequestHandle h) {
@@ -171,10 +174,10 @@ class QuiesceAgentTest : public testing::Test {
171174
}
172175

173176
template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>>
174-
bool await_idle_v(QuiesceDbVersion version, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
177+
bool await_idle_v(QuiesceSetVersion v, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
175178
{
176-
return timed_run(timeout, [this, version] {
177-
while (version > agent->await_idle()) { };
179+
return timed_run(timeout, [this, v] {
180+
while (QuiesceDbVersion {1, v} > agent->await_idle()) { };
178181
});
179182
}
180183

@@ -475,32 +478,35 @@ TEST_F(QuiesceAgentTest, DuplicateQuiesceRequest) {
475478

476479
EXPECT_TRUE(await_idle());
477480

478-
// now we should have seen the ack with root2 quiesced
481+
// root1 and root2 are still registered internally
482+
// so it should result in a failure to quiesce them again
479483
EXPECT_EQ(3, latest_ack.db_version);
480-
EXPECT_EQ(1, latest_ack.roots.size());
481-
EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
484+
EXPECT_EQ(2, latest_ack.roots.size());
485+
EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state);
486+
EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
482487

483488
// the actual state of the pinned objects shouldn't have changed
484489
EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
485-
EXPECT_EQ(QS_FAILED, pinned2->get_actual_state());
490+
EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state());
486491

487492
EXPECT_EQ(0, *pinned1->quiesce_result);
488-
EXPECT_EQ(-EINTR, *pinned2->quiesce_result);
493+
EXPECT_FALSE(pinned2->quiesce_result.has_value());
489494

490-
// releasing the pinned objects will attempt to cancel, but that shouldn't interfere with the current state
495+
// releasing the pinned objects should cancel and remove from internal requests
491496
pinned1.reset();
492497
pinned2.reset();
493498

494-
EXPECT_TRUE(quiesce_requests.contains("root1"));
495-
EXPECT_TRUE(quiesce_requests.contains("root2"));
499+
EXPECT_FALSE(quiesce_requests.contains("root1"));
500+
EXPECT_FALSE(quiesce_requests.contains("root2"));
496501

497-
EXPECT_TRUE(complete_quiesce("root2"));
502+
EXPECT_TRUE(complete_quiesce("root3"));
498503

499504
EXPECT_TRUE(await_idle());
500505
EXPECT_EQ(3, latest_ack.db_version);
501-
EXPECT_EQ(2, latest_ack.roots.size());
502-
EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
503-
EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root2").state);
506+
EXPECT_EQ(3, latest_ack.roots.size());
507+
EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state);
508+
EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
509+
EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root3").state);
504510
}
505511

506512
TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
@@ -543,3 +549,51 @@ TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
543549
EXPECT_EQ(0, tracked.size());
544550
}
545551
}
552+
553+
554+
TEST_F(QuiesceAgentTest, RapidDbUpdates)
555+
{
556+
// This validates that the same new root that happens to be reported
557+
// more than once before we have chance to process it is not submitted
558+
// multiple times
559+
560+
// set a handler that will post v2 whlie we're working on v1
561+
agent->before_work = [this](TestQuiesceAgent::TRV& p, TestQuiesceAgent::TRV& c) {
562+
if (c.db_version.set_version != 1) {
563+
return;
564+
}
565+
agent->before_work.reset();
566+
auto ack = update(2, {
567+
{ "root1", QS_QUIESCING },
568+
{ "root2", QS_QUIESCING },
569+
});
570+
571+
ASSERT_TRUE(ack.has_value());
572+
EXPECT_EQ(2, ack->db_version);
573+
EXPECT_EQ(0, ack->roots.size());
574+
};
575+
576+
{
577+
auto ack = update(1, {
578+
{ "root1", QS_QUIESCING },
579+
});
580+
581+
ASSERT_TRUE(ack.has_value());
582+
EXPECT_EQ(1, ack->db_version);
583+
EXPECT_EQ(0, ack->roots.size());
584+
}
585+
586+
EXPECT_TRUE(await_idle_v(2));
587+
588+
// nothing should be in the ack
589+
// if we incorrectly submit root1 twice
590+
// then it should be repored here as FAILED
591+
EXPECT_EQ(2, latest_ack.db_version);
592+
EXPECT_EQ(0, latest_ack.roots.size());
593+
594+
{
595+
auto tracked = agent->tracked_roots();
596+
EXPECT_EQ(2, tracked.size());
597+
}
598+
}
599+

0 commit comments

Comments
 (0)