Skip to content

Commit 093eb52

Browse files
committed
[DEV-5541] Record inconsistency and performance metrics in precomputer
1 parent e1a95bc commit 093eb52

File tree

7 files changed

+150
-14
lines changed

7 files changed

+150
-14
lines changed

server/athenian/api/align/spec

Submodule spec updated from a75e077 to a87a0f1

server/athenian/api/controllers/backoffice_controller.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Optional
55

66
from aiohttp import web
7+
import aiohttp.web
78
import aiomcache
89
from asyncpg import IntegrityConstraintViolationError
910
import sentry_sdk
@@ -502,3 +503,13 @@ async def set_account_features(request: AthenianWebRequest, id: int, body: dict)
502503
),
503504
)
504505
return model_response(await get_account_features(id, request.sdb))
506+
507+
508+
async def get_account_health(
    request: AthenianWebRequest,
    id: Optional[int] = None,
    since: Optional[datetime] = None,
    until: Optional[datetime] = None,
) -> aiohttp.web.Response:
    """Return the account health metrics measured per hour.

    This handler is a placeholder: it unconditionally raises NotImplementedError.

    :param request: Incoming API request.
    :param id: Optional account identifier. NOTE(review): what `None` should mean \
               (e.g. "all accounts") is not defined here - confirm against the API spec.
    :param since: Optional start of the requested time range. NOTE(review): inclusivity \
                  and timezone expectations are not visible here - confirm.
    :param until: Optional end of the requested time range; same caveats as `since`.
    :raises NotImplementedError: always, until the endpoint is implemented.
    """
    raise NotImplementedError

server/athenian/api/controllers/metrics_controller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ async def calculate_for_set_metrics(service, repos, for_sets):
818818

819819

820820
@expires_header(short_term_exptime)
821-
@weight(1)
821+
@weight(2)
822822
async def calc_metrics_code_checks(request: AthenianWebRequest, body: dict) -> web.Response:
823823
"""Calculate metrics on continuous integration runs, such as GitHub Actions, Jenkins, Circle, \
824824
etc."""

server/athenian/api/internal/features/entries.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,11 +2056,11 @@ async def _call_cached(
20562056
@staticmethod
20572057
def _set_count_metrics(facts: pd.DataFrame, metrics: MinePullRequestMetrics) -> None:
20582058
metrics.count = len(facts)
2059-
metrics.done_count = facts[PullRequestFacts.f.done].sum()
2060-
metrics.merged_count = (
2061-
facts[PullRequestFacts.f.merged].notnull() & ~facts[PullRequestFacts.f.done]
2062-
).sum()
2063-
metrics.open_count = facts[PullRequestFacts.f.closed].isnull().sum()
2059+
metrics.done_count = int(facts[PullRequestFacts.f.done].sum())
2060+
metrics.merged_count = int(
2061+
(facts[PullRequestFacts.f.merged].notnull() & ~facts[PullRequestFacts.f.done]).sum(),
2062+
)
2063+
metrics.open_count = int(facts[PullRequestFacts.f.closed].isnull().sum())
20642064

20652065

20662066
class ParticipantsMerge:

server/athenian/api/internal/miners/github/release_mine.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -897,13 +897,15 @@ async def main_flow():
897897

898898

899899
def _set_count_metrics(releases: pd.DataFrame, metrics: MineReleaseMetrics) -> None:
900-
metrics.count_by_tag = (releases[ReleaseFacts.f.matched_by].values == ReleaseMatch.tag).sum()
901-
metrics.count_by_branch = (
902-
releases[ReleaseFacts.f.matched_by].values == ReleaseMatch.branch
903-
).sum()
904-
metrics.count_by_event = (
905-
releases[ReleaseFacts.f.matched_by].values == ReleaseMatch.event
906-
).sum()
900+
metrics.count_by_tag = int(
901+
(releases[ReleaseFacts.f.matched_by].values == ReleaseMatch.tag).sum(),
902+
)
903+
metrics.count_by_branch = int(
904+
(releases[ReleaseFacts.f.matched_by].values == ReleaseMatch.branch).sum(),
905+
)
906+
metrics.count_by_event = int(
907+
(releases[ReleaseFacts.f.matched_by].values == ReleaseMatch.event).sum(),
908+
)
907909

908910

909911
def _empty_mined_releases_df():

server/athenian/api/precompute/__main__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
discover_accounts,
1919
notify_almost_expired_accounts,
2020
resolve_deployments,
21+
store_external_health,
2122
sync_labels,
2223
)
2324
from athenian.api.precompute.context import PrecomputeContext
@@ -29,6 +30,7 @@
2930
"notify-almost-expired-accounts": notify_almost_expired_accounts.main,
3031
"discover-accounts": discover_accounts.main,
3132
"accounts": accounts.main,
33+
"store-external-health": store_external_health.main,
3234
}
3335

