5858
5959NON_EXISTENT_KEY_STR = "N/A"
6060
61+ MDS_SUBVOLUME_QUERY_COUNTERS_MAP = OrderedDict ({'subv_read_iops' : 16 ,
62+ 'subv_write_iops' : 17 ,
63+ 'subv_read_throughput' : 18 ,
64+ 'subv_write_throughput' : 19 ,
65+ 'subv_avg_read_latency' : 20 ,
66+ 'subv_avg_write_latency' : 21 })
67+ MDS_SUBVOLUME_QUERY_COUNTERS = list (MDS_SUBVOLUME_QUERY_COUNTERS_MAP .keys ())
68+ SUBVOLUME_QUERY_IDS = "subvolume_query_ids"
69+ QUERY_SUBVOLUME_COUNTERS = "query_raw_counters"
70+
6171logger = logging .getLogger (__name__ )
6272
6373class FilterSpec (object ):
@@ -386,6 +396,24 @@ def get_raw_perf_counters(self, query):
386396 # send an asynchronous client metadata refresh
387397 self .update_client_meta ()
388398
399+ def get_raw_perf_counters_subvolumes (self , query ):
400+ self .log .debug ("get_raw_perf_counters_subvolumes={}" .format (query ))
401+ raw_subvolume_counters = query .setdefault ("subvolume_raw_counters" , {})
402+
403+ for query_id in query [SUBVOLUME_QUERY_IDS ]:
404+ result = self .module .get_mds_perf_counters (query_id )
405+ incoming_metrics = result ['metrics' ][1 ]
406+
407+ self .log .debug ("get_raw_perf_counters_subvolumes queryid{}" .format (query_id ))
408+
409+ for counter in incoming_metrics :
410+ self .log .debug ("get_raw_perf_counters_subvolumes counter{}" .format (counter ))
411+ try :
412+ subvolume_path = counter ['k' ][1 ][0 ] # [mds_rank, subvol_path]
413+ raw_subvolume_counters [subvolume_path ] = counter ['c' ]
414+ except (IndexError , KeyError ) as e :
415+ self .log .error (f"Failed to parse subvolume counter { counter } : { e } " )
416+
389417 def get_raw_perf_counters_global (self , query ):
390418 raw_perf_counters = query .setdefault (QUERY_RAW_COUNTERS_GLOBAL , {})
391419 result = self .module .get_mds_perf_counters (query [GLOBAL_QUERY_ID ])
@@ -404,6 +432,7 @@ def get_raw_perf_counters_global(self, query):
404432 def process_mds_reports (self ):
405433 for query in self .user_queries .values ():
406434 self .get_raw_perf_counters (query )
435+ self .get_raw_perf_counters_subvolumes (query )
407436 self .get_raw_perf_counters_global (query )
408437
409438 def scrub_expired_queries (self ):
@@ -412,7 +441,7 @@ def scrub_expired_queries(self):
412441 user_query = self .user_queries [filter_spec ]
413442 self .log .debug ("scrubbing query={}" .format (user_query ))
414443 if user_query [QUERY_LAST_REQUEST ] < expire_time :
415- expired_query_ids = user_query [QUERY_IDS ].copy ()
444+ expired_query_ids = user_query [QUERY_IDS ].copy () + user_query [ SUBVOLUME_QUERY_IDS ]. copy ()
416445 expired_query_ids .append (user_query [GLOBAL_QUERY_ID ])
417446 self .log .debug ("unregistering query={} ids={}" .format (user_query , expired_query_ids ))
418447 self .unregister_mds_perf_queries (filter_spec , expired_query_ids )
@@ -431,6 +460,19 @@ def prepare_mds_perf_query(self, rank, client_id, client_ip):
431460 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS ,
432461 }
433462
463+ def prepare_subvolume_perf_query (self , rank ):
464+ mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
465+ if rank != - 1 :
466+ mds_rank_regex = '^({})$' .format (rank )
467+
468+ return {
469+ 'key_descriptor' : [
470+ {'type' : 'mds_rank' , 'regex' : mds_rank_regex },
471+ {'type' : 'subvolume_path' , 'regex' : '^(.*)$' },
472+ ],
473+ 'performance_counter_descriptors' : MDS_SUBVOLUME_QUERY_COUNTERS ,
474+ }
475+
434476 def prepare_global_perf_query (self , client_id , client_ip ):
435477 client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS .format (client_id , client_ip )
436478 return {
@@ -468,6 +510,32 @@ def register_mds_perf_query(self, filter_spec):
468510 raise
469511 return query_ids
470512
513+ def register_subvolume_perf_query (self , filter_spec ):
514+ """
515+ Register subvolume perf queries for each MDS rank in the filter_spec.
516+ """
517+ mds_ranks = filter_spec .mds_ranks
518+
519+ query_ids = []
520+ try :
521+ # Register a perf query for each MDS rank
522+ for rank in mds_ranks :
523+ query = self .prepare_subvolume_perf_query (rank )
524+ self .log .info ("register_subvolume_perf_query: {}" .format (query ))
525+
526+ query_id = self .module .add_mds_perf_query (query )
527+ if query_id is None : # query ID can be 0
528+ raise RuntimeError ("failed to add subvolume perf query: {}" .format (query ))
529+ query_ids .append (query_id )
530+
531+ except Exception :
532+ # Roll back all successful registrations
533+ for query_id in query_ids :
534+ self .module .remove_mds_perf_query (query_id )
535+ raise
536+
537+ return query_ids
538+
471539 def register_global_perf_query (self , filter_spec ):
472540 client_id = filter_spec .client_id
473541 client_ip = filter_spec .client_ip
@@ -487,6 +555,7 @@ def register_query(self, filter_spec):
487555 user_query = {
488556 QUERY_IDS : self .register_mds_perf_query (filter_spec ),
489557 GLOBAL_QUERY_ID : self .register_global_perf_query (filter_spec ),
558+ SUBVOLUME_QUERY_IDS : self .register_subvolume_perf_query (filter_spec ),
490559 QUERY_LAST_REQUEST : datetime .now (),
491560 }
492561 self .user_queries [filter_spec ] = user_query
@@ -498,16 +567,19 @@ def register_query(self, filter_spec):
498567 return user_query
499568
500569 def generate_report (self , user_query ):
501- result = {} # type: Dict
570+ result = {} # type: Dict
502571 global fs_list
503- # start with counter info -- metrics that are global and per mds
504572 result ["version" ] = PERF_STATS_VERSION
505573 result ["global_counters" ] = MDS_GLOBAL_PERF_QUERY_COUNTERS
506574 result ["counters" ] = MDS_PERF_QUERY_COUNTERS
507575
508- # fill in client metadata
509576 raw_perfs_global = user_query .setdefault (QUERY_RAW_COUNTERS_GLOBAL , {})
510577 raw_perfs = user_query .setdefault (QUERY_RAW_COUNTERS , {})
578+ raw_subvolumes = user_query .setdefault ("subvolume_raw_counters" , {})
579+
580+ logger .debug (f"raw_perfs={ raw_perfs } , raw_subvolumes={ raw_subvolumes } , user_query={ user_query } " )
581+
582+ # -- Populate client metadata
511583 with self .meta_lock :
512584 raw_counters_clients = []
513585 for val in raw_perfs .values ():
@@ -521,7 +593,7 @@ def generate_report(self, user_query):
521593 client_meta = (result_meta .setdefault (fs_name , {})).setdefault (client_id , {})
522594 client_meta .update (meta [fs_name ][client_id ])
523595
524- # start populating global perf metrics w/ client metadata
596+ # -- Global perf metrics (by client)
525597 metrics = result .setdefault ("global_metrics" , {})
526598 for fs_name in fs_list :
527599 if fs_name in meta and len (meta [fs_name ]):
@@ -531,16 +603,27 @@ def generate_report(self, user_query):
531603 del global_client_metrics [:]
532604 global_client_metrics .extend (counters )
533605
534- # and, now per-mds metrics keyed by mds rank along with delayed ranks
535- metrics = result .setdefault ("metrics" , {})
606+ # -- Per-MDS metrics
607+ mds_metrics = result .setdefault ("metrics" , {})
608+ mds_metrics ["delayed_ranks" ] = []
609+ if raw_perfs :
610+ mds_metrics ["delayed_ranks" ] = [rank for rank , counters in raw_perfs .items () if counters [0 ]]
611+ for rank , counters in raw_perfs .items ():
612+ mds_key = f"mds.{ rank } "
613+ mds_metrics [mds_key ] = counters [1 ]
614+ else :
615+ logger .debug ("No per-MDS raw_perfs available; skipping MDS metrics population" )
616+
617+ # -- Subvolume metrics
618+ if isinstance (next (iter (raw_subvolumes .keys ()), None ), str ):
619+ # Flat subvolumes keyed directly by subvolume_path
620+ logger .debug ("Detected flat subvolume counters" )
621+ result ["subvolume_metrics" ] = raw_subvolumes
622+ result ["subvolume_metrics" ]["valid_metrics" ] = MDS_SUBVOLUME_QUERY_COUNTERS
536623
537- metrics ["delayed_ranks" ] = [rank for rank , counters in raw_perfs .items () if counters [0 ]]
538- for rank , counters in raw_perfs .items ():
539- mds_key = "mds.{}" .format (rank )
540- mds_metrics = metrics .setdefault (mds_key , {})
541- mds_metrics .update (counters [1 ])
542624 return result
543625
626+
544627 def extract_query_filters (self , cmd ):
545628 mds_rank_spec = cmd .get ('mds_rank' , None )
546629 client_id_spec = cmd .get ('client_id' , None )
0 commit comments