Skip to content

Commit 849dddf

Browse files
raymond13513 authored and copybara-github committed
Refactor: Extract BaseDisruptionTrigger to a separate file.
PiperOrigin-RevId: 823116546
1 parent 7754250 commit 849dddf

File tree

4 files changed

+1812
-1721
lines changed

4 files changed

+1812
-1721
lines changed
Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
# Copyright 2022 PerfKitBenchmarker Authors. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Module containing base class for disruption triggers."""
15+
16+
import collections
17+
from collections.abc import Mapping, MutableSequence
18+
import copy
19+
import logging
20+
import statistics
21+
from typing import Any
22+
23+
from absl import flags
24+
from perfkitbenchmarker import benchmark_spec as bm_spec
25+
from perfkitbenchmarker import sample
26+
from perfkitbenchmarker import virtual_machine
27+
from perfkitbenchmarker.time_triggers import base_time_trigger
28+
29+
# Throughput time-series metrics that get aggregated into disruption-impact
# samples by BaseDisruptionTrigger._AppendAggregatedMetrics.
TIME_SERIES_SAMPLES_FOR_AGGREGATION = [
    sample.TPM_TIME_SERIES,
    sample.OPS_TIME_SERIES,
    sample.QPS_TIME_SERIES,
]
# Fractions of the baseline mean used for the
# "seconds_dropped_below_X_percent" metrics (0% to 90% in 10% steps).
PERCENTILES = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

DEGRADATION_PERCENT = flags.DEFINE_float(
    'maintenance_degradation_percent',
    95.0,
    (
        'Percentage below which a given second is considered a degradation.'
        ' This should be set to (1 - max variance) of the benchmark. I.e. if'
        ' the benchmark has a max variance of 5%, this should be set to 95.'
    ),
)

