Skip to content

Commit f84b45c

Browse files
Merge pull request ceph#56956 from ceph/wip-lusov-quiesce-agent-race
mds/quiesce: agent: avoid a race condition with rapid db updates
2 parents 4e987b6 + 2a3faf1 commit f84b45c

File tree

4 files changed

+136
-85
lines changed

4 files changed

+136
-85
lines changed

src/mds/MDSRankQuiesce.cc

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -518,16 +518,9 @@ void MDSRank::quiesce_agent_setup() {
518518

519519
if (!inserted) {
520520
dout(3) << "duplicate quiesce request for root '" << it->first << "'" << dendl;
521-
// we must update the request id so that old one can't cancel this request.
522-
it->second.first = req_id;
523-
if (it->second.second) {
524-
it->second.second->complete(-EINTR);
525-
it->second.second = c;
526-
} else {
527-
// if we have no context, it means we've completed it
528-
// since we weren't inserted, we must have successfully quiesced
529-
c->complete(0);
530-
}
521+
// report error for the duplicate request, just as MDCache would do
522+
c->complete(-EINPROGRESS);
523+
return std::nullopt;
531524
} else if (debug_rank && (debug_rank != whoami)) {
532525
// the root was pinned to a different rank
533526
// we should acknowledge the quiesce regardless of the other flags

src/mds/QuiesceAgent.cc

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -87,36 +87,38 @@ bool QuiesceAgent::db_update(QuiesceMap& map)
8787
}
8888

8989
void* QuiesceAgent::agent_thread_main() {
90-
working.clear();
91-
std::unique_lock lock(agent_mutex);
90+
std::unique_lock agent_lock(agent_mutex);
91+
92+
while (!stop_agent_thread) {
93+
TrackedRootsVersion old;
9294

93-
while(!stop_agent_thread) {
9495
if (pending.armed) {
95-
working.roots.swap(pending.roots);
96-
working.db_version = pending.db_version;
97-
} else {
98-
// copy current roots
99-
working.roots = current.roots;
100-
working.db_version = current.db_version;
96+
std::swap(old, current);
97+
current.roots.swap(pending.roots);
98+
current.db_version = pending.db_version;
10199
}
102100

103101
dout(20)
104-
<< "current = " << current.db_version
105-
<< ", working = " << working.db_version
106-
<< ", pending = " << pending.db_version << dendl;
107-
108-
current.armed = false;
109-
working.armed = true;
102+
<< "old = " << old.db_version
103+
<< ", current = " << current.db_version
104+
<< dendl;
110105

111-
// it's safe to clear the pending roots under lock because it shouldn't
106+
// it's safe to clear the pending roots under agent_lock because it shouldn't
112107
// ever hold a last shared ptr to quiesced tracked roots, causing their destructors to run cancel.
113108
pending.clear();
114-
lock.unlock();
109+
current.armed = true;
110+
upkeep_needed = false;
111+
112+
// for somebody waiting for the internal state to progress
113+
agent_cond.notify_all();
114+
agent_lock.unlock();
115+
116+
_agent_thread_will_work();
115117

116-
QuiesceMap ack(working.db_version);
118+
QuiesceMap ack(current.db_version);
117119

118120
// upkeep what we believe is the current state.
119-
for (auto& [root, info] : working.roots) {
121+
for (auto& [root, info] : current.roots) {
120122

121123
info->lock();
122124
bool should_quiesce = info->should_quiesce();
@@ -141,7 +143,7 @@ void* QuiesceAgent::agent_thread_main() {
141143
info->unlock();
142144

143145
// TODO: capturing QuiesceAgent& `this` is potentially dangerous
144-
// the assumption is that since the root pointer is weak
146+
// the assumption is that since the tracked root pointer is weak
145147
// it will have been deleted by the QuiesceAgent shutdown sequence
146148
set_upkeep_needed();
147149
}
@@ -165,37 +167,30 @@ void* QuiesceAgent::agent_thread_main() {
165167
}
166168
}
167169

168-
lock.lock();
170+
_agent_thread_did_work();
169171

170-
bool new_version = current.db_version < working.db_version;
171-
current.roots.swap(working.roots);
172-
current.db_version = working.db_version;
173-
174-
lock.unlock();
175-
176-
// clear the old roots and send the ack outside of the lock
177-
working.roots.clear();
172+
// send the ack and clear the old roots outside of the lock
173+
bool new_version = current.db_version != old.db_version;
178174
if (new_version || !ack.roots.empty()) {
179175
dout(20) << "asyncrhonous ack for " << (new_version ? "a new" : "the current") << " version: " << ack << dendl;
180176
int rc = quiesce_control.agent_ack(std::move(ack));
181177
if (rc != 0) {
182178
dout(3) << "got error: " << rc << " trying to send " << ack << dendl;
183179
}
184180
}
181+
old.clear();
185182
ack.clear();
186183

187-
lock.lock();
188-
189-
// notify that we're done working on this version and all acks (if any) were sent
190-
working.clear();
184+
agent_lock.lock();
191185

186+
current.armed = false;
192187
// a new pending version could be set while we weren't locked
193188
// if that's the case just go for another pass
194189
// otherwise, wait for updates
195-
if (!pending.armed && !current.armed && !stop_agent_thread) {
190+
while (!pending.armed && !current.armed && !upkeep_needed && !stop_agent_thread) {
196191
// for somebody waiting for the thread to idle
197192
agent_cond.notify_all();
198-
agent_cond.wait(lock);
193+
agent_cond.wait(agent_lock);
199194
}
200195
}
201196
agent_cond.notify_all();
@@ -206,13 +201,11 @@ void QuiesceAgent::set_pending_roots(QuiesceDbVersion version, TrackedRoots&& ne
206201
{
207202
std::unique_lock l(agent_mutex);
208203

209-
auto actual_version = std::max(current.db_version, working.db_version);
210-
bool rollback = actual_version > version;
211-
204+
bool rollback = current.db_version > version;
205+
212206
if (rollback) {
213207
dout(5) << "version rollback to " << version
214-
<< ". current = " << current.db_version
215-
<< ", working = " << working.db_version
208+
<< ". current = " << current.db_version
216209
<< ", pending = " << pending.db_version << dendl;
217210
}
218211

@@ -230,10 +223,9 @@ void QuiesceAgent::set_upkeep_needed()
230223

231224
dout(20)
232225
<< "current = " << current.db_version
233-
<< ", working = " << working.db_version
234226
<< ", pending = " << pending.db_version << dendl;
235227

236-
current.armed = true;
228+
upkeep_needed = true;
237229
agent_cond.notify_all();
238230
}
239231

src/mds/QuiesceAgent.h

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class QuiesceAgent {
3333
agent_thread.create("quiesce.agt");
3434
};
3535

36-
~QuiesceAgent() {
36+
virtual ~QuiesceAgent() {
3737
shutdown();
3838
}
3939

@@ -213,20 +213,28 @@ class QuiesceAgent {
213213
operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr);
214214

215215
TrackedRootsVersion current;
216-
TrackedRootsVersion working;
217216
TrackedRootsVersion pending;
218217

219218
std::mutex agent_mutex;
220219
std::condition_variable agent_cond;
221220
bool stop_agent_thread;
221+
bool upkeep_needed;
222222

223223
template<class L>
224224
QuiesceDbVersion await_idle_locked(L &lock) {
225-
agent_cond.wait(lock, [this] {
226-
return !(current.armed || working.armed || pending.armed);
225+
return await_phase_locked(lock, false, false);
226+
}
227+
228+
template <class L>
229+
QuiesceDbVersion await_phase_locked(L& lock, bool pending_armed, bool current_armed)
230+
{
231+
agent_cond.wait(lock, [=, this] {
232+
return ( !upkeep_needed
233+
&& current.armed == current_armed
234+
&& pending.armed == pending_armed);
227235
});
228236

229-
return current.db_version;
237+
return std::max(current.db_version, pending.db_version);
230238
}
231239

232240
void set_pending_roots(QuiesceDbVersion db_version, TrackedRoots&& new_roots);
@@ -249,4 +257,8 @@ class QuiesceAgent {
249257
} agent_thread;
250258

251259
void* agent_thread_main();
260+
261+
virtual void _agent_thread_will_work() { }
262+
virtual void _agent_thread_did_work() { }
263+
252264
};

src/test/mds/TestQuiesceAgent.cc

Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class QuiesceAgentTest : public testing::Test {
5050
QuiesceDbVersion get_latest_version()
5151
{
5252
std::lock_guard l(agent_mutex);
53-
return std::max({current.db_version, working.db_version, pending.db_version});
53+
return std::max(current.db_version, pending.db_version);
5454
}
5555
TrackedRoots& mutable_tracked_roots() {
5656
return current.roots;
@@ -60,6 +60,16 @@ class QuiesceAgentTest : public testing::Test {
6060
std::unique_lock l(agent_mutex);
6161
return await_idle_locked(l);
6262
}
63+
64+
using TRV = TrackedRootsVersion;
65+
std::optional<std::function<void(TRV& pending, TRV& current)>> before_work;
66+
67+
void _agent_thread_will_work() {
68+
auto f = before_work;
69+
if (f) {
70+
(*f)(pending, current);
71+
}
72+
}
6373
};
6474
QuiesceMap latest_ack;
6575
std::unordered_map<QuiesceRoot, QuiescingRoot> quiesce_requests;
@@ -98,19 +108,12 @@ class QuiesceAgentTest : public testing::Test {
98108
auto [it, inserted] = quiesce_requests.try_emplace(r, req_id, c);
99109

100110
if (!inserted) {
101-
// we must update the request id so that old one can't cancel this request.
102-
it->second.first = req_id;
103-
if (it->second.second) {
104-
it->second.second->complete(-EINTR);
105-
it->second.second = c;
106-
} else {
107-
// if we have no context, it means we've completed it
108-
// since we weren't inserted, we must have successfully quiesced
109-
c->complete(0);
110-
}
111+
// it's a conflict that MDCache doesn't deal with
112+
c->complete(-EINPROGRESS);
113+
return req_id;
114+
} else {
115+
return it->second.first;
111116
}
112-
113-
return it->second.first;
114117
};
115118

116119
ci.cancel_request = [this](RequestHandle h) {
@@ -177,10 +180,10 @@ class QuiesceAgentTest : public testing::Test {
177180
}
178181

179182
template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>>
180-
bool await_idle_v(QuiesceDbVersion version, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
183+
bool await_idle_v(QuiesceSetVersion v, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
181184
{
182-
return timed_run(timeout, [this, version] {
183-
while (version > agent->await_idle()) { };
185+
return timed_run(timeout, [this, v] {
186+
while (QuiesceDbVersion {1, v} > agent->await_idle()) { };
184187
});
185188
}
186189

@@ -481,32 +484,35 @@ TEST_F(QuiesceAgentTest, DuplicateQuiesceRequest) {
481484

482485
EXPECT_TRUE(await_idle());
483486

484-
// now we should have seen the ack with root2 quiesced
487+
// root1 and root2 are still registered internally
488+
// so it should result in a failure to quiesce them again
485489
EXPECT_EQ(3, latest_ack.db_version);
486-
EXPECT_EQ(1, latest_ack.roots.size());
487-
EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
490+
EXPECT_EQ(2, latest_ack.roots.size());
491+
EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state);
492+
EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
488493

489494
// the actual state of the pinned objects shouldn't have changed
490495
EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
491-
EXPECT_EQ(QS_FAILED, pinned2->get_actual_state());
496+
EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state());
492497

493498
EXPECT_EQ(0, *pinned1->quiesce_result);
494-
EXPECT_EQ(-EINTR, *pinned2->quiesce_result);
499+
EXPECT_FALSE(pinned2->quiesce_result.has_value());
495500

496-
// releasing the pinned objects will attempt to cancel, but that shouldn't interfere with the current state
501+
// releasing the pinned objects should cancel and remove from internal requests
497502
pinned1.reset();
498503
pinned2.reset();
499504

500-
EXPECT_TRUE(quiesce_requests.contains("root1"));
501-
EXPECT_TRUE(quiesce_requests.contains("root2"));
505+
EXPECT_FALSE(quiesce_requests.contains("root1"));
506+
EXPECT_FALSE(quiesce_requests.contains("root2"));
502507

503-
EXPECT_TRUE(complete_quiesce("root2"));
508+
EXPECT_TRUE(complete_quiesce("root3"));
504509

505510
EXPECT_TRUE(await_idle());
506511
EXPECT_EQ(3, latest_ack.db_version);
507-
EXPECT_EQ(2, latest_ack.roots.size());
508-
EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
509-
EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root2").state);
512+
EXPECT_EQ(3, latest_ack.roots.size());
513+
EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state);
514+
EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
515+
EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root3").state);
510516
}
511517

512518
TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
@@ -549,3 +555,51 @@ TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
549555
EXPECT_EQ(0, tracked.size());
550556
}
551557
}
558+
559+
560+
TEST_F(QuiesceAgentTest, RapidDbUpdates)
561+
{
562+
// This validates that the same new root that happens to be reported
563+
// more than once before we have a chance to process it is not submitted
564+
// multiple times
565+
566+
// set a handler that will post v2 while we're working on v1
567+
agent->before_work = [this](TestQuiesceAgent::TRV& p, TestQuiesceAgent::TRV& c) {
568+
if (c.db_version.set_version != 1) {
569+
return;
570+
}
571+
agent->before_work.reset();
572+
auto ack = update(2, {
573+
{ "root1", QS_QUIESCING },
574+
{ "root2", QS_QUIESCING },
575+
});
576+
577+
ASSERT_TRUE(ack.has_value());
578+
EXPECT_EQ(2, ack->db_version);
579+
EXPECT_EQ(0, ack->roots.size());
580+
};
581+
582+
{
583+
auto ack = update(1, {
584+
{ "root1", QS_QUIESCING },
585+
});
586+
587+
ASSERT_TRUE(ack.has_value());
588+
EXPECT_EQ(1, ack->db_version);
589+
EXPECT_EQ(0, ack->roots.size());
590+
}
591+
592+
EXPECT_TRUE(await_idle_v(2));
593+
594+
// nothing should be in the ack
595+
// if we incorrectly submit root1 twice
596+
// then it should be reported here as FAILED
597+
EXPECT_EQ(2, latest_ack.db_version);
598+
EXPECT_EQ(0, latest_ack.roots.size());
599+
600+
{
601+
auto tracked = agent->tracked_roots();
602+
EXPECT_EQ(2, tracked.size());
603+
}
604+
}
605+

0 commit comments

Comments
 (0)