diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 8edc00bc6bd37..48d2aa2f330ec 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -26,7 +26,67 @@ class JobMetrics:
     workflow_id: int
 
 
-def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
+@dataclass
+class GaugeMetric:
+    name: str
+    value: int
+    time_ns: int
+
+
+def get_sampled_workflow_metrics(github_repo: github.Repository):
+    """Gets global statistics about the Github workflow queue.
+
+    Args:
+      github_repo: A github repo object to use to query the relevant information.
+
+    Returns:
+      Returns a list of GaugeMetric objects, containing the relevant metrics about
+      the workflow.
+    """
+
+    # Other states are available (pending, waiting, etc.), but the meaning
+    # is not documented (see #70540).
+    # "queued" seems to be the info we want.
+    queued_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="queued")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+    running_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="in_progress")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+
+    workflow_metrics = []
+    workflow_metrics.append(
+        GaugeMetric(
+            "workflow_queue_size",
+            queued_workflow_count,
+            time.time_ns(),
+        )
+    )
+    workflow_metrics.append(
+        GaugeMetric(
+            "running_workflow_count",
+            running_workflow_count,
+            time.time_ns(),
+        )
+    )
+    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
+    workflow_metrics.append(
+        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+    )
+    return workflow_metrics
+
+
+def get_per_workflow_metrics(
+    github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
       Returns a list of JobMetrics objects, containing the relevant metrics about
       the workflow.
     """
-    workflow_runs = iter(github_repo.get_workflow_runs())
-
     workflow_metrics = []
 
     workflows_to_include = set(workflows_to_track.keys())
 
-    while len(workflows_to_include) > 0:
-        workflow_run = next(workflow_runs)
+    for workflow_run in iter(github_repo.get_workflow_runs()):
+        if len(workflows_to_include) == 0:
+            break
+
         if workflow_run.status != "completed":
             continue
 
@@ -139,12 +199,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
       metrics_userid: The userid to use for the upload.
       api_key: The API key to use for the upload.
""" + + if len(workflow_metrics) == 0: + print("No metrics found to upload.", file=sys.stderr) + return + metrics_batch = [] for workflow_metric in workflow_metrics: - workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_") - metrics_batch.append( - f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}" - ) + if isinstance(workflow_metric, GaugeMetric): + name = workflow_metric.name.lower().replace(" ", "_") + metrics_batch.append( + f"{name} value={workflow_metric.value} {workflow_metric.time_ns}" + ) + elif isinstance(workflow_metric, JobMetrics): + name = workflow_metric.job_name.lower().replace(" ", "_") + metrics_batch.append( + f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}" + ) + else: + raise ValueError( + f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}" + ) request_data = "\n".join(metrics_batch) response = requests.post( @@ -176,16 +251,21 @@ def main(): # Enter the main loop. Every five minutes we wake up and dump metrics for # the relevant jobs. while True: - current_metrics = get_metrics(github_repo, workflows_to_track) - if len(current_metrics) == 0: - print("No metrics found to upload.", file=sys.stderr) - continue + current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track) + current_metrics += get_sampled_workflow_metrics(github_repo) + # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana. + current_metrics.append( + GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()) + ) upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key) print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr) for workflow_metric in reversed(current_metrics): - workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id + if isinstance(workflow_metric, JobMetrics): + workflows_to_track[ + workflow_metric.job_name + ] = workflow_metric.workflow_id time.sleep(SCRAPE_INTERVAL_SECONDS)