Skip to content

Commit 51051b5

Browse files
committed
[DEV-5541] Measure the internal data health metrics
1 parent 86c6258 commit 51051b5

File tree

11 files changed

+221
-44
lines changed

11 files changed

+221
-44
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from dataclasses import dataclass
2+
3+
from athenian.api.internal.features.entries import MinePullRequestMetrics
4+
from athenian.api.internal.miners.github.branches import BranchMinerMetrics
5+
from athenian.api.internal.miners.github.deployment import MineDeploymentsMetrics
6+
from athenian.api.internal.miners.github.release_load import MineReleaseMetrics
7+
from athenian.api.internal.reposet import RepositorySetMetrics
8+
9+
10+
@dataclass(frozen=True, slots=True)
class DataHealthMetrics:
    """Collection of data error statistics to report."""

    branches: BranchMinerMetrics
    deployments: MineDeploymentsMetrics
    prs: MinePullRequestMetrics
    releases: MineReleaseMetrics
    reposet: RepositorySetMetrics

    @classmethod
    def empty(cls) -> "DataHealthMetrics":
        """Initialize a new DataHealthMetrics instance filled with zeros.

        Each field type declares its own ``empty()`` constructor, so we simply
        delegate to it for every annotated field.
        """
        kwargs = {name: metric_cls.empty() for name, metric_cls in cls.__annotations__.items()}
        return DataHealthMetrics(**kwargs)

server/athenian/api/internal/features/entries.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
from collections.abc import Iterator
55
import dataclasses
6+
from dataclasses import dataclass
67
from datetime import datetime, timezone
78
from functools import partial, reduce
89
from itertools import chain
@@ -1596,7 +1597,7 @@ def make_calculator(
15961597
)
15971598

15981599

1599-
@dataclasses.dataclass(frozen=True, slots=True)
1600+
@dataclass(frozen=True, slots=True)
16001601
class MetricsLineRequest:
16011602
"""Common base for multiple metrics request classes."""
16021603

@@ -1622,7 +1623,7 @@ def all_jira_filters(self) -> Iterator[JIRAFilter]:
16221623
return (td.jira_filter for td in self.teams)
16231624

16241625

1625-
@dataclasses.dataclass(frozen=True, slots=True)
1626+
@dataclass(frozen=True, slots=True)
16261627
class TeamSpecificFilters:
16271628
"""Filters that are different for each team."""
16281629

@@ -1645,6 +1646,21 @@ def __sentry_repr__(self):
16451646
return str(dikt)
16461647

16471648

1649+
@dataclass(slots=True)
1650+
class MinePullRequestMetrics:
1651+
"""Various statistics about mined pull requests."""
1652+
1653+
count: int
1654+
done_count: int
1655+
merged_count: int
1656+
open_count: int
1657+
1658+
@classmethod
1659+
def empty(cls) -> MinePullRequestMetrics:
1660+
"""Initialize a new MinePullRequestMetrics instance filled with zeros."""
1661+
return MinePullRequestMetrics(0, 0, 0, 0)
1662+
1663+
16481664
class PRFactsCalculator:
16491665
"""Calculator for Pull Requests facts."""
16501666

@@ -1696,6 +1712,7 @@ async def __call__(
16961712
with_jira: JIRAEntityToFetch | int,
16971713
branches: Optional[pd.DataFrame] = None,
16981714
default_branches: Optional[dict[str, str]] = None,
1715+
metrics: Optional[MinePullRequestMetrics] = None,
16991716
) -> pd.DataFrame:
17001717
"""
17011718
Calculate facts about pull request on GitHub.
@@ -1725,6 +1742,8 @@ async def __call__(
17251742
)
17261743
if df.empty:
17271744
df = pd.DataFrame(columns=PullRequestFacts.f)
1745+
if metrics is not None:
1746+
self._set_count_metrics(df, metrics)
17281747
return df
17291748

17301749
@sentry_span
@@ -2026,6 +2045,15 @@ async def _call_cached(
20262045
)
20272046
return all_facts_df, with_jira
20282047

2048+
@staticmethod
2049+
def _set_count_metrics(facts: pd.DataFrame, metrics: MinePullRequestMetrics) -> None:
2050+
metrics.prs.count = len(facts)
2051+
metrics.prs.done_count = facts[PullRequestFacts.f.done].sum()
2052+
metrics.prs.merged_count = (
2053+
facts[PullRequestFacts.f.merged].notnull() & ~facts[PullRequestFacts.f.done]
2054+
).sum()
2055+
metrics.prs.open_count = facts[PullRequestFacts.f.closed].isnull().sum()
2056+
20292057

20302058
class ParticipantsMerge:
20312059
"""Utilities to merge multiple collections of participants."""

server/athenian/api/internal/miners/github/branches.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,15 @@
2525
class BranchMinerMetrics:
    """Branch source data error statistics."""

    # total number of mined branches
    count: int
    # repositories reported to have zero branches
    empty_count: int
    # NOTE(review): presumably repositories lacking a default branch — confirm with BranchMiner
    no_default: int

    @classmethod
    def empty(cls) -> "BranchMinerMetrics":
        """Initialize a new BranchMinerMetrics instance filled with zeros."""
        return BranchMinerMetrics(0, 0, 0)
36+
3137

3238
@cached_methods
3339
class BranchMiner:
@@ -176,7 +182,9 @@ async def extract_branches(
176182
if items:
177183
report("the following repositories have 0 branches: %s", items)
178184
if metrics is not None:
179-
metrics.empty += len(items)
185+
metrics.empty_count += len(items)
186+
if metrics is not None:
187+
metrics.count = len(branches)
180188
return branches, default_branches
181189

182190
@classmethod

server/athenian/api/internal/miners/github/commit.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from dataclasses import dataclass
12
from datetime import datetime, timedelta, timezone
23
from enum import Enum
34
import logging
@@ -67,6 +68,23 @@ class FilterCommitsProperty(Enum):
6768
DAG = tuple[np.ndarray, np.ndarray, np.ndarray]
6869

6970

71+
@dataclass(slots=True)
72+
class CommitDAGMetrics:
73+
"""Commit DAG error statistics, per repository.
74+
75+
Intersections are possible. For example, `bool(pristine & corrupted)` can be `True`.
76+
"""
77+
78+
pristine: set[str]
79+
corrupted: set[str]
80+
orphaned: set[str]
81+
82+
@classmethod
83+
def empty(cls) -> "CommitDAGMetrics":
84+
"""Initialize a new CommitDAGMetrics instance filled with zeros."""
85+
return CommitDAGMetrics(set(), set(), set())
86+
87+
7088
def _postprocess_extract_commits(result, with_deployments=True, **_):
7189
if isinstance(result, tuple):
7290
if with_deployments:
@@ -367,6 +385,7 @@ async def fetch_repository_commits(
367385
mdb: Database,
368386
pdb: Database,
369387
cache: Optional[aiomcache.Client],
388+
metrics: Optional[CommitDAGMetrics] = None,
370389
) -> dict[str, tuple[bool, DAG]]:
371390
"""
372391
Load full commit DAGs for the given repositories.
@@ -379,6 +398,7 @@ async def fetch_repository_commits(
379398
3. Commit timestamp. \
380399
4. Commit repository name.
381400
:param prune: Remove any commits that are not accessible from `branches`.
401+
:param metrics: Mutable error statistics, will be written on new fetches.
382402
:return: Map from repository names to their DAG consistency indicators and bodies.
383403
"""
384404
if branches.empty:
@@ -495,6 +515,8 @@ async def execute():
495515
for repo, pdag in repos.items():
496516
if repo not in result:
497517
result[repo] = (True, _empty_dag()) if prune else pdag
518+
if metrics is not None and len(result[repo][1][0]) == 0:
519+
metrics.pristine.add(repo)
498520
return result
499521

500522

@@ -636,10 +658,12 @@ async def _fetch_commit_history_dag(
636658
meta_ids: tuple[int, ...],
637659
mdb: Database,
638660
alloc=None,
661+
metrics: Optional[CommitDAGMetrics] = None,
639662
) -> tuple[bool, str, np.ndarray, np.ndarray, np.ndarray]:
663+
# these are some approximately sensible defaults, found by experiment
640664
max_stop_heads = 25
641665
max_inner_partitions = 25
642-
log = logging.getLogger("%s._fetch_commit_history_dag" % metadata.__package__)
666+
log = logging.getLogger(f"{metadata.__package__}._fetch_commit_history_dag")
643667
# there can be duplicates, remove them
644668
head_hashes = np.asarray(head_hashes, dtype="S40")
645669
head_ids = np.asarray(head_ids, dtype=int)
@@ -688,13 +712,16 @@ async def _fetch_commit_history_dag(
688712
bads, bad_seeds, bad_hashes = verify_edges_integrity(new_edges, alloc)
689713
if bads:
690714
log.warning(
691-
"%d @ %d new DAG edges are not consistent (%d commits / %d existing): %s",
715+
"%s: %d @ %d new DAG edges are not consistent (%d commits / %d existing): %s",
716+
repo,
692717
len(bads),
693718
len(bad_seeds),
694719
len(bad_hashes),
695720
len(hashes),
696721
[new_edges[i] for i in bad_seeds[:10]],
697722
)
723+
if metrics is not None:
724+
metrics.corrupted.add(repo)
698725
consistent = False
699726
for i in bads[::-1]:
700727
new_edges.pop(i)
@@ -733,6 +760,8 @@ async def _fetch_commit_history_dag(
733760
"skipping orphans which are suspiciously young: %s",
734761
", ".join(removed_orphans_hashes),
735762
)
763+
if metrics is not None:
764+
metrics.orphaned.add(repo)
736765
consistent = False
737766
for i in sorted(removed_orphans_indexes, reverse=True):
738767
new_edges.pop(i)

server/athenian/api/internal/miners/github/deployment.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,19 @@
141141
from athenian.api.unordered_unique import in1d_str, map_array_values, unordered_unique
142142

143143

144+
@dataclass(slots=True)
145+
class MineDeploymentsMetrics:
146+
"""Deployment mining error statistics."""
147+
148+
count: int
149+
unresolved: int
150+
151+
@classmethod
152+
def empty(cls) -> "MineDeploymentsMetrics":
153+
"""Initialize a new MineDeploymentsMetrics instance filled with zeros."""
154+
return MineDeploymentsMetrics(0, 0)
155+
156+
144157
async def mine_deployments(
145158
repositories: Collection[str],
146159
participants: ReleaseParticipants,
@@ -166,6 +179,7 @@ async def mine_deployments(
166179
cache: Optional[aiomcache.Client],
167180
with_extended_prs: bool = False,
168181
with_jira: bool = False,
182+
metrics: Optional[MineDeploymentsMetrics] = None,
169183
) -> pd.DataFrame:
170184
"""Gather facts about deployments that satisfy the specified filters.
171185
@@ -209,6 +223,7 @@ async def mine_deployments(
209223
cache,
210224
with_extended_prs,
211225
with_jira,
226+
metrics,
212227
)
213228
if notifications.empty:
214229
return pd.DataFrame()
@@ -240,6 +255,9 @@ async def mine_deployments(
240255
subst.fill(pd.DataFrame())
241256
joined["labels"].values[no_labels] = subst
242257

258+
if metrics is not None:
259+
metrics.count = len(joined)
260+
243261
return joined
244262

245263

@@ -327,6 +345,7 @@ async def _mine_deployments(
327345
cache: Optional[aiomcache.Client],
328346
with_extended_prs: bool,
329347
with_jira: bool,
348+
metrics: Optional[MineDeploymentsMetrics],
330349
) -> tuple[
331350
pd.DataFrame,
332351
pd.DataFrame,
@@ -338,6 +357,7 @@ async def _mine_deployments(
338357
bool,
339358
bool,
340359
]:
360+
log = logging.getLogger(f"{metadata.__package__}.mine_deployments")
341361
if not isinstance(repositories, (set, frozenset, KeysView)):
342362
repositories = set(repositories)
343363
if repositories:
@@ -361,10 +381,19 @@ async def _mine_deployments(
361381
rdb,
362382
cache,
363383
)
384+
notifications_count_before_pruning = len(notifications)
364385
(notifications, components), labels = await gather(
365386
fetch_components_and_prune_unresolved(notifications, prefixer, account, rdb),
366387
fetch_labels(notifications.index.values, account, rdb),
367388
)
389+
if unresolved := notifications_count_before_pruning - len(notifications):
390+
log.warning(
391+
"removed %d / %d unresolved notifications",
392+
unresolved,
393+
notifications_count_before_pruning,
394+
)
395+
if metrics is not None:
396+
metrics.unresolved = unresolved
368397
if notifications.empty:
369398
return (*repeat(pd.DataFrame(), 7), with_extended_prs, with_jira)
370399

@@ -409,7 +438,7 @@ async def _mine_deployments(
409438
rdb,
410439
cache,
411440
),
412-
name="_fetch_precomputed_deployed_releases(%d)" % len(notifications),
441+
name=f"_fetch_precomputed_deployed_releases({len(notifications)})",
413442
)
414443
facts = await _fetch_precomputed_deployment_facts(
415444
notifications.index.values, default_branches, release_settings, account, pdb,
@@ -579,7 +608,6 @@ async def _invalidate_precomputed_on_out_of_order_notifications(
579608
not yet precomputed.
580609
581610
A unicode string array with the names of affected precomputed deployments is returned.
582-
583611
"""
584612
if len(to_invalidate := _find_invalid_precomputed_deploys(notifications, missed_mask)):
585613
log = logging.getLogger(f"{metadata.__package__}.mine_deployments")

server/athenian/api/internal/miners/github/deployment_light.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,8 @@ async def fetch_components_and_prune_unresolved(
244244
"""Remove deployment notifications with unresolved components. Fetch the components."""
245245
components = await read_sql_query(
246246
select(DeployedComponent).where(
247-
and_(
248-
DeployedComponent.account_id == account,
249-
DeployedComponent.deployment_name.in_any_values(notifications.index.values),
250-
),
247+
DeployedComponent.account_id == account,
248+
DeployedComponent.deployment_name.in_any_values(notifications.index.values),
251249
),
252250
rdb,
253251
DeployedComponent,

0 commit comments

Comments
 (0)