Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 197 additions & 7 deletions .ci/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
# Lists the Github workflows we want to track. Maps the Github job name to
# the metric name prefix in grafana.
# This metric name is also used as a key in the job->name map.
GITHUB_WORKFLOW_TO_TRACK = {"CI Checks": "github_llvm_premerge_checks"}
GITHUB_WORKFLOW_TO_TRACK = {
"CI Checks": "github_llvm_premerge_checks",
"Build and Test libc++": "github_libcxx_premerge_checks",
}

# Lists the Github jobs to track for a given workflow. The key is the stable
# name (metric name) of the workflow (see GITHUB_WORKFLOW_TO_TRACK).
Expand All @@ -38,7 +41,12 @@
"github_llvm_premerge_checks": {
"Build and Test Linux": "premerge_linux",
"Build and Test Windows": "premerge_windows",
}
},
"github_libcxx_premerge_checks": {
"stage1": "premerge_libcxx_stage1",
"stage2": "premerge_libcxx_stage2",
"stage3": "premerge_libcxx_stage3",
},
}

# The number of workflows to pull when sampling Github workflows.
Expand All @@ -62,13 +70,14 @@
# by trial and error).
GRAFANA_METRIC_MAX_AGE_MN = 120


@dataclass
class JobMetrics:
job_name: str
queue_time: int
run_time: int
status: int
created_at_ns: int
started_at_ns: int
completed_at_ns: int
workflow_id: int
workflow_name: str
Expand All @@ -81,6 +90,159 @@ class GaugeMetric:
time_ns: int


@dataclass
class AggregateMetric:
aggregate_name: str
aggregate_queue_time: int
aggregate_run_time: int
aggregate_status: int
completed_at_ns: int
workflow_id: int


def _construct_aggregate(ag_name: str, job_list: list[JobMetrics]) -> AggregateMetric:
"""Create a libc++ AggregateMetric from a list of libc++ JobMetrics

How aggregates are computed:
queue time: Time from when first job in group is created until last job in
group has started.
run time: Time from when first job in group starts running until last job
in group finishes running.
status: logical 'and' of all the job statuses in the group.

Args:
ag_name: The name for this particular AggregateMetric
job_list: This list of JobMetrics to be combined into the AggregateMetric.
The input list should contain all (and only!) the libc++ JobMetrics
for a particular stage and a particular workflow_id.

Returns:
Returns the AggregateMetric constructed from the inputs.
"""

# Initialize the aggregate values
earliest_create = job_list[0].created_at_ns
earliest_start = job_list[0].started_at_ns
earliest_complete = job_list[0].completed_at_ns
latest_start = job_list[0].started_at_ns
latest_complete = job_list[0].completed_at_ns
ag_status = job_list[0].status
ag_workflow_id = job_list[0].workflow_id

# Go through rest of jobs for this workflow id, if any, updating stats
for job in job_list[1:]:
# Update the status
ag_status = ag_status and job.status
# Get the earliest & latest times
if job.created_at_ns < earliest_create:
earliest_create = job.created_at_ns
if job.completed_at_ns < earliest_complete:
earliest_complete = job.completed_at_ns
if job.started_at_ns > latest_start:
latest_start = job.started_at_ns
if job.started_at_ns < earliest_start:
earliest_start = job.started_at_ns
if job.completed_at_ns > latest_complete:
latest_complete = job.completed_at_ns

# Compute aggregate run time (in seconds, not ns)
ag_run_time = (latest_complete - earliest_start) / 1000000000
# Compute aggregate queue time (in seconds, not ns)
ag_queue_time = (latest_start - earliest_create) / 1000000000
# Append the aggregate metrics to the workflow metrics list.
return AggregateMetric(
ag_name, ag_queue_time, ag_run_time, ag_status, latest_complete, ag_workflow_id
)


def create_and_append_libcxx_aggregates(workflow_metrics: list[JobMetrics]):
"""Find libc++ JobMetric entries and create aggregate metrics for them.

Sort the libc++ JobMetric entries by workflow id, and for each workflow
id group them by stages. Call _construct_aggregate to reate an aggregate
metric for each stage for each unique workflow id. Append each aggregate
metric to the input workflow_metrics list.

Args:
workflow_metrics: A list of JobMetrics entries collected so far.
"""
# Separate the jobs by workflow_id. Only look at JobMetrics entries.
aggregate_data = dict()
for job in workflow_metrics:
# Only want to look at JobMetrics
if not isinstance(job, JobMetrics):
continue
# Only want libc++ jobs.
if job.workflow_name != "Build and Test libc++":
continue
if job.workflow_id not in aggregate_data.keys():
aggregate_data[job.workflow_id] = [job]
else:
aggregate_data[job.workflow_id].append(job)

# Go through each aggregate_data list (workflow id) and find all the
# needed data
for ag_workflow_id in aggregate_data:
job_list = aggregate_data[ag_workflow_id]
stage1_jobs = list()
stage2_jobs = list()
stage3_jobs = list()
# sort jobs into stage1, stage2, & stage3.
for job in job_list:
if job.job_name.find("stage1") > 0:
stage1_jobs.append(job)
elif job.job_name.find("stage2") > 0:
stage2_jobs.append(job)
elif job.job_name.find("stage3") > 0:
stage3_jobs.append(job)

if len(stage1_jobs) > 0:
aggregate = _construct_aggregate(
"github_libcxx_premerge_checks_stage1_aggregate", stage1_jobs
)
workflow_metrics.append(aggregate)
if len(stage2_jobs) > 0:
aggregate = _construct_aggregate(
"github_libcxx_premerge_checks_stage2_aggregate", stage2_jobs
)
workflow_metrics.append(aggregate)
if len(stage3_jobs) > 0:
aggregate = _construct_aggregate(
"github_libcxx_premerge_checks_stage3_aggregate", stage3_jobs
)
workflow_metrics.append(aggregate)


def clean_up_libcxx_job_name(old_name: str) -> str:
"""Convert libcxx job names to generically legal strings.

Args:
old_name: A string with the full name of the libc++ test that was run.

Returns:
Returns the input string with characters that might not be acceptable
in some indentifier strings replaced with safer characters.

Take a name like 'stage1 (generic-cxx03, clang-22, clang++-22)'
and convert it to 'stage1_generic_cxx03__clang_22__clangxx_22'.
(Remove parentheses; replace commas, hyphens and spaces with
underscores; replace '+' with 'x'.)
"""
# Names should have exactly one set of parentheses, so break on that. If
# they don't have any parentheses, then don't update them at all.
if old_name.find("(") == -1:
return old_name
stage, remainder = old_name.split("(")
stage = stage.strip()
if remainder[-1] == ")":
remainder = remainder[:-1]
remainder = remainder.replace("-", "_")
remainder = remainder.replace(",", "_")
remainder = remainder.replace(" ", "_")
remainder = remainder.replace("+", "x")
new_name = stage + "_" + remainder
return new_name

def github_get_metrics(
github_repo: github.Repository, last_workflows_seen_as_completed: set[int]
) -> tuple[list[JobMetrics], int]:
Expand Down Expand Up @@ -146,6 +308,10 @@ def github_get_metrics(
if task.name not in GITHUB_WORKFLOW_TO_TRACK:
continue

libcxx_testing = False
if task.name == "Build and Test libc++":
libcxx_testing = True

if task.status == "completed":
workflow_seen_as_completed.add(task.id)

Expand All @@ -155,11 +321,19 @@ def github_get_metrics(

name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
for job in task.jobs():
if libcxx_testing:
# We're not running macos or windows libc++ tests on our
# infrastructure.
if job.name.find("macos") != -1 or job.name.find("windows") != -1:
continue
# This job is not interesting to us.
if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
elif job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
continue

name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
if libcxx_testing:
name_suffix = clean_up_libcxx_job_name(job.name)
else:
name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
metric_name = name_prefix + "_" + name_suffix

if task.status != "completed":
Expand Down Expand Up @@ -208,21 +382,32 @@ def github_get_metrics(
continue

logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
# The timestamp associated with the event is expected by Grafana to be
# in nanoseconds.
# The completed_at_ns timestamp associated with the event is
# expected by Grafana to be in nanoseconds. Because we do math using
# all three times (when creating libc++ aggregates), we need them
# all to be in nanoseconds, even though created_at and started_at
# are not returned to Grafana.
created_at_ns = int(created_at.timestamp()) * 10**9
started_at_ns = int(started_at.timestamp()) * 10**9
completed_at_ns = int(completed_at.timestamp()) * 10**9
workflow_metrics.append(
JobMetrics(
metric_name,
queue_time.seconds,
run_time.seconds,
job_result,
created_at_ns,
started_at_ns,
completed_at_ns,
task.id,
task.name,
)
)

# Finished collecting the JobMetrics for all jobs; now create the
# aggregates for any libc++ jobs.
create_and_append_libcxx_aggregates(workflow_metrics)

for name, value in queued_count.items():
workflow_metrics.append(
GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
Expand Down Expand Up @@ -278,6 +463,11 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
metrics_batch.append(
f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.completed_at_ns}"
)
elif isinstance(workflow_metric, AggregateMetric):
name = workflow_metric.aggregate_name.lower().replace(" ", "_")
metrics_batch.append(
f"{name} queue_time={workflow_metric.aggregate_queue_time},run_time={workflow_metric.aggregate_run_time},status={workflow_metric.aggregate_status} {workflow_metric.completed_at_ns}"
)
else:
raise ValueError(
f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
Expand Down
Loading