3436

@@ -134,6 +136,14 @@ class Formatter(argparse.ArgumentDefaultsHelpFormatter, argparse.RawTextHelpForm
134136
accounts_parser.add_argument(
135137
"--timeout", type=int, default=20 * 60, help="Maximum processing time for one account",
136138
)
139+
140+
store_external_health_parser = subparsers.add_parser(
141+
"store-external-health",
142+
help="Persist external account data health metrics: metadata and performance",
143+
)
144+
store_external_health_parser.add_argument(
145+
"--prometheus", type=str, help="Prometheus API endpoint",
146+
)
137147
return parser.parse_args()
138148

139149

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import argparse
2+
from datetime import datetime, timezone
3+
import logging
4+
from typing import Any
5+
6+
import aiohttp
7+
from sqlalchemy import insert, select
8+
9+
from athenian.api import metadata
10+
from athenian.api.async_utils import gather
11+
from athenian.api.db import Database
12+
from athenian.api.models.persistentdata.models import HealthMetric
13+
from athenian.api.models.state.models import AccountGitHubAccount
14+
from athenian.api.precompute.context import PrecomputeContext
15+
16+
17+
async def _record_performance_metrics(
    prometheus_endpoint: str,
    rdb: Database,
):
    """Fetch request latency percentiles from Prometheus and store them as health metrics.

    For each of the 50th and 95th percentiles, runs an instant query computing
    `histogram_quantile` over `request_latency_seconds_bucket` (24h rate window, grouped by
    endpoint and account) and inserts one `HealthMetric` row named `p<pct>/<endpoint>` per
    returned series. Series whose value is "NaN" or whose account label is "N/A" are ignored.

    Each percentile is queried at most three times; a non-OK HTTP response is logged and
    triggers the next attempt. Nothing is inserted for a percentile if every attempt fails.

    :param prometheus_endpoint: Base URL of the Prometheus server; "/api/v1/query" is appended.
    :param rdb: Database that receives the `HealthMetric` rows.
    """
    log = logging.getLogger(f"{metadata.__package__}._record_performance_metrics")
    url = f"{prometheus_endpoint}/api/v1/query"
    for pct in ("50", "95"):
        query = (
            f"histogram_quantile(0.{pct}, "
            "sum(rate(request_latency_seconds_bucket[24h])) by "
            "(endpoint, account, le))"
        )
        rows = []
        for attempt in range(3):
            async with aiohttp.ClientSession() as session:
                # All rows produced by one attempt share the timestamp taken before the query.
                now = datetime.now(timezone.utc)
                async with session.get(url, params={"query": query}) as response:
                    if response.ok:
                        payload = await response.json()
                        for series in payload["data"]["result"]:
                            value = series["value"][1]
                            if value == "NaN":
                                continue
                            labels = series["metric"]
                            account = labels["account"]
                            if account == "N/A":
                                continue
                            rows.append(
                                HealthMetric(
                                    account_id=int(account),
                                    name=f'p{pct}/{labels["endpoint"]}',
                                    created_at=now,
                                    value=float(value),
                                ).explode(with_primary_keys=True),
                            )
                        # The query succeeded, no further attempts are needed.
                        break
                    log.error(
                        "[%d] failed to query Prometheus: %d: %s",
                        attempt + 1,
                        response.status,
                        await response.text(),
                    )
        if rows:
            log.info("inserting %d p%s records", len(rows), pct)
            await rdb.execute_many(insert(HealthMetric), rows)
61+
62+
63+
async def _record_inconsistency_metrics(
    prometheus_endpoint: str,
    sdb: Database,
    rdb: Database,
):
    """Fetch metadata inconsistency counters from Prometheus and store them as health metrics.

    Runs the instant query `metadata_github_consistency_nodes_issues` and inserts one
    `HealthMetric` row named `inconsistency/<node_type>` per returned series. The `acc_id`
    label of each series is translated to an account ID through the
    `AccountGitHubAccount.id -> AccountGitHubAccount.account_id` mapping loaded from `sdb`;
    series that lack the label or whose `acc_id` is not in the mapping are skipped.

    The query is attempted at most three times; a non-OK HTTP response is logged and triggers
    the next attempt. The loop stops after the first successful response so that each metric
    is recorded exactly once.

    :param prometheus_endpoint: Base URL of the Prometheus server; "/api/v1/query" is appended.
    :param sdb: Database used to load the `AccountGitHubAccount` mapping.
    :param rdb: Database that receives the `HealthMetric` rows.
    """
    log = logging.getLogger(f"{metadata.__package__}._record_inconsistency_metrics")
    acc_id_map = dict(
        await sdb.fetch_all(select(AccountGitHubAccount.id, AccountGitHubAccount.account_id)),
    )
    inserted = []
    for attempt in range(3):
        async with aiohttp.ClientSession() as session:
            # All rows produced by one attempt share the timestamp taken before the query.
            now = datetime.now(timezone.utc)
            async with session.get(
                f"{prometheus_endpoint}/api/v1/query",
                params={
                    "query": "metadata_github_consistency_nodes_issues",
                },
            ) as response:
                if not response.ok:
                    log.error(
                        "[%d] failed to query Prometheus: %d: %s",
                        attempt + 1,
                        response.status,
                        await response.text(),
                    )
                    continue
                for obj in (await response.json())["data"]["result"]:
                    try:
                        account = acc_id_map[int(obj["metric"]["acc_id"])]
                    except KeyError:
                        # Missing label or an installation that is not mapped to an account.
                        continue
                    inserted.append(
                        HealthMetric(
                            account_id=account,
                            name=f'inconsistency/{obj["metric"]["node_type"]}',
                            created_at=now,
                            value=int(obj["value"][1]),
                        ).explode(with_primary_keys=True),
                    )
        # The query succeeded: stop retrying, otherwise the same metrics would be fetched and
        # recorded again on every remaining attempt.
        break
    if inserted:
        log.info("inserting %d data inconsistency records", len(inserted))
        await rdb.execute_many(insert(HealthMetric), inserted)
106+
107+
108+
async def main(context: PrecomputeContext, args: argparse.Namespace) -> Any:
    """Record the performance and data inconsistency health metrics queried from Prometheus.

    Runs both recorders concurrently via `gather`.

    :param context: Precompute context; its `sdb` and `rdb` databases are used.
    :param args: Parsed command line arguments; `args.prometheus` is the Prometheus endpoint.
    """
    # NOTE(review): the "--prometheus" argument appears to be declared without a default or
    # `required=True`, so `args.prometheus` may be None when the flag is omitted - confirm.
    await gather(
        _record_performance_metrics(args.prometheus, context.rdb),
        _record_inconsistency_metrics(args.prometheus, context.sdb, context.rdb),
    )

0 commit comments

Comments
 (0)