
Commit 7809b0e

Merge pull request ceph#50105 from zhsgao/mds_export_state
mds: add an asok command to dump export states

Reviewed-by: Venky Shankar <[email protected]>
2 parents 94ce78a + 5506ed6 commit 7809b0e
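
For context, a minimal sketch of how the new admin-socket command can be driven from the CephFS qa helpers used in the test below. The Filesystem fixture (`fs`) and two active MDS ranks are assumed, as in the new test class; rank_asok() is the existing qa wrapper that sends an asok command to the MDS holding a given rank.

# Sketch only: assumes a CephFSTestCase-style environment where `fs` is the
# qa Filesystem helper and at least two MDS ranks are active.
def dump_rank_export_states(fs, rank=0):
    # 'dump_export_states' is the asok command registered by this commit;
    # rank_asok() sends it to the MDS holding the given rank.
    states = fs.rank_asok(['dump_export_states'], rank=rank)
    for state in states:
        # 'tid', 'path', 'state' and 'peer' are fields emitted by
        # Migrator::dump_export_states() in this change.
        print(state['tid'], state['path'], state['state'], state['peer'])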

File tree: 7 files changed, +239 -21 lines


qa/tasks/cephfs/test_exports.py

Lines changed: 96 additions & 0 deletions
@@ -4,6 +4,7 @@
 from tasks.cephfs.fuse_mount import FuseMount
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
 from teuthology.exceptions import CommandFailedError
+from teuthology.contextutil import safe_while, MaxWhileTries
 
 log = logging.getLogger(__name__)
 
@@ -628,3 +629,98 @@ def test_ephemeral_pin_shrink_mds(self):
         log.info("{0} migrations have occured due to the cluster resizing".format(count))
         # rebalancing from 3 -> 2 may cause half of rank 0/1 to move and all of rank 2
         self.assertLessEqual((count/len(subtrees_old)), (1.0/3.0/2.0 + 1.0/3.0/2.0 + 1.0/3.0)*1.25) # aka .66 with 25% overbudget
+
+class TestDumpExportStates(CephFSTestCase):
+    MDSS_REQUIRED = 2
+    CLIENTS_REQUIRED = 1
+
+    EXPORT_STATES = ['locking', 'discovering', 'freezing', 'prepping', 'warning', 'exporting']
+
+    def setUp(self):
+        super().setUp()
+
+        self.fs.set_max_mds(self.MDSS_REQUIRED)
+        self.status = self.fs.wait_for_daemons()
+
+        self.mount_a.run_shell_payload('mkdir -p test/export')
+
+    def tearDown(self):
+        super().tearDown()
+
+    def _wait_for_export_target(self, source, target, sleep=2, timeout=10):
+        try:
+            with safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
+                while proceed():
+                    info = self.fs.getinfo().get_rank(self.fs.id, source)
+                    log.info(f'waiting for rank {target} to be added to the export target')
+                    if target in info['export_targets']:
+                        return
+        except MaxWhileTries as e:
+            raise RuntimeError(f'rank {target} has not been added to export target after {timeout}s') from e
+
+    def _dump_export_state(self, rank):
+        states = self.fs.rank_asok(['dump_export_states'], rank=rank, status=self.status)
+        self.assertTrue(type(states) is list)
+        self.assertEqual(len(states), 1)
+        return states[0]
+
+    def _test_base(self, path, source, target, state_index, kill):
+        self.fs.rank_asok(['config', 'set', 'mds_kill_import_at', str(kill)], rank=target, status=self.status)
+
+        self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status)
+        self._wait_for_export_target(source, target)
+
+        target_rank = self.fs.get_rank(rank=target, status=self.status)
+        self.delete_mds_coredump(target_rank['name'])
+
+        state = self._dump_export_state(source)
+
+        self.assertTrue(type(state['tid']) is int)
+        self.assertEqual(state['path'], path)
+        self.assertEqual(state['state'], self.EXPORT_STATES[state_index])
+        self.assertEqual(state['peer'], target)
+
+        return state
+
+    def _test_state_history(self, state):
+        history = state['state_history']
+        self.assertTrue(type(history) is dict)
+        size = 0
+        for name in self.EXPORT_STATES:
+            self.assertTrue(type(history[name]) is dict)
+            size += 1
+            if name == state['state']:
+                break
+        self.assertEqual(len(history), size)
+
+    def _test_freeze_tree(self, state, waiters):
+        self.assertTrue(type(state['freeze_tree_time']) is float)
+        self.assertEqual(state['unfreeze_tree_waiters'], waiters)
+
+    def test_discovering(self):
+        state = self._test_base('/test', 0, 1, 1, 1)
+
+        self._test_state_history(state)
+        self._test_freeze_tree(state, 0)
+
+        self.assertEqual(state['last_cum_auth_pins'], 0)
+        self.assertEqual(state['num_remote_waiters'], 0)
+
+    def test_prepping(self):
+        client_id = self.mount_a.get_global_id()
+
+        state = self._test_base('/test', 0, 1, 3, 3)
+
+        self._test_state_history(state)
+        self._test_freeze_tree(state, 0)
+
+        self.assertEqual(state['flushed_clients'], [client_id])
+        self.assertTrue(type(state['warning_ack_waiting']) is list)
+
+    def test_exporting(self):
+        state = self._test_base('/test', 0, 1, 5, 5)
+
+        self._test_state_history(state)
+        self._test_freeze_tree(state, 0)
+
+        self.assertTrue(type(state['notify_ack_waiting']) is list)

src/mds/CDir.h

Lines changed: 10 additions & 0 deletions
@@ -546,6 +546,16 @@ class CDir : public MDSCacheObject, public Counter<CDir> {
 
   void maybe_finish_freeze();
 
+  size_t count_unfreeze_tree_waiters() {
+    size_t n = count_unfreeze_dir_waiters();
+    _walk_tree([&n](CDir *dir) {
+      n += dir->count_unfreeze_dir_waiters();
+      return true;
+    });
+    return n;
+  }
+  inline size_t count_unfreeze_dir_waiters() const { return count_waiters(WAIT_UNFREEZE); }
+
   std::pair<bool,bool> is_freezing_or_frozen_tree() const {
     if (freeze_tree_state) {
       if (freeze_tree_state->frozen)

src/mds/MDSCacheObject.h

Lines changed: 2 additions & 0 deletions
@@ -260,6 +260,8 @@ class MDSCacheObject {
   }
   bool is_waiter_for(waitmask_t mask);
 
+  inline size_t count_waiters(uint64_t mask) const { return waiting.count(mask); }
+
   virtual void add_waiter(uint64_t mask, MDSContext *c) {
     add_waiter(waitmask_t(mask), c);
   }

src/mds/MDSDaemon.cc

Lines changed: 4 additions & 0 deletions
@@ -304,6 +304,10 @@ void MDSDaemon::set_up_admin_socket()
                                      asok_hook,
                                      "show recent ops, sorted by op duration");
   ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_export_states",
+                                     asok_hook,
+                                     "dump export states");
+  ceph_assert(r == 0);
   r = admin_socket->register_command("scrub_path name=path,type=CephString "
                                      "name=scrubops,type=CephChoices,"
                                      "strings=force|recursive|repair,n=N,req=false "

src/mds/MDSRank.cc

Lines changed: 3 additions & 0 deletions
@@ -2769,6 +2769,9 @@ void MDSRankDispatcher::handle_asok_command(
     if (!op_tracker.dump_historic_ops(f, true)) {
       *css << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
     }
+  } else if (command == "dump_export_states") {
+    std::lock_guard l(mds_lock);
+    mdcache->migrator->dump_export_states(f);
   } else if (command == "osdmap barrier") {
     int64_t target_epoch = 0;
     bool got_val = cmd_getval(cmdmap, "target_epoch", target_epoch);

src/mds/Migrator.cc

Lines changed: 93 additions & 20 deletions
@@ -268,12 +268,12 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
   case EXPORT_LOCKING:
     dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
     num_locking_exports--;
-    it->second.state = EXPORT_CANCELLED;
+    it->second.set_state(EXPORT_CANCELLED);
    dir->auth_unpin(this);
    break;
  case EXPORT_DISCOVERING:
    dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
-    it->second.state = EXPORT_CANCELLED;
+    it->second.set_state(EXPORT_CANCELLED);
    dir->unfreeze_tree();  // cancel the freeze
    dir->auth_unpin(this);
    if (notify_peer &&
@@ -286,7 +286,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
 
  case EXPORT_FREEZING:
    dout(10) << "export state=freezing : canceling freeze" << dendl;
-    it->second.state = EXPORT_CANCELLED;
+    it->second.set_state(EXPORT_CANCELLED);
    dir->unfreeze_tree();  // cancel the freeze
    if (dir->is_subtree_root())
      mdcache->try_subtree_merge(dir);
@@ -301,13 +301,13 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
    // NOTE: state order reversal, warning comes after prepping
  case EXPORT_WARNING:
    dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
-    it->second.state = EXPORT_CANCELLING;
+    it->second.set_state(EXPORT_CANCELLING);
    // fall-thru
 
  case EXPORT_PREPPING:
    if (state != EXPORT_WARNING) {
      dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
-      it->second.state = EXPORT_CANCELLED;
+      it->second.set_state(EXPORT_CANCELLED);
    }
 
    {
@@ -340,7 +340,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
 
  case EXPORT_EXPORTING:
    dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
-    it->second.state = EXPORT_CANCELLING;
+    it->second.set_state(EXPORT_CANCELLING);
    export_reverse(dir, it->second);
    break;
 
@@ -865,7 +865,7 @@ void Migrator::export_dir(CDir *dir, mds_rank_t dest)
  ceph_assert(export_state.count(dir) == 0);
  export_state_t& stat = export_state[dir];
  num_locking_exports++;
-  stat.state = EXPORT_LOCKING;
+  stat.set_state(EXPORT_LOCKING);
  stat.peer = dest;
  stat.tid = mdr->reqid.tid;
  stat.mut = mdr;
@@ -1140,7 +1140,7 @@ void Migrator::dispatch_export_dir(const MDRequestRef& mdr, int count)
 
  if (results.size() == 1 && results.front().first == dir) {
    num_locking_exports--;
-    it->second.state = EXPORT_DISCOVERING;
+    it->second.set_state(EXPORT_DISCOVERING);
    // send ExportDirDiscover (ask target)
    filepath path;
    dir->inode->make_path(path);
@@ -1191,7 +1191,7 @@ void Migrator::dispatch_export_dir(const MDRequestRef& mdr, int count)
      ceph_assert(export_state.count(sub) == 0);
      auto& stat = export_state[sub];
      num_locking_exports++;
-      stat.state = EXPORT_LOCKING;
+      stat.set_state(EXPORT_LOCKING);
      stat.peer = dest;
      stat.tid = _mdr->reqid.tid;
      stat.mut = _mdr;
@@ -1244,7 +1244,7 @@ void Migrator::handle_export_discover_ack(const cref_t<MExportDirDiscoverAck> &m
 
  if (m->is_success()) {
    // move to freezing the subtree
-    it->second.state = EXPORT_FREEZING;
+    it->second.set_state(EXPORT_FREEZING);
    auto&& mdr = boost::static_pointer_cast<MDRequestImpl>(std::move(it->second.mut));
    ceph_assert(!it->second.mut); // should have been moved out of
 
@@ -1427,18 +1427,18 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
  }
 
  // send.
-  it->second.state = EXPORT_PREPPING;
+  it->second.set_state(EXPORT_PREPPING);
  mds->send_message_mds(prep, it->second.peer);
  ceph_assert(g_conf()->mds_kill_export_at != 4);
 
  // make sure any new instantiations of caps are flushed out
  ceph_assert(it->second.warning_ack_waiting.empty());
 
-  set<client_t> export_client_set;
-  get_export_client_set(dir, export_client_set);
+  ceph_assert(it->second.export_client_set.empty());
+  get_export_client_set(dir, it->second.export_client_set);
 
  MDSGatherBuilder gather(g_ceph_context);
-  mds->server->flush_client_sessions(export_client_set, gather);
+  mds->server->flush_client_sessions(it->second.export_client_set, gather);
  if (gather.has_subs()) {
    it->second.warning_ack_waiting.insert(MDS_RANK_NONE);
    gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
@@ -1537,7 +1537,7 @@ void Migrator::handle_export_prep_ack(const cref_t<MExportDirPrepAck> &m)
 
  }
 
-  it->second.state = EXPORT_WARNING;
+  it->second.set_state(EXPORT_WARNING);
 
  ceph_assert(g_conf()->mds_kill_export_at != 6);
  // nobody to warn?
@@ -1587,8 +1587,8 @@ void Migrator::export_go_synced(CDir *dir, uint64_t tid)
  dout(7) << *dir << " to " << dest << dendl;
 
  mdcache->show_subtrees();
-
-  it->second.state = EXPORT_EXPORTING;
+
+  it->second.set_state(EXPORT_EXPORTING);
  ceph_assert(g_conf()->mds_kill_export_at != 7);
 
  ceph_assert(dir->is_frozen_tree_root());
@@ -1933,7 +1933,7 @@ void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
  auto bp = m->imported_caps.cbegin();
  decode(it->second.peer_imported, bp);
 
-  it->second.state = EXPORT_LOGGINGFINISH;
+  it->second.set_state(EXPORT_LOGGINGFINISH);
  ceph_assert(g_conf()->mds_kill_export_at != 9);
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);
@@ -1970,7 +1970,7 @@ void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>&
  ceph_assert(stat.state == EXPORT_CANCELLING);
 
  if (stat.notify_ack_waiting.empty()) {
-    stat.state = EXPORT_CANCELLED;
+    stat.set_state(EXPORT_CANCELLED);
    return;
  }
 
@@ -2095,7 +2095,7 @@ void Migrator::export_logged_finish(CDir *dir)
  }
 
  // wait for notifyacks
-  stat.state = EXPORT_NOTIFYING;
+  stat.set_state(EXPORT_NOTIFYING);
  ceph_assert(g_conf()->mds_kill_export_at != 11);
 
  // no notifies to wait for?
@@ -3217,6 +3217,79 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last)
  }
 }
 
+void Migrator::dump_export_states(Formatter *f)
+{
+  f->open_array_section("states");
+  for (const auto& [dir, state] : export_state) {
+    f->open_object_section("state");
+
+    f->dump_unsigned("tid", state.tid);
+
+    dir->dump(f, CDir::DUMP_PATH | CDir::DUMP_DIRFRAG);
+
+    f->dump_string("state", get_export_statename(state.state));
+
+    f->open_object_section("state_history");
+    for (const auto& [s, _1] : state.state_history) {
+      f->open_object_section(get_export_statename(s));
+      f->dump_stream("start_at") << state.get_start_time(s);
+      f->dump_float("time_spent", state.get_time_spent(s));
+      f->close_section();
+    }
+    f->close_section();
+
+    f->dump_int("peer", state.peer);
+
+    switch (state.state) {
+    case EXPORT_DISCOVERING:
+    case EXPORT_FREEZING:
+      f->dump_stream("last_cum_auth_pins_change") << state.last_cum_auth_pins_change;
+      f->dump_int("last_cum_auth_pins", state.last_cum_auth_pins);
+      f->dump_int("num_remote_waiters", state.num_remote_waiters);
+
+      break;
+
+    case EXPORT_PREPPING:
+    case EXPORT_WARNING:
+      f->open_array_section("flushed_clients");
+      for (const auto &client : state.export_client_set)
+        f->dump_int("client", client.v);
+      f->close_section();
+
+      f->open_array_section("warning_ack_waiting");
+      for (const auto &rank : state.warning_ack_waiting)
+        f->dump_int("rank", rank);
+      f->close_section();
+
+      if (state.state == EXPORT_PREPPING)
+        break;
+      // fall-thru
+
+    case EXPORT_EXPORTING:
+    case EXPORT_LOGGINGFINISH:
+    case EXPORT_NOTIFYING:
+      f->open_array_section("notify_ack_waiting");
+      for (const auto &rank : state.notify_ack_waiting)
+        f->dump_int("rank", rank);
+      f->close_section();
+
+      break;
+
+    default:
+      break;
+    }
+
+    if (state.state >= EXPORT_DISCOVERING) {
+      f->dump_unsigned("approx_size", state.approx_size);
+      f->dump_unsigned("unfreeze_tree_waiters", dir->count_unfreeze_tree_waiters());
+      f->dump_float("freeze_tree_time", state.get_freeze_tree_time());
+    }
+
+    f->close_section();
+  }
+  f->close_section();
+}
+
 void Migrator::decode_import_inode(CDentry *dn, bufferlist::const_iterator& blp,
                                    mds_rank_t oldauth, LogSegment *ls,
                                    map<CInode*, map<client_t,Capability::Export> >& peer_exports,
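
For reference, a rough sketch of the shape an entry returned by dump_export_states might take for an export stuck in the 'discovering' state, written as a Python literal. The key names follow the Formatter calls in Migrator::dump_export_states() above and the assertions in test_exports.py; every value and the timestamp format shown here is illustrative only.

# Hypothetical output shape only -- values and timestamp formatting are made up;
# key names come from the dump code above and the new qa test assertions.
example_dump = [{
    'tid': 123,
    'path': '/test',                     # emitted via CDir::dump(DUMP_PATH | DUMP_DIRFRAG)
    'state': 'discovering',
    'state_history': {
        'locking':     {'start_at': '2023-01-01T00:00:00.000000', 'time_spent': 0.001},
        'discovering': {'start_at': '2023-01-01T00:00:00.001000', 'time_spent': 0.0},
    },
    'peer': 1,
    'last_cum_auth_pins_change': '2023-01-01T00:00:00.000000',
    'last_cum_auth_pins': 0,
    'num_remote_waiters': 0,
    'approx_size': 1,
    'unfreeze_tree_waiters': 0,
    'freeze_tree_time': 0.002,
}]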
