
[CI] Add queue size, running count metrics #122714


Merged 4 commits on Jan 16, 2025.

Changes from 2 commits
106 changes: 91 additions & 15 deletions .ci/metrics/metrics.py
@@ -26,7 +26,67 @@ class JobMetrics:
workflow_id: int


def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
@dataclass
class GaugeMetric:
name: str
value: int
time_ns: int


def get_sampled_workflow_metrics(github_repo: github.Repository):
"""Gets global statistics about the Github workflow queue

Args:
github_repo: A github repo object to use to query the relevant information.

Returns:
Returns a list of GaugeMetric objects containing the relevant metrics about
the workflow queue.
"""

# Other states are available (pending, waiting, etc.), but the meaning
# is not documented (See #70540).
# "queued" seems to be the info we want.
queued_workflow_count = len(
[
x
for x in github_repo.get_workflow_runs(status="queued")
if x.name in WORKFLOWS_TO_TRACK
]
)
running_workflow_count = len(
[
x
for x in github_repo.get_workflow_runs(status="in_progress")
if x.name in WORKFLOWS_TO_TRACK
]
)

workflow_metrics = []
workflow_metrics.append(
GaugeMetric(
"workflow_queue_size",
queued_workflow_count,
time.time_ns(),
)
)
workflow_metrics.append(
GaugeMetric(
"running_workflow_count",
running_workflow_count,
time.time_ns(),
)
)
# Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
workflow_metrics.append(
GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
)
return workflow_metrics
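
A minimal usage sketch (not part of this diff) of how get_sampled_workflow_metrics could be exercised locally with PyGithub; the token and repository name are placeholders:

    import github

    # Authenticate with a personal access token (placeholder value).
    gh = github.Github(auth=github.Auth.Token("<your-token>"))
    repo = gh.get_repo("llvm/llvm-project")

    # Print each sampled gauge: queue size, running count, and the heartbeat.
    for metric in get_sampled_workflow_metrics(repo):
        print(metric.name, metric.value, metric.time_ns)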


def get_per_workflow_metrics(
github_repo: github.Repository, workflows_to_track: dict[str, int]
):
"""Gets the metrics for specified Github workflows.

This function takes in a list of workflows to track, and optionally the
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
Returns a list of JobMetrics objects, containing the relevant metrics about
the workflow.
"""
workflow_runs = iter(github_repo.get_workflow_runs())

workflow_metrics = []

workflows_to_include = set(workflows_to_track.keys())

while len(workflows_to_include) > 0:
workflow_run = next(workflow_runs)
for workflow_run in iter(github_repo.get_workflow_runs()):
if len(workflows_to_include) == 0:
break

if workflow_run.status != "completed":
continue

@@ -139,12 +199,25 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
metrics_userid: The userid to use for the upload.
api_key: The API key to use for the upload.
"""

if len(workflow_metrics) == 0:
print("No metrics found to upload.", file=sys.stdout)
return

metrics_batch = []
for workflow_metric in workflow_metrics:
workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_")
metrics_batch.append(
f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
)
if isinstance(workflow_metric, GaugeMetric):
name = workflow_metric.name.lower().replace(" ", "_")
metrics_batch.append(
f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
)
elif isinstance(workflow_metric, JobMetrics):
name = workflow_metric.job_name.lower().replace(" ", "_")
metrics_batch.append(
f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
)
else:
raise ValueError(f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}")
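# Worked illustration (values assumed, not from this PR) of the records
# built above, in Influx-style line protocol:
#   GaugeMetric("workflow_queue_size", 4, 1736960000000000000)
#     -> workflow_queue_size value=4 1736960000000000000
#   A JobMetrics with job_name="Check code formatting", queue_time=12,
#   run_time=300, status=1
#     -> check_code_formatting queue_time=12,run_time=300,status=1 <created_at_ns>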

request_data = "\n".join(metrics_batch)
response = requests.post(
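
The POST call is cut off by the diff view above; as a rough, non-authoritative sketch, pushing a line-protocol batch like request_data generally looks like the following, where the endpoint URL and basic-auth scheme are illustrative assumptions rather than the elided values:

    import requests

    def push_lines(request_data: str, userid: str, api_key: str) -> None:
        # One line-protocol record per line; plain-text body, HTTP basic auth.
        response = requests.post(
            "https://<influx-endpoint>/api/v1/push/influx/write",  # placeholder
            headers={"Content-Type": "text/plain"},
            data=request_data,
            auth=(userid, api_key),
        )
        response.raise_for_status()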
@@ -176,16 +249,19 @@ def main():
# Enter the main loop. Every five minutes we wake up and dump metrics for
# the relevant jobs.
while True:
current_metrics = get_metrics(github_repo, workflows_to_track)
if len(current_metrics) == 0:
print("No metrics found to upload.", file=sys.stderr)
continue
current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
current_metrics += get_sampled_workflow_metrics(github_repo)
# Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
current_metrics.append(GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()))

upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)
print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)

for workflow_metric in reversed(current_metrics):
workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id
if isinstance(workflow_metric, JobMetrics):
workflows_to_track[
workflow_metric.job_name
] = workflow_metric.workflow_id

time.sleep(SCRAPE_INTERVAL_SECONDS)
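
One subtlety above: GitHub returns workflow runs newest-first, so iterating current_metrics in reverse lets the most recent run's workflow_id be written last and win. A minimal sketch of that last-write-wins behavior, with assumed names and IDs:

    seen = {"stage1": 100}
    newest_first = [("stage1", 103), ("stage1", 102)]
    for name, run_id in reversed(newest_first):
        seen[name] = run_id  # oldest applied first; the newest overwrites
    assert seen["stage1"] == 103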
