Skip to content

Commit 98242a7

Browse files
committed
Merge PR ceph#55471 into main
* refs/pull/55471/head: qa: verify labelled replication perf metrics qa: test per-client labelled perf counters mds: export per-client metrics as labelled perf counters cephfs_mirror: add labeled replication performance metrics cephfs-mirror: typo ending bracket Reviewed-by: Robin H. Johnson <[email protected]>
2 parents dfd6329 + f29dd57 commit 98242a7

File tree

13 files changed

+471
-15
lines changed

13 files changed

+471
-15
lines changed

qa/tasks/cephfs/test_admin.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from time import sleep
88

99
from teuthology.exceptions import CommandFailedError
10+
from teuthology.contextutil import safe_while
1011

1112
from tasks.cephfs.cephfs_test_case import CephFSTestCase, classhook
1213
from tasks.cephfs.filesystem import FileLayout, FSMissing
@@ -16,6 +17,58 @@
1617

1718
log = logging.getLogger(__name__)
1819

20+
class TestLabeledPerfCounters(CephFSTestCase):
    """
    Tests for the per-client labelled perf counters reported by
    `counter dump` on the MDS rank.
    """
    CLIENTS_REQUIRED = 2
    MDSS_REQUIRED = 1

    def test_per_client_labeled_perf_counters(self):
        """
        That the per-client labelled perf counters depict the clients
        performing IO.
        """
        def get_counters_for(filesystem, client_id):
            """
            Return the "counters" dict of the labelled entry matching
            `client_id` from a fresh `counter dump`.

            Raises KeyError if the per-filesystem key is absent and
            IndexError if no entry carries the requested client label.
            """
            dump = self.fs.rank_tell(["counter", "dump"])
            per_client_metrics_key = f'mds_client_metrics-{filesystem}'
            counters = [c["counters"]
                        for c in dump[per_client_metrics_key]
                        if c["labels"]["client"] == client_id]
            return counters[0]

        # sleep a bit so that we get updated clients...
        sleep(10)

        # lookout for clients...
        dump = self.fs.rank_tell(["counter", "dump"])

        fs_suffix = dump["mds_client_metrics"][0]["labels"]["fs_name"]
        self.assertGreaterEqual(dump["mds_client_metrics"][0]["counters"]["num_clients"], 2)

        per_client_metrics_key = f'mds_client_metrics-{fs_suffix}'
        mount_a_id = f'client.{self.mount_a.get_global_id()}'
        mount_b_id = f'client.{self.mount_b.get_global_id()}'

        clients = [c["labels"]["client"] for c in dump[per_client_metrics_key]]
        self.assertIn(mount_a_id, clients)
        self.assertIn(mount_b_id, clients)

        # write workload
        self.mount_a.create_n_files("test_dir/test_file", 1000, sync=True)
        with safe_while(sleep=1, tries=30, action=f'wait for counters - {mount_a_id}') as proceed:
            while proceed():
                # Re-fetch on every attempt: the counters are updated
                # asynchronously, so a single snapshot taken before the
                # loop would never change between retries.
                counters_dump_a = get_counters_for(fs_suffix, mount_a_id)
                if counters_dump_a["total_write_ops"] > 0 and counters_dump_a["total_write_size"] > 0:
                    # `break` (not `return`) so that the read-side checks
                    # and the teardown below still run.
                    break

        # read from the other client
        for i in range(100):
            self.mount_b.open_background(basename=f'test_dir/test_file_{i}', write=False)
        with safe_while(sleep=1, tries=30, action=f'wait for counters - {mount_b_id}') as proceed:
            while proceed():
                counters_dump_b = get_counters_for(fs_suffix, mount_b_id)
                if counters_dump_b["total_read_ops"] > 0 and counters_dump_b["total_read_size"] > 0:
                    break

        self.fs.teardown()