MAINTENANCE_DEGRADATION_WINDOW = flags.DEFINE_float(
    'maintenance_degradation_window',
    2,
    'Multiple of LM duration to consider the degradation after LM starts.',
)
51+
52+
53+
class BaseDisruptionTrigger(base_time_trigger.BaseTimeTrigger):
  """Class contains logic for triggering maintenance events.

  Subclasses implement SetUp/TriggerMethod for a concrete disruption (e.g.
  live migration / host maintenance). This base class aggregates the
  benchmark's throughput time-series samples into disruption-impact metrics:
  seconds dropped below percentile thresholds, total loss seconds, an
  "unresponsive" metric, and the post-disruption degradation percentage.
  """

  def __init__(self, delay: int):
    super().__init__(delay)
    # Disruption end time in Unix milliseconds, derived from the
    # 'Host_maintenance_end' events returned by WaitForDisruption().
    self.disruption_ends = None

  def TriggerMethod(self, vm: virtual_machine.VirtualMachine):
    """Trigger the disruption."""
    raise NotImplementedError()

  def SetUp(self):
    """See base class."""
    raise NotImplementedError()

  def WaitForDisruption(self) -> MutableSequence[Mapping[str, Any]]:
    """Wait for disruption to end and return the end time."""
    return []

  def AppendSamples(
      self,
      unused_sender,
      benchmark_spec: bm_spec.BenchmarkSpec,
      samples: MutableSequence[sample.Sample],
  ):
    """Append samples related to disruption."""

    def generate_disruption_total_time_samples() -> (
        MutableSequence[sample.Sample]
    ):
      events = self.WaitForDisruption()

      # Host maintenance is in s; convert to ms to match sample timestamps.
      self.disruption_ends = max(
          [float(d['Host_maintenance_end']) * 1000 for d in events],
          default=0,
      )

      # Populate the run_number "LM Total Time" by copying the metadata from
      # (one of) the existing samples. Ideally pkb.DoRunPhase() would have sole
      # responsibility for populating run_number for all samples, but making
      # that change might be risky.
      sample_metadata = (
          copy.deepcopy(samples[0].metadata) if len(samples) > 0 else {}
      )

      return [
          sample.Sample(
              'LM Total Time',
              d['LM_total_time'],
              'seconds',
              sample_metadata | d,
          )
          for d in events
      ]

    samples += generate_disruption_total_time_samples()
    self._AppendAggregatedMetrics(samples)

  def _AppendAggregatedMetrics(self, samples: MutableSequence[sample.Sample]):
    """Finds the time series samples and generates the aggregated metrics."""
    additional_samples = []
    for s in samples:
      if s.metric in TIME_SERIES_SAMPLES_FOR_AGGREGATION:
        additional_samples += self._AggregateThroughputSample(s)
    samples += additional_samples

  def _AggregateThroughputSample(
      self, s: sample.Sample
  ) -> MutableSequence[sample.Sample]:
    """Aggregate a time series sample into disruption metrics.

    Split the samples and compute mean and median and calls relevant
    methods to generate an aggregated sample based on the time series sample.

    Args:
      s: A time series sample created using CreateTimeSeriesSample in
        samples.py.

    Returns:
      A list of samples.
    """
    metadata = copy.deepcopy(s.metadata)
    time_series = metadata['timestamps']
    values = metadata['values']
    interval = metadata['interval']

    # Default ramp up ends and ramp down starts if the benchmark does not
    # provide it in the metadata.
    ramp_up_ends = time_series[0]
    ramp_down_starts = time_series[-1]
    disruption_ends = self.GetDisruptionEnds()
    if disruption_ends is None:
      disruption_ends = time_series[-1]
    if sample.RAMP_DOWN_STARTS in metadata:
      ramp_down_starts = metadata[sample.RAMP_DOWN_STARTS]
    if sample.RAMP_UP_ENDS in metadata:
      ramp_up_ends = metadata[sample.RAMP_UP_ENDS]

    disruption_start = sample.ConvertDateTimeToUnixMs(self.trigger_time)

    disruption_duration = disruption_ends - disruption_start

    base_line_values = []
    values_after_disruption_starts = []
    values_after_disruption_ends = []
    total_missing_seconds = 0
    for i in range(len(values)):
      time = time_series[i]
      if time >= ramp_up_ends and time <= ramp_down_starts:
        interval_values = []
        # If more than 1 sequential value is missing from the time series,
        # distribute the ops throughout the missing interval.
        if i > 0:
          time_gap_in_seconds = (time - time_series[i - 1]) / 1000
          missing_entry_count = int((time_gap_in_seconds / interval) - 1)
          if missing_entry_count > 1:
            total_missing_seconds += missing_entry_count * interval
            interval_values.extend(
                [int(values[i] / float(missing_entry_count + 1))]
                * (missing_entry_count + 1)
            )
          else:
            interval_values.append(values[i])
        # NOTE(review): the first in-range point (i == 0) contributes no
        # value here — confirm that skipping it is intended.
        if time <= disruption_start:
          base_line_values.extend(interval_values)
        else:
          if (
              MAINTENANCE_DEGRADATION_WINDOW.value is not None
              and MAINTENANCE_DEGRADATION_WINDOW.value > 0
          ):
            window = MAINTENANCE_DEGRADATION_WINDOW.value
            assert 1 <= window <= 10, (
                'maintenance_degradation_window must be between 1 and 10'
                f' inclusive, or None. Got: {window}'
            )
            # Only count degradation within `window` multiples of the
            # disruption duration after the disruption starts.
            # NOTE(review): reconstructed else-pairing — the unwindowed
            # branch applies only when the window flag is unset; verify
            # against the original indentation.
            if time <= disruption_start + disruption_duration * window:
              values_after_disruption_starts.extend(interval_values)
          else:
            values_after_disruption_starts.extend(interval_values)

          if time > disruption_ends:
            values_after_disruption_ends.extend(interval_values)

    median = statistics.median(base_line_values)
    mean = statistics.mean(base_line_values)

    logging.info('Disruption Baseline median: %s', median)
    logging.info('Disruption Baseline mean: %s', mean)

    # Keep the metadata from the original sample except time series metadata
    for field in sample.TIME_SERIES_METADATA:
      if field in metadata:
        del metadata[field]

    samples = self._ComputeLossPercentile(
        mean, values_after_disruption_starts, metadata
    ) + self._ComputeLossWork(
        median, values_after_disruption_starts, interval, metadata
    )
    if values_after_disruption_ends:
      mean_after_disruption_ends = statistics.mean(values_after_disruption_ends)
      samples += self._ComputeDegradation(
          mean, mean_after_disruption_ends, metadata
      )
      logging.info('Mean after disruption ends: %s', mean_after_disruption_ends)
      logging.info(
          'Number of samples after disruption ends: %s',
          len(values_after_disruption_ends),
      )

    # Seconds that are missing i.e. without throughput.
    samples.append(
        sample.Sample(
            'total_missing_seconds',
            total_missing_seconds,
            's',
            metadata=metadata,
        )
    )
    return samples

  def GetDisruptionEnds(self) -> float | None:
    """Get the disruption ends."""
    return self.disruption_ends

  def _ComputeLossPercentile(
      self,
      mean: float,
      values_after_disruption: MutableSequence[float],
      metadata: Mapping[str, Any],
  ) -> MutableSequence[sample.Sample]:
    """Compute loss percentile metrics.

    This method samples of seconds_dropped_below_x_percent from 0% to 90%
    in 10 percent increments. This is computed by a nested for loop and
    comparing if value dropped below a given percentile.

    Args:
      mean: Mean of the baseline.
      values_after_disruption: List of samples after disruption.
      metadata: Metadata for samples.

    Returns:
      Samples of loss percentile metrics.
    """
    seconds_dropped_below_percentile = collections.defaultdict(int)
    for value in values_after_disruption:
      for p in PERCENTILES:
        if value <= mean * p:
          seconds_dropped_below_percentile[p] += 1

    return [
        sample.Sample(
            f'seconds_dropped_below_{int(p * 100)}_percent',
            seconds_dropped_below_percentile[p],
            's',
            metadata=metadata,
        )
        for p in PERCENTILES
    ]

  def _ComputeLossWork(
      self,
      median: float,
      values_after_disruption: MutableSequence[float],
      interval: float,
      metadata: Mapping[str, Any],
  ):
    """Compute the loss work metrics for disruption.

    This method returns two metrics.
    1. The total loss seconds. This is defined as the loss time across the LM.
       It is the sum of (Median - value) / median * interval
    2. Unresponsive metrics. This is the cube of the lost time to degrade
       the impact of LM in the higher percentile.
       It is the sum of ((Median - value) / median) ** 3 * interval

    Args:
      median: Median of the baseline.
      values_after_disruption: List of samples after disruption.
      interval: Interval of the metrics.
      metadata: Metadata for samples.

    Returns:
      List of samples.
    """
    total_loss_seconds = 0
    unresponsive_metric = 0
    for value in values_after_disruption:
      # Only seconds below the degradation threshold count as lost work.
      if value < median * DEGRADATION_PERCENT.value / 100.0:
        total_loss_seconds += (median - value) / median * interval
        unresponsive_metric += (((median - value) / median) ** (3.0)) * interval

    samples = []
    samples.append(
        sample.Sample(
            'unresponsive_metric',
            round(unresponsive_metric, 4),
            'metric',
            metadata=metadata,
        )
    )
    samples.append(
        sample.Sample(
            'total_loss_seconds',
            round(total_loss_seconds, 4),
            'seconds',
            metadata=metadata,
        )
    )
    return samples

  def _ComputeDegradation(
      self, baseline_mean: float, mean: float, metadata: Mapping[str, Any]
  ) -> MutableSequence[sample.Sample]:
    """Compute the degradation after LM ends to baseline."""
    return [
        sample.Sample(
            'degradation_percent',
            round((baseline_mean - mean) / baseline_mean * 100, 4),
            '%',
            metadata=metadata,
        )
    ]

  @property
  def trigger_name(self) -> str:
    raise NotImplementedError()
342+
343+
344+
def Register(unused_parsed_flags):  # pylint: disable=invalid-name
  """No-op registration hook; the base trigger registers nothing itself."""

0 commit comments

Comments
 (0)