
Commit 164ef13

bvliu authored and copybara-github committed
Enable CloudSQL system utilization metrics.
PiperOrigin-RevId: 838582682
1 parent 137fbe4 · commit 164ef13

File tree

perfkitbenchmarker/providers/aws/aws_relational_db.py
perfkitbenchmarker/relational_db.py
tests/providers/aws/aws_relational_db_test.py

3 files changed: +208 -1 lines changed

perfkitbenchmarker/providers/aws/aws_relational_db.py

Lines changed: 153 additions & 1 deletion
@@ -16,14 +16,17 @@
 import datetime
 import json
 import logging
+import statistics
 import time
 
 from absl import flags
 from perfkitbenchmarker import errors
+from perfkitbenchmarker import log_util
 from perfkitbenchmarker import mysql_iaas_relational_db
 from perfkitbenchmarker import postgres_iaas_relational_db
 from perfkitbenchmarker import provider_info
 from perfkitbenchmarker import relational_db
+from perfkitbenchmarker import sample
 from perfkitbenchmarker import sqlserver_iaas_relational_db
 from perfkitbenchmarker import vm_util
 from perfkitbenchmarker.providers.aws import aws_network
@@ -49,6 +52,13 @@
 ]
 
 
+def _ConvertDateTimeToUtc(dt):
+  """Converts a datetime to UTC. If naive, assumes it is already UTC."""
+  if dt.tzinfo:
+    return dt.astimezone(datetime.timezone.utc)
+  return dt.replace(tzinfo=datetime.timezone.utc)
+
+
 class AWSSQLServerIAASRelationalDb(
     sqlserver_iaas_relational_db.SQLServerIAASRelationalDb
 ):
@@ -58,7 +68,8 @@ class AWSSQLServerIAASRelationalDb(
 
   def CreateIpReservation(self) -> str:
     cluster_ip_address = '.'.join(
-        self.server_vm.internal_ip.split('.')[:-1]+['128'])
+        self.server_vm.internal_ip.split('.')[:-1] + ['128']
+    )
     return cluster_ip_address
 
   def ReleaseIpReservation(self) -> bool:
@@ -481,6 +492,147 @@ def _InstanceExists(self, instance_id) -> bool:
       return False
     return True
 
+  # Consider decoupling from BaseAwsRelationalDb (a more generic version
+  # would take namespace, metric, region, etc.).
+  def _CollectCloudWatchMetrics(
+      self,
+      metric_name: str,
+      metric_sample_name: str,
+      unit: str,
+      start_time: datetime.datetime,
+      end_time: datetime.datetime,
+  ) -> list[sample.Sample]:
+    """Collects metrics from AWS CloudWatch."""
+    logging.info(
+        'Collecting metric %s for instance %s', metric_name, self.instance_id
+    )
+    start_time_str = start_time.astimezone(datetime.timezone.utc).strftime(
+        relational_db.METRICS_TIME_FORMAT
+    )
+    end_time_str = end_time.astimezone(datetime.timezone.utc).strftime(
+        relational_db.METRICS_TIME_FORMAT
+    )
+    cmd = util.AWS_PREFIX + [
+        'cloudwatch',
+        'get-metric-statistics',
+        '--namespace',
+        'AWS/RDS',
+        '--metric-name',
+        metric_name,
+        '--start-time',
+        start_time_str,
+        '--end-time',
+        end_time_str,
+        '--period',
+        '60',
+        '--statistics',
+        'Average',  # RDS metrics are at 1 minute granularity.
+        '--dimensions',
+        f'Name=DBInstanceIdentifier,Value={self.instance_id}',
+        '--region',
+        self.region,
+    ]
+    try:
+      stdout, _ = util.IssueRetryableCommand(cmd)
+    except errors.VmUtil.IssueCommandError as e:
+      logging.warning(
+          'Could not collect metric %s for instance %s: %s',
+          metric_name,
+          self.instance_id,
+          e,
+      )
+      return []
+    response = json.loads(stdout)
+    datapoints = response['Datapoints']
+    if not datapoints:
+      logging.warning('No datapoints for metric %s', metric_name)
+      return []
+
+    points = []
+    for dp in datapoints:
+      value = dp['Average']
+      if unit == 'MB/s':
+        value /= 1024 * 1024  # Bytes/sec -> MB/sec.
+      points.append((datetime.datetime.fromisoformat(dp['Timestamp']), value))
+    if not points:
+      logging.warning('No values found for metric %s', metric_name)
+      return []
+    points.sort(key=lambda x: x[0])
+    timestamps = [p[0] for p in points]
+    values = [p[1] for p in points]
+    avg_val = statistics.mean(values)
+    min_val = min(values)
+    max_val = max(values)
+    samples = []
+    samples.append(
+        sample.Sample(
+            f'{metric_sample_name}_average', avg_val, unit, metadata={}
+        )
+    )
+    samples.append(
+        sample.Sample(f'{metric_sample_name}_min', min_val, unit, metadata={})
+    )
+    samples.append(
+        sample.Sample(f'{metric_sample_name}_max', max_val, unit, metadata={})
+    )
+    samples.append(
+        sample.CreateTimeSeriesSample(
+            values,
+            [t.timestamp() for t in timestamps],
+            f'{metric_sample_name}_time_series',
+            unit,
+            60,
+        )
+    )
+    log_util.LogToShortLogAndRoot(
+        f'{metric_sample_name}: average={avg_val:.2f}, min={min_val:.2f},'
+        f' max={max_val:.2f}, count={len(values)}'
+    )
+    human_readable_ts = [f'{t}: {v:.2f} {unit}' for t, v in reversed(points)]
+    log_util.LogToShortLogAndRoot(
+        f'{metric_sample_name}_time_series:\n' + '\n'.join(human_readable_ts)
+    )
+    return samples
+
+  def CollectMetrics(
+      self, start_time: datetime.datetime, end_time: datetime.datetime
+  ) -> list[sample.Sample]:
+    """Collects metrics during the run phase."""
+    logging.info(
+        'Collecting metrics for time range: %s to %s',
+        start_time.strftime(relational_db.METRICS_TIME_FORMAT),
+        end_time.strftime(relational_db.METRICS_TIME_FORMAT),
+    )
+
+    time_to_wait = (
+        end_time
+        + datetime.timedelta(
+            seconds=relational_db.METRICS_COLLECTION_DELAY_SECONDS
+        )
+        - datetime.datetime.now()
+    )
+    if time_to_wait.total_seconds() > 0:
+      logging.info(
+          'Waiting %s seconds for metrics to be available.',
+          int(time_to_wait.total_seconds()),
+      )
+      time.sleep(time_to_wait.total_seconds())
+    metrics_to_collect = [
+        ('CPUUtilization', 'cpu_utilization', '%'),
+        ('ReadIOPS', 'disk_read_iops', 'iops'),
+        ('WriteIOPS', 'disk_write_iops', 'iops'),
+        ('ReadThroughput', 'disk_read_throughput', 'MB/s'),
+        ('WriteThroughput', 'disk_write_throughput', 'MB/s'),
+    ]
+    all_samples = []
+    for metric_name, metric_sample_name, unit in metrics_to_collect:
+      all_samples.extend(
+          self._CollectCloudWatchMetrics(
+              metric_name, metric_sample_name, unit, start_time, end_time
+          )
+      )
+    return all_samples
+
   def _Exists(self):
     """Returns true if the underlying resource exists.
 

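For context, here is a minimal standalone sketch of the aggregation the new method performs, fed with a hand-made response of the shape that `aws cloudwatch get-metric-statistics --statistics Average` returns. The `Datapoints`/`Timestamp`/`Average` field names match the real CloudWatch output; the datapoint values themselves are invented:

import datetime
import json
import statistics

# Hypothetical CLI output; in the commit this comes from
# util.IssueRetryableCommand running the CloudWatch CLI.
stdout = json.dumps({
    'Datapoints': [
        {'Timestamp': '2025-11-26T10:00:00+00:00', 'Average': 12582912.0},
        {'Timestamp': '2025-11-26T10:01:00+00:00', 'Average': 25165824.0},
    ]
})

unit = 'MB/s'
points = []
for dp in json.loads(stdout)['Datapoints']:
  value = dp['Average']
  if unit == 'MB/s':
    value /= 1024 * 1024  # CloudWatch reports throughput in bytes/second.
  points.append((datetime.datetime.fromisoformat(dp['Timestamp']), value))

points.sort(key=lambda p: p[0])  # CloudWatch does not guarantee order.
values = [v for _, v in points]
print(statistics.mean(values), min(values), max(values))  # 18.0 12.0 24.0

The mean, min, and max become individual Samples, and the sorted series becomes one time-series Sample, which is why each metric contributes four samples.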
perfkitbenchmarker/relational_db.py

Lines changed: 2 additions & 0 deletions
@@ -222,6 +222,8 @@
 ENABLE_DATA_CACHE = flags.DEFINE_bool(
     'gcp_db_enable_data_cache', False, 'Whether to enable data cache.'
 )
+METRICS_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
+METRICS_COLLECTION_DELAY_SECONDS = 165
 
 
 FLAGS = flags.FLAGS
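These two constants drive the collection above: METRICS_TIME_FORMAT renders UTC timestamps in the form the CloudWatch CLI accepts, and METRICS_COLLECTION_DELAY_SECONDS pads the wait past the end of the run, presumably so the final 1-minute datapoints have time to be published. A small illustration (the end_time value is invented):

import datetime

METRICS_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
METRICS_COLLECTION_DELAY_SECONDS = 165

end_time = datetime.datetime(
    2025, 11, 26, 10, 1, 0, tzinfo=datetime.timezone.utc
)
print(end_time.strftime(METRICS_TIME_FORMAT))  # 2025-11-26T10:01:00Z
ready = end_time + datetime.timedelta(seconds=METRICS_COLLECTION_DELAY_SECONDS)
print(ready.strftime(METRICS_TIME_FORMAT))     # 2025-11-26T10:03:45Z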

tests/providers/aws/aws_relational_db_test.py

Lines changed: 53 additions & 0 deletions
@@ -15,6 +15,7 @@
 
 import builtins
 import contextlib
+import datetime
 import json
 import os
 import textwrap
@@ -459,6 +460,58 @@ def testUpdateClusterClassNotChanged(self):
 
     mock_issue_command.assert_not_called()
 
+  def testCollectMetrics(self):
+    db = self.CreateDbFromSpec()
+    db.instance_id = 'pkb-db-instance-123'
+    db.region = 'us-west-2'
+
+    # Mock the response from AWS CloudWatch.
+    mock_response = {
+        'Datapoints': [
+            {
+                'Timestamp': '2025-11-26T10:00:00Z',
+                'Average': 10.0,
+            },
+            {
+                'Timestamp': '2025-11-26T10:01:00Z',
+                'Average': 20.0,
+            },
+        ]
+    }
+    self.enter_context(
+        mock.patch.object(
+            aws_relational_db.util,
+            'IssueRetryableCommand',
+            return_value=(json.dumps(mock_response), ''),
+        )
+    )
+
+    start_time = datetime.datetime(2025, 11, 26, 10, 0, 0)
+    end_time = datetime.datetime(2025, 11, 26, 10, 1, 0)
+    samples = db.CollectMetrics(start_time, end_time)
+
+    # Check the number of samples returned (4 per metric * 5 metrics).
+    self.assertLen(samples, 20)
+
+    # Spot-check a few sample values.
+    sample_names = [s.metric for s in samples]
+    self.assertIn('cpu_utilization_average', sample_names)
+    self.assertIn('cpu_utilization_min', sample_names)
+    self.assertIn('cpu_utilization_max', sample_names)
+    self.assertIn('disk_read_iops_average', sample_names)
+
+    cpu_avg = next(s for s in samples if s.metric == 'cpu_utilization_average')
+    self.assertEqual(cpu_avg.value, 15.0)
+    self.assertEqual(cpu_avg.unit, '%')
+
+    cpu_min = next(s for s in samples if s.metric == 'cpu_utilization_min')
+    self.assertEqual(cpu_min.value, 10.0)
+    self.assertEqual(cpu_min.unit, '%')
+
+    cpu_max = next(s for s in samples if s.metric == 'cpu_utilization_max')
+    self.assertEqual(cpu_max.value, 20.0)
+    self.assertEqual(cpu_max.unit, '%')
+
 
 if __name__ == '__main__':
   unittest.main()
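The expectations follow directly from the mock: the two datapoint averages 10.0 and 20.0 give a mean of 15.0, and with four samples per metric (average, min, max, time series) across five metrics the test expects 20 samples in total. A quick sanity check of that arithmetic:

import statistics

datapoint_averages = [10.0, 20.0]
assert statistics.mean(datapoint_averages) == 15.0  # cpu_utilization_average
samples_per_metric = 4  # average, min, max, time series
num_metrics = 5         # CPU, read/write IOPS, read/write throughput
assert samples_per_metric * num_metrics == 20       # matches assertLen(20)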