71+
1972
class TestAdminCommands(CephFSTestCase):
2073
"""
2174
Tests for administration command.

qa/tasks/cephfs/test_mirroring.py

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ class TestMirroring(CephFSTestCase):
2121

2222
MODULE_NAME = "mirroring"
2323

24+
PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR = "cephfs_mirror"
25+
PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS = "cephfs_mirror_mirrored_filesystems"
26+
PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER = "cephfs_mirror_peers"
27+
2428
def setUp(self):
2529
super(TestMirroring, self).setUp()
2630
self.primary_fs_name = self.fs.name
@@ -40,6 +44,9 @@ def disable_mirroring_module(self):
4044
self.run_ceph_cmd("mgr", "module", "disable", TestMirroring.MODULE_NAME)
4145

4246
def enable_mirroring(self, fs_name, fs_id):
47+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
48+
vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR][0]
49+
4350
self.run_ceph_cmd("fs", "snapshot", "mirror", "enable", fs_name)
4451
time.sleep(10)
4552
# verify via asok
@@ -48,7 +55,19 @@ def enable_mirroring(self, fs_name, fs_id):
4855
self.assertTrue(res['peers'] == {})
4956
self.assertTrue(res['snap_dirs']['dir_count'] == 0)
5057

58+
# verify labelled perf counter
59+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
60+
self.assertEqual(res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]["labels"]["filesystem"],
61+
fs_name)
62+
vafter = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR][0]
63+
64+
self.assertGreater(vafter["counters"]["mirrored_filesystems"],
65+
vbefore["counters"]["mirrored_filesystems"])
66+
5167
def disable_mirroring(self, fs_name, fs_id):
68+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
69+
vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR][0]
70+
5271
self.run_ceph_cmd("fs", "snapshot", "mirror", "disable", fs_name)
5372
time.sleep(10)
5473
# verify via asok
@@ -60,6 +79,13 @@ def disable_mirroring(self, fs_name, fs_id):
6079
else:
6180
raise RuntimeError('expected admin socket to be unavailable')
6281

82+
# verify labelled perf counter
83+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
84+
vafter = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR][0]
85+
86+
self.assertLess(vafter["counters"]["mirrored_filesystems"],
87+
vbefore["counters"]["mirrored_filesystems"])
88+
6389
def verify_peer_added(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
6490
# verify via asok
6591
res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
@@ -74,15 +100,27 @@ def verify_peer_added(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
74100
else:
75101
self.assertTrue(self.fs_name == res['peers'][peer_uuid]['remote']['fs_name'])
76102

77-
def peer_add(self, fs_name, fs_id, peer_spec, remote_fs_name=None):
103+
def peer_add(self, fs_name, fs_id, peer_spec, remote_fs_name=None, check_perf_counter=True):
104+
if check_perf_counter:
105+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
106+
vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]
107+
78108
if remote_fs_name:
79109
self.run_ceph_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec, remote_fs_name)
80110
else:
81111
self.run_ceph_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec)
82112
time.sleep(10)
83113
self.verify_peer_added(fs_name, fs_id, peer_spec, remote_fs_name)
84114

115+
if check_perf_counter:
116+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
117+
vafter = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]
118+
self.assertGreater(vafter["counters"]["mirroring_peers"], vbefore["counters"]["mirroring_peers"])
119+
85120
def peer_remove(self, fs_name, fs_id, peer_spec):
121+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
122+
vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]
123+
86124
peer_uuid = self.get_peer_uuid(peer_spec)
87125
self.run_ceph_cmd("fs", "snapshot", "mirror", "peer_remove", fs_name, peer_uuid)
88126
time.sleep(10)
@@ -91,6 +129,11 @@ def peer_remove(self, fs_name, fs_id, peer_spec):
91129
'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
92130
self.assertTrue(res['peers'] == {} and res['snap_dirs']['dir_count'] == 0)
93131

132+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
133+
vafter = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]
134+
135+
self.assertLess(vafter["counters"]["mirroring_peers"], vbefore["counters"]["mirroring_peers"])
136+
94137
def bootstrap_peer(self, fs_name, client_name, site_name):
95138
outj = json.loads(self.get_ceph_cmd_stdout(
96139
"fs", "snapshot", "mirror", "peer_bootstrap", "create", fs_name,
@@ -101,7 +144,11 @@ def import_peer(self, fs_name, token):
101144
self.run_ceph_cmd("fs", "snapshot", "mirror", "peer_bootstrap",
102145
"import", fs_name, token)
103146

104-
def add_directory(self, fs_name, fs_id, dir_name):
147+
def add_directory(self, fs_name, fs_id, dir_name, check_perf_counter=True):
148+
if check_perf_counter:
149+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
150+
vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]
151+
105152
# get initial dir count
106153
res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
107154
'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
@@ -118,7 +165,14 @@ def add_directory(self, fs_name, fs_id, dir_name):
118165
log.debug(f'new dir_count={new_dir_count}')
119166
self.assertTrue(new_dir_count > dir_count)
120167

168+
if check_perf_counter:
169+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
170+
vafter = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]
171+
self.assertGreater(vafter["counters"]["directory_count"], vbefore["counters"]["directory_count"])
172+
121173
def remove_directory(self, fs_name, fs_id, dir_name):
174+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
175+
vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]
122176
# get initial dir count
123177
res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}',
124178
'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
@@ -135,6 +189,11 @@ def remove_directory(self, fs_name, fs_id, dir_name):
135189
log.debug(f'new dir_count={new_dir_count}')
136190
self.assertTrue(new_dir_count < dir_count)
137191

192+
res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}', 'counter', 'dump')
193+
vafter = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS][0]
194+
195+
self.assertLess(vafter["counters"]["directory_count"], vbefore["counters"]["directory_count"])
196+
138197
def check_peer_status(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name,
139198
expected_snap_count):
140199
peer_uuid = self.get_peer_uuid(peer_spec)
@@ -268,7 +327,7 @@ def test_matching_peer(self):
268327
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
269328

270329
try:
271-
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")
330+
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", check_perf_counter=False)
272331
except CommandFailedError as ce:
273332
if ce.exitstatus != errno.EINVAL:
274333
raise RuntimeError('invalid errno when adding a matching remote peer')
@@ -282,7 +341,7 @@ def test_matching_peer(self):
282341

283342
# and explicitly specifying the spec (via filesystem name) should fail too
284343
try:
285-
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name)
344+
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name, check_perf_counter=False)
286345
except CommandFailedError as ce:
287346
if ce.exitstatus != errno.EINVAL:
288347
raise RuntimeError('invalid errno when adding a matching remote peer')
@@ -303,7 +362,7 @@ def test_mirror_peer_add_existing(self):
303362
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
304363

305364
# adding the same peer should be idempotent
306-
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
365+
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name, check_perf_counter=False)
307366

308367
# remove peer
309368
self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")
@@ -313,7 +372,7 @@ def test_mirror_peer_add_existing(self):
313372
def test_peer_commands_with_mirroring_disabled(self):
314373
# try adding peer when mirroring is not enabled
315374
try:
316-
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
375+
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name, check_perf_counter=False)
317376
except CommandFailedError as ce:
318377
if ce.exitstatus != errno.EINVAL:
319378
raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a peer')
@@ -332,7 +391,7 @@ def test_peer_commands_with_mirroring_disabled(self):
332391
def test_add_directory_with_mirroring_disabled(self):
333392
# try adding a directory when mirroring is not enabled
334393
try:
335-
self.add_directory(self.primary_fs_name, self.primary_fs_id, "/d1")
394+
self.add_directory(self.primary_fs_name, self.primary_fs_id, "/d1", check_perf_counter=False)
336395
except CommandFailedError as ce:
337396
if ce.exitstatus != errno.EINVAL:
338397
raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory')
@@ -344,7 +403,7 @@ def test_directory_commands(self):
344403
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
345404
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
346405
try:
347-
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1')
406+
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1', check_perf_counter=False)
348407
except CommandFailedError as ce:
349408
if ce.exitstatus != errno.EEXIST:
350409
raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory')
@@ -364,7 +423,7 @@ def test_directory_commands(self):
364423
def test_add_relative_directory_path(self):
365424
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
366425
try:
367-
self.add_directory(self.primary_fs_name, self.primary_fs_id, './d1')
426+
self.add_directory(self.primary_fs_name, self.primary_fs_id, './d1', check_perf_counter=False)
368427
except CommandFailedError as ce:
369428
if ce.exitstatus != errno.EINVAL:
370429
raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a relative path dir')
@@ -378,7 +437,7 @@ def test_add_directory_path_normalization(self):
378437
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/d3')
379438
def check_add_command_failure(dir_path):
380439
try:
381-
self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
440+
self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path, check_perf_counter=False)
382441
except CommandFailedError as ce:
383442
if ce.exitstatus != errno.EEXIST:
384443
raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory')
@@ -402,7 +461,7 @@ def test_add_ancestor_and_child_directory(self):
402461
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/')
403462
def check_add_command_failure(dir_path):
404463
try:
405-
self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
464+
self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path, check_perf_counter=False)
406465
except CommandFailedError as ce:
407466
if ce.exitstatus != errno.EINVAL:
408467
raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory')
@@ -1158,7 +1217,7 @@ def test_cephfs_mirror_peer_add_primary(self):
11581217
# try adding the primary file system as a peer to secondary file
11591218
# system
11601219
try:
1161-
self.peer_add(self.secondary_fs_name, self.secondary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name)
1220+
self.peer_add(self.secondary_fs_name, self.secondary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name, check_perf_counter=False)
11621221
except CommandFailedError as ce:
11631222
if ce.exitstatus != errno.EINVAL:
11641223
raise RuntimeError('invalid errno when adding a primary file system')

src/common/options/cephfs-mirror.yaml.in

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,15 @@ options:
9191
default: 10
9292
services:
9393
- cephfs-mirror
94-
min: 0
94+
min: 0
95+
- name: cephfs_mirror_perf_stats_prio
96+
type: int
97+
level: advanced
98+
desc: Priority level for mirror daemon replication perf counters
99+
long_desc: The daemon will send perf counter data to the manager daemon if the priority
100+
is not lower than mgr_stats_threshold.
101+
default: 5
102+
services:
103+
- cephfs-mirror
104+
min: 0
105+
max: 11

src/mds/MDSRank.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,10 @@ class MDSRank {
254254
progress_thread.signal();
255255
}
256256

257+
// Return the global id as reported by the monitor client (monc).
uint64_t get_global_id() const {
258+
return monc->get_global_id();
259+
}
260+
257261
// Daemon lifetime functions: these guys break the abstraction
258262
// and call up into the parent MDSDaemon instance. It's kind
259263
// of unavoidable: if we want any depth into our calls

0 commit comments

Comments
 (0)