 import datetime
 import json
 import logging
-import statistics
 import time
 
 from absl import flags
 from perfkitbenchmarker import errors
-from perfkitbenchmarker import log_util
 from perfkitbenchmarker import mysql_iaas_relational_db
 from perfkitbenchmarker import postgres_iaas_relational_db
 from perfkitbenchmarker import provider_info
@@ -105,6 +103,7 @@ class BaseAwsRelationalDb(relational_db.BaseRelationalDb):
   """
 
   REQUIRED_ATTRS = ['CLOUD', 'IS_MANAGED', 'ENGINE']
+  METRICS_COLLECTION_DELAY_SECONDS = 165
 
   def __init__(self, relational_db_spec):
     super().__init__(relational_db_spec)
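
CloudWatch datapoints trail real time, so the collector has to wait before querying. The delay constant moves from module level (the removed `relational_db.METRICS_COLLECTION_DELAY_SECONDS` reference further down) onto the class, where per-provider overrides become possible. A minimal sketch of how the base class presumably consumes it, reconstructed from the `CollectMetrics` method this commit deletes below; the actual base-class code is not part of this diff:

    # Sketch only: assumes the deleted CollectMetrics body was hoisted into
    # relational_db.BaseRelationalDb more or less unchanged.
    import datetime
    import time

    def CollectMetrics(self, start_time, end_time):
      time_to_wait = (
          end_time
          + datetime.timedelta(seconds=self.METRICS_COLLECTION_DELAY_SECONDS)
          - datetime.datetime.now()
      )
      if time_to_wait.total_seconds() > 0:
        time.sleep(time_to_wait.total_seconds())
      samples = []
      for metric in self._GetMetricsToCollect():
        samples.extend(self._CollectProviderMetric(metric, start_time, end_time))
      return samples
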
@@ -493,19 +492,31 @@ def _InstanceExists(self, instance_id) -> bool:
       return False
     return True
 
-  # Consider decoupling from BaseAwsRelationalDb (more generic version would
-  # take namespace, metric, region, etc).
-  def _CollectCloudWatchMetrics(
+  def _GetMetricsToCollect(self) -> list[relational_db.MetricSpec]:
+    """Returns a list of metrics to collect."""
+    # pyformat: disable
+    return [
+        relational_db.MetricSpec('CPUUtilization', 'cpu_utilization', '%', None),
+        relational_db.MetricSpec('ReadIOPS', 'disk_read_iops', 'iops', None),
+        relational_db.MetricSpec('WriteIOPS', 'disk_write_iops', 'iops', None),
+        relational_db.MetricSpec('ReadThroughput', 'disk_read_throughput', 'MB/s', lambda x: x / (1024 * 1024)),
+        relational_db.MetricSpec('WriteThroughput', 'disk_write_throughput', 'MB/s', lambda x: x / (1024 * 1024)),
+        relational_db.MetricSpec('FreeStorageSpace', 'disk_bytes_used', 'GB', lambda x: x / (1024 * 1024 * 1024)),
+    ]
+    # pyformat: enable
+
+  def _CollectProviderMetric(
       self,
-      metric_name: str,
-      metric_sample_name: str,
-      unit: str,
+      metric: relational_db.MetricSpec,
       start_time: datetime.datetime,
       end_time: datetime.datetime,
+      collect_percentiles: bool = False,
   ) -> list[sample.Sample]:
     """Collects metrics from AWS CloudWatch."""
     logging.info(
-        'Collecting metric %s for instance %s', metric_name, self.instance_id
+        'Collecting metric %s for instance %s',
+        metric.provider_name,
+        self.instance_id,
     )
     start_time_str = start_time.astimezone(datetime.timezone.utc).strftime(
         relational_db.METRICS_TIME_FORMAT
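
`relational_db.MetricSpec` is defined outside this file, so its exact shape is not visible here. From the four positional arguments above and the `.provider_name`, `.sample_name`, `.unit`, and `.conversion_func` accesses in this method, it is presumably a small record along these lines (a sketch inferred from the call sites, not the actual definition):

    import typing
    from typing import Callable, Optional

    # Assumed shape of relational_db.MetricSpec, inferred from its use here.
    class MetricSpec(typing.NamedTuple):
      provider_name: str   # cloud-side metric name, e.g. 'ReadIOPS'
      sample_name: str     # PKB sample name, e.g. 'disk_read_iops'
      unit: str            # unit attached to the emitted samples
      conversion_func: Optional[Callable[[float], float]]  # e.g. bytes -> MB
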
@@ -519,7 +530,7 @@ def _CollectCloudWatchMetrics(
         '--namespace',
         'AWS/RDS',
         '--metric-name',
-        metric_name,
+        metric.provider_name,
         '--start-time',
         start_time_str,
         '--end-time',
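
For context, these flags belong to an `aws cloudwatch get-metric-statistics` invocation. The surrounding pieces of the command (the `--period`, `--statistics`, and `--dimensions` arguments) sit outside the visible hunks, so the values below are illustrative assumptions rather than values taken from this file:

    # Roughly the command this method assembles (flag values after
    # --end-time are assumptions) and the JSON shape it parses from stdout:
    cmd = [
        'aws', 'cloudwatch', 'get-metric-statistics',
        '--namespace', 'AWS/RDS',
        '--metric-name', 'ReadIOPS',
        '--start-time', '2024-01-01T00:00:00',
        '--end-time', '2024-01-01T00:10:00',
        '--period', '60',
        '--statistics', 'Average',
        '--dimensions', 'Name=DBInstanceIdentifier,Value=my-instance',
    ]
    # stdout -> {"Label": "ReadIOPS",
    #            "Datapoints": [{"Timestamp": "2024-01-01T00:01:00+00:00",
    #                            "Average": 512.3, "Unit": "Count/Second"}]}

Note that `Timestamp` arrives as an ISO 8601 string, which is why the datapoint loop below parses it with `datetime.datetime.fromisoformat`.
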
@@ -538,101 +549,27 @@ def _CollectCloudWatchMetrics(
     except errors.VmUtil.IssueCommandError as e:
       logging.warning(
           'Could not collect metric %s for instance %s: %s',
-          metric_name,
+          metric.provider_name,
           self.instance_id,
           e,
       )
       return []
     response = json.loads(stdout)
     datapoints = response['Datapoints']
     if not datapoints:
-      logging.warning('No datapoints for metric %s', metric_name)
+      logging.warning('No datapoints for metric %s', metric.provider_name)
       return []
 
     points = []
     for dp in datapoints:
       value = dp['Average']
-      if unit == 'MB/s':
-        value /= 1024 * 1024
-      points.append((datetime.datetime.fromisoformat(dp['Timestamp']), value))
-    if not points:
-      logging.warning('No values found for metric %s', metric_name)
-      return []
-    points.sort(key=lambda x: x[0])
-    timestamps = [p[0] for p in points]
-    values = [p[1] for p in points]
-    avg_val = statistics.mean(values)
-    min_val = min(values)
-    max_val = max(values)
-    samples = []
-    samples.append(
-        sample.Sample(
-            f'{metric_sample_name}_average', avg_val, unit, metadata={}
-        )
-    )
-    samples.append(
-        sample.Sample(f'{metric_sample_name}_min', min_val, unit, metadata={})
-    )
-    samples.append(
-        sample.Sample(f'{metric_sample_name}_max', max_val, unit, metadata={})
-    )
-    samples.append(
-        sample.CreateTimeSeriesSample(
-            values,
-            [t.timestamp() for t in timestamps],
-            f'{metric_sample_name}_time_series',
-            unit,
-            60,
-        )
-    )
-    log_util.LogToShortLogAndRoot(
-        f'{metric_sample_name}: average={avg_val:.2f}, min={min(values):.2f},'
-        f' max={max(values):.2f}, count={len(values)}'
-    )
-    human_readable_ts = [f'{t}: {v:.2f} {unit}' for t, v in reversed(points)]
-    log_util.LogToShortLogAndRoot(
-        f'{metric_sample_name}_time_series:\n{'\n'.join(human_readable_ts)}'
-    )
-    return samples
+      if metric.conversion_func:
+        value = metric.conversion_func(value)
+      points.append((datetime.datetime.fromisoformat(dp['Timestamp']), value))
 
-  def CollectMetrics(
-      self, start_time: datetime.datetime, end_time: datetime.datetime
-  ) -> list[sample.Sample]:
-    """Collects metrics during the run phase."""
-    logging.info(
-        'Collecting metrics for time range: %s to %s',
-        start_time.strftime(relational_db.METRICS_TIME_FORMAT),
-        end_time.strftime(relational_db.METRICS_TIME_FORMAT),
-    )
-
-    time_to_wait = (
-        end_time
-        + datetime.timedelta(
-            seconds=relational_db.METRICS_COLLECTION_DELAY_SECONDS
-        )
-        - datetime.datetime.now()
+    return self._CreateSamples(
+        points, metric.sample_name, metric.unit, collect_percentiles
     )
-    if time_to_wait.total_seconds() > 0:
-      logging.info(
-          'Waiting %s seconds for metrics to be available.',
-          int(time_to_wait.total_seconds()),
-      )
-      time.sleep(time_to_wait.total_seconds())
-    metrics_to_collect = [
-        ('CPUUtilization', 'cpu_utilization', '%'),
-        ('ReadIOPS', 'disk_read_iops', 'iops'),
-        ('WriteIOPS', 'disk_write_iops', 'iops'),
-        ('ReadThroughput', 'disk_read_throughput', 'MB/s'),
-        ('WriteThroughput', 'disk_write_throughput', 'MB/s'),
-    ]
-    all_samples = []
-    for metric_name, metric_sample_name, unit in metrics_to_collect:
-      all_samples.extend(
-          self._CollectCloudWatchMetrics(
-              metric_name, metric_sample_name, unit, start_time, end_time
-          )
-      )
-    return all_samples
 
   def _Exists(self):
     """Returns true if the underlying resource exists.
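
`_CreateSamples` replaces the average/min/max and time-series assembly that this commit removes from the old `_CollectCloudWatchMetrics`, and the new `collect_percentiles` flag suggests an optional percentile summary. Its definition lives outside this diff, presumably on the base class; reconstructed from the removed inline code, it plausibly looks like:

    # Sketch of the assumed base-class helper; the removed code above is the
    # source for the average/min/max and time-series samples, while the
    # percentile branch is an assumption based only on the parameter name.
    import statistics

    def _CreateSamples(self, points, sample_name, unit, collect_percentiles):
      points.sort(key=lambda p: p[0])
      values = [v for _, v in points]
      samples = [
          sample.Sample(f'{sample_name}_average', statistics.mean(values), unit, {}),
          sample.Sample(f'{sample_name}_min', min(values), unit, {}),
          sample.Sample(f'{sample_name}_max', max(values), unit, {}),
          sample.CreateTimeSeriesSample(
              values,
              [t.timestamp() for t, _ in points],
              f'{sample_name}_time_series',
              unit,
              60,
          ),
      ]
      if collect_percentiles:
        samples.append(sample.Sample(
            f'{sample_name}_p99',
            statistics.quantiles(values, n=100)[98], unit, {}))
      return samples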