From 096b118d4a42f3b63903c0a4b78e001cb47e7d5a Mon Sep 17 00:00:00 2001 From: Joseph Sawaya Date: Mon, 24 Mar 2025 13:05:17 -0400 Subject: [PATCH 1/2] feat: add ability to read new file format i'm introducing 2 things here: - the ability to read the output of the new ta cache rollups code in the new location in GCS - the ability to read multiple versions of the new rollup file - its probable that the "no version" file never exists in prod GCS but i'd like to establish this pattern in the code nonetheless i also add metrics --- docker-compose.yml | 15 +- docker/test.yml | 2 + ...icsTestCase__gql_query_with_new_ta__0.json | 77 +++++++++ graphql_api/tests/test_test_analytics.py | 153 ++++++++++++++---- .../types/test_analytics/test_analytics.py | 43 +++-- rollouts/__init__.py | 2 + utils/test_results.py | 148 ++++++++++++++++- 7 files changed, 391 insertions(+), 49 deletions(-) create mode 100644 graphql_api/tests/snapshots/analytics__TestAnalyticsTestCase__gql_query_with_new_ta__0.json diff --git a/docker-compose.yml b/docker-compose.yml index 7145edf0db..c7be9d701f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,7 +16,7 @@ services: - RUN_ENV=DEV # Improves pytest-cov performance in python 3.12 # https://github.com/nedbat/coveragepy/issues/1665#issuecomment-1937075835 - - COVERAGE_CORE=sysmon + - COVERAGE_CORE=sysmon env_file: - .testenv @@ -42,3 +42,16 @@ services: redis: image: redis:6-alpine + minio: + image: minio/minio:latest + command: server /export + ports: + - "${MINIO_PORT:-9000}:9000" + environment: + - MINIO_ACCESS_KEY=codecov-default-key + - MINIO_SECRET_KEY=codecov-default-secret + volumes: + - type: tmpfs + target: /export + tmpfs: + size: 256M diff --git a/docker/test.yml b/docker/test.yml index 2e0c4481aa..19492b14a3 100644 --- a/docker/test.yml +++ b/docker/test.yml @@ -23,6 +23,8 @@ services: access_key_id: codecov-default-key secret_access_key: codecov-default-secret verify_ssl: false + port: 9000 + host: minio github: bot: diff --git a/graphql_api/tests/snapshots/analytics__TestAnalyticsTestCase__gql_query_with_new_ta__0.json b/graphql_api/tests/snapshots/analytics__TestAnalyticsTestCase__gql_query_with_new_ta__0.json new file mode 100644 index 0000000000..6dffacc939 --- /dev/null +++ b/graphql_api/tests/snapshots/analytics__TestAnalyticsTestCase__gql_query_with_new_ta__0.json @@ -0,0 +1,77 @@ +[ + { + "cursor": "MC44fHRlc3Q0", + "node": { + "name": "test4", + "failureRate": 0.8, + "flakeRate": 0.0, + "avgDuration": 100.0, + "totalFailCount": 20, + "totalFlakyFailCount": 0, + "totalPassCount": 5, + "totalSkipCount": 5, + "commitsFailed": 5, + "lastDuration": 100.0 + } + }, + { + "cursor": "MC43NXx0ZXN0Mw==", + "node": { + "name": "test3", + "failureRate": 0.75, + "flakeRate": 0.0, + "avgDuration": 100.0, + "totalFailCount": 15, + "totalFlakyFailCount": 0, + "totalPassCount": 5, + "totalSkipCount": 5, + "commitsFailed": 5, + "lastDuration": 100.0 + } + }, + { + "cursor": "MC42NjY2NjY2NjY2NjY2NjY2fHRlc3Qy", + "node": { + "name": "test2", + "failureRate": 0.6666666666666666, + "flakeRate": 0.0, + "avgDuration": 100.0, + "totalFailCount": 10, + "totalFlakyFailCount": 0, + "totalPassCount": 5, + "totalSkipCount": 5, + "commitsFailed": 5, + "lastDuration": 100.0 + } + }, + { + "cursor": "MC41fHRlc3Qx", + "node": { + "name": "test1", + "failureRate": 0.5, + "flakeRate": 0.5, + "avgDuration": 100.0, + "totalFailCount": 5, + "totalFlakyFailCount": 5, + "totalPassCount": 5, + "totalSkipCount": 5, + "commitsFailed": 5, + "lastDuration": 100.0 + } + }, + { + 
"cursor": "MC4wfHRlc3Qw", + "node": { + "name": "test0", + "failureRate": 0.0, + "flakeRate": 0.0, + "avgDuration": 100.0, + "totalFailCount": 0, + "totalFlakyFailCount": 0, + "totalPassCount": 5, + "totalSkipCount": 5, + "commitsFailed": 5, + "lastDuration": 100.0 + } + } +] diff --git a/graphql_api/tests/test_test_analytics.py b/graphql_api/tests/test_test_analytics.py index c9b835e483..07501ca25d 100644 --- a/graphql_api/tests/test_test_analytics.py +++ b/graphql_api/tests/test_test_analytics.py @@ -1,14 +1,16 @@ import datetime from base64 import b64encode +from itertools import chain from typing import Any import polars as pl import pytest +from django.conf import settings from shared.django_apps.codecov_auth.tests.factories import OwnerFactory from shared.django_apps.core.tests.factories import RepositoryFactory from shared.helpers.redis import get_redis_connection +from shared.storage import get_appropriate_storage_service from shared.storage.exceptions import BucketAlreadyExistsError -from shared.storage.memory import MemoryStorageService from graphql_api.types.enums import ( OrderingDirection, @@ -48,12 +50,32 @@ def __call__(self, updated_at: datetime.datetime) -> dict[str, Any]: } -@pytest.fixture -def mock_storage(mocker): - m = mocker.patch("utils.test_results.get_appropriate_storage_service") - storage_server = MemoryStorageService({}) - m.return_value = storage_server - yield storage_server +def create_no_version_row(updated_at: datetime.datetime) -> list[dict[str, Any]]: + return [ + { + "timestamp_bin": datetime.datetime( + updated_at.year, updated_at.month, updated_at.day + ), + "computed_name": f"test{i}", + "flags": [f"flag{i}"], + "updated_at": updated_at, + "avg_duration": 100.0, + "fail_count": i, + "flaky_fail_count": 1 if i == 1 else 0, + "pass_count": 1, + "skip_count": 1, + "failing_commits": 1, + "last_duration": 100.0, + } + for i in range(5) + ] + + +def create_v1_row(updated_at: datetime.datetime) -> list[dict[str, Any]]: + return [ + {**row, "testsuite": f"testsuite{i}"} + for i, row in enumerate(create_no_version_row(updated_at)) + ] base_gql_query = """ @@ -80,6 +102,15 @@ def mock_storage(mocker): for i in range(0, len(rows_with_duplicate_names) - 1, 2): rows_with_duplicate_names[i]["name"] = rows_with_duplicate_names[i + 1]["name"] +no_version_rows = list( + chain.from_iterable( + create_no_version_row(datetime.datetime.now()) for i in range(5) + ) +) +v1_rows = list( + chain.from_iterable(create_v1_row(datetime.datetime.now()) for i in range(5)) +) + def dedup(rows: list[dict]) -> list[dict]: by_name = {} @@ -135,6 +166,8 @@ def row_to_camel_case(row: dict) -> dict: test_results_table = pl.DataFrame(rows) test_results_table_with_duplicate_names = pl.DataFrame(rows_with_duplicate_names) +test_results_table_no_version = pl.DataFrame(no_version_rows) +test_results_table_v1 = pl.DataFrame(v1_rows) def base64_encode_string(x: str) -> str: @@ -169,15 +202,15 @@ def store_in_redis(repository): @pytest.fixture -def store_in_storage(repository, mock_storage): - from django.conf import settings +def store_in_storage(repository): + storage = get_appropriate_storage_service() try: - mock_storage.create_root_storage(settings.GCS_BUCKET_NAME) + storage.create_root_storage(settings.GCS_BUCKET_NAME) except BucketAlreadyExistsError: pass - mock_storage.write_file( + storage.write_file( settings.GCS_BUCKET_NAME, f"test_results/rollups/{repository.repoid}/{repository.branch}/30", test_results_table.write_ipc(None).getvalue(), @@ -185,7 +218,7 @@ def 
store_in_storage(repository, mock_storage): yield - mock_storage.delete_file( + storage.delete_file( settings.GCS_BUCKET_NAME, f"test_results/rollups/{repository.repoid}/{repository.branch}/30", ) @@ -215,20 +248,17 @@ def test_get_test_results( repository, store_in_redis, store_in_storage, - mock_storage, ): results = get_results(repository.repoid, repository.branch, 30) assert results is not None assert results.equals(dedup_table(test_results_table)) - def test_get_test_results_no_storage( - self, transactional_db, repository, mock_storage - ): + def test_get_test_results_no_storage(self, transactional_db, repository): assert get_results(repository.repoid, repository.branch, 30) is None def test_get_test_results_no_redis( - self, mocker, transactional_db, repository, store_in_storage, mock_storage + self, mocker, transactional_db, repository, store_in_storage ): m = mocker.patch("services.task.TaskService.cache_test_results_redis") results = get_results(repository.repoid, repository.branch, 30) @@ -237,9 +267,7 @@ def test_get_test_results_no_redis( m.assert_called_once_with(repository.repoid, repository.branch) - def test_test_results( - self, transactional_db, repository, store_in_redis, mock_storage, snapshot - ): + def test_test_results(self, transactional_db, repository, store_in_redis, snapshot): test_results = generate_test_results( repoid=repository.repoid, ordering=TestResultsOrderingParameter.UPDATED_AT, @@ -261,7 +289,7 @@ def test_test_results( ] def test_test_results_asc( - self, transactional_db, repository, store_in_redis, mock_storage, snapshot + self, transactional_db, repository, store_in_redis, snapshot ): test_results = generate_test_results( repoid=repository.repoid, @@ -373,7 +401,6 @@ def test_test_results_pagination( end_cursor, repository, store_in_redis, - mock_storage, snapshot, ): test_results = generate_test_results( @@ -489,7 +516,6 @@ def test_test_results_pagination_asc( end_cursor, repository, store_in_redis, - mock_storage, snapshot, ): test_results = generate_test_results( @@ -515,9 +541,7 @@ def test_test_results_pagination_asc( if isinstance(row["node"], TestResultsRow) ] - def test_test_analytics_term_filter( - self, repository, store_in_redis, mock_storage, snapshot - ): + def test_test_analytics_term_filter(self, repository, store_in_redis, snapshot): test_results = generate_test_results( repoid=repository.repoid, term=rows[0]["name"][2:], @@ -563,9 +587,7 @@ def test_test_analytics_testsuite_filter( if isinstance(row["node"], TestResultsRow) ] - def test_test_analytics_flag_filter( - self, repository, store_in_redis, mock_storage, snapshot - ): + def test_test_analytics_flag_filter(self, repository, store_in_redis, snapshot): test_results = generate_test_results( repoid=repository.repoid, flags=[rows[0]["flags"][0]], @@ -588,7 +610,7 @@ def test_test_analytics_flag_filter( if isinstance(row["node"], TestResultsRow) ] - def test_gql_query(self, repository, store_in_redis, mock_storage): + def test_gql_query(self, repository, store_in_redis): query = base_gql_query % ( repository.author.username, repository.name, @@ -632,7 +654,7 @@ def test_gql_query(self, repository, store_in_redis, mock_storage): ] def test_gql_query_with_duplicate_names( - self, repository, store_in_redis_with_duplicate_names, mock_storage + self, repository, store_in_redis_with_duplicate_names ): query = base_gql_query % ( repository.author.username, @@ -676,7 +698,7 @@ def test_gql_query_with_duplicate_names( for row in dedup(rows_with_duplicate_names) ] - def 
test_gql_query_aggregates(self, repository, store_in_redis, mock_storage): + def test_gql_query_aggregates(self, repository, store_in_redis): query = base_gql_query % ( repository.author.username, repository.name, @@ -703,7 +725,7 @@ def test_gql_query_aggregates(self, repository, store_in_redis, mock_storage): "totalSlowTests": 1, } - def test_gql_query_flake_aggregates(self, repository, store_in_redis, mock_storage): + def test_gql_query_flake_aggregates(self, repository, store_in_redis): query = base_gql_query % ( repository.author.username, repository.name, @@ -721,3 +743,68 @@ def test_gql_query_flake_aggregates(self, repository, store_in_redis, mock_stora "flakeRate": 0.1, "flakeCount": 1, } + + def test_gql_query_with_new_ta(self, mocker, repository, snapshot): + # set the feature flag + mocker.patch("rollouts.READ_NEW_TA.check_value", return_value=True) + + # read file from samples + storage = get_appropriate_storage_service() + try: + storage.create_root_storage(settings.GCS_BUCKET_NAME) + except BucketAlreadyExistsError: + pass + storage.write_file( + settings.GCS_BUCKET_NAME, + f"test_analytics/branch_rollups/{repository.repoid}/{repository.branch}.arrow", + test_results_table_no_version.write_ipc(None).getvalue(), + ) + + # run the GQL query + query = base_gql_query % ( + repository.author.username, + repository.name, + """ + testResults(ordering: { parameter: FAILURE_RATE, direction: DESC } ) { + totalCount + edges { + cursor + node { + name + failureRate + flakeRate + updatedAt + avgDuration + totalFailCount + totalFlakyFailCount + totalPassCount + totalSkipCount + commitsFailed + lastDuration + } + } + } + """, + ) + + result = self.gql_request(query, owner=repository.author) + + # take a snapshot of the results + assert ( + result["owner"]["repository"]["testAnalytics"]["testResults"]["totalCount"] + == 5 + ) + assert snapshot("json") == [ + { + **edge, + "node": {k: v for k, v in edge["node"].items() if k != "updatedAt"}, + } + for edge in result["owner"]["repository"]["testAnalytics"]["testResults"][ + "edges" + ] + ] + + storage.delete_file( + settings.GCS_BUCKET_NAME, + f"test_analytics/branch_rollups/{repository.repoid}/{repository.branch}.arrow", + ) diff --git a/graphql_api/types/test_analytics/test_analytics.py b/graphql_api/types/test_analytics/test_analytics.py index 28f26fd69e..26a3c2e885 100644 --- a/graphql_api/types/test_analytics/test_analytics.py +++ b/graphql_api/types/test_analytics/test_analytics.py @@ -34,22 +34,37 @@ INTERVAL_1_DAY = 1 -@dataclass class TestResultsRow: # the order here must match the order of the fields in the query - name: str - testsuite: str | None - flags: list[str] - failure_rate: float - flake_rate: float - updated_at: dt.datetime - avg_duration: float - total_fail_count: int - total_flaky_fail_count: int - total_pass_count: int - total_skip_count: int - commits_where_fail: int - last_duration: float + def __init__( + self, + name: str, + failure_rate: float, + flake_rate: float, + updated_at: dt.datetime, + avg_duration: float, + total_fail_count: int, + total_flaky_fail_count: int, + total_pass_count: int, + total_skip_count: int, + commits_where_fail: int, + last_duration: float, + testsuite: str | None = None, + flags: list[str] | None = None, + ): + self.name = name + self.testsuite = testsuite + self.flags = flags or [] + self.failure_rate = failure_rate + self.flake_rate = flake_rate + self.updated_at = updated_at + self.avg_duration = avg_duration + self.total_fail_count = total_fail_count + self.total_flaky_fail_count = 
total_flaky_fail_count + self.total_pass_count = total_pass_count + self.total_skip_count = total_skip_count + self.commits_where_fail = commits_where_fail + self.last_duration = last_duration def to_dict(self) -> dict: return { diff --git a/rollouts/__init__.py b/rollouts/__init__.py index 7f46f49555..daf59e7835 100644 --- a/rollouts/__init__.py +++ b/rollouts/__init__.py @@ -11,3 +11,5 @@ def owner_slug(owner: Owner) -> str: # By default, features have one variant: # { "enabled": FeatureVariant(True, 1.0) } + +READ_NEW_TA = Feature("read_new_ta") diff --git a/utils/test_results.py b/utils/test_results.py index 1d5e043871..c42c543646 100644 --- a/utils/test_results.py +++ b/utils/test_results.py @@ -1,11 +1,20 @@ +import tempfile +from datetime import date, timedelta + import polars as pl from django.conf import settings from shared.helpers.redis import get_redis_connection +from shared.metrics import Summary from shared.storage import get_appropriate_storage_service from shared.storage.exceptions import FileNotInStorageError +from rollouts import READ_NEW_TA from services.task import TaskService +get_results_summary = Summary( + "test_results_get_results", "Time it takes to download results from GCS", ["impl"] +) + def redis_key( repoid: int, @@ -61,7 +70,9 @@ def dedup_table(table: pl.DataFrame) -> pl.DataFrame: pl.col("total_flaky_fail_count").sum().alias("total_flaky_fail_count"), pl.col("total_pass_count").sum().alias("total_pass_count"), pl.col("total_skip_count").sum().alias("total_skip_count"), - pl.col("commits_where_fail").sum().alias("commits_where_fail"), + pl.col("commits_where_fail") + .sum() + .alias("commits_where_fail"), # TODO: this is wrong pl.col("last_duration").max().alias("last_duration"), ) .sort("name") @@ -87,6 +98,23 @@ def get_results( deserialize """ # try redis + if READ_NEW_TA.check_value(repoid): + func = new_get_results + label = "new" + else: + func = old_get_results + label = "old" + + with get_results_summary.labels(label).time(): + return func(repoid, branch, interval_start, interval_end) + + +def old_get_results( + repoid: int, + branch: str, + interval_start: int, + interval_end: int | None = None, +) -> pl.DataFrame | None: redis_conn = get_redis_connection() key = redis_key(repoid, branch, interval_start, interval_end) result: bytes | None = redis_conn.get(key) @@ -114,3 +142,121 @@ def get_results( table = dedup_table(table) return table + + +def rollup_blob_path(repoid: int, branch: str | None = None) -> str: + return ( + f"test_analytics/branch_rollups/{repoid}/{branch}.arrow" + if branch + else f"test_analytics/repo_rollups/{repoid}.arrow" + ) + + +def no_version_agg_table(table: pl.LazyFrame) -> pl.LazyFrame: + failure_rate_expr = (pl.col("fail_count")).sum() / ( + pl.col("fail_count") + pl.col("pass_count") + ).sum() + + flake_rate_expr = (pl.col("flaky_fail_count")).sum() / ( + pl.col("fail_count") + pl.col("pass_count") + ).sum() + + avg_duration_expr = ( + pl.col("avg_duration") * (pl.col("pass_count") + pl.col("fail_count")) + ).sum() / (pl.col("pass_count") + pl.col("fail_count")).sum() + + table = table.group_by(pl.col("computed_name").alias("name")).agg( + pl.col("flags") + .explode() + .unique() + .alias("flags"), # TODO: filter by this before we aggregate + pl.col("failing_commits").sum().alias("commits_where_fail"), + pl.col("last_duration").max().alias("last_duration"), + failure_rate_expr.alias("failure_rate"), + flake_rate_expr.alias("flake_rate"), + avg_duration_expr.alias("avg_duration"), + 
pl.col("pass_count").sum().alias("total_pass_count"), + pl.col("fail_count").sum().alias("total_fail_count"), + pl.col("flaky_fail_count").sum().alias("total_flaky_fail_count"), + pl.col("skip_count").sum().alias("total_skip_count"), + pl.col("updated_at").max().alias("updated_at"), + ) + + return table + + +def v1_agg_table(table: pl.LazyFrame) -> pl.LazyFrame: + failure_rate_expr = (pl.col("fail_count")).sum() / ( + pl.col("fail_count") + pl.col("pass_count") + ).sum() + + flake_rate_expr = (pl.col("flaky_fail_count")).sum() / ( + pl.col("fail_count") + pl.col("pass_count") + ).sum() + + avg_duration_expr = ( + pl.col("avg_duration") * (pl.col("pass_count") + pl.col("fail_count")) + ).sum() / (pl.col("pass_count") + pl.col("fail_count")).sum() + + table = table.group_by("computed_name").agg( + pl.col("testsuite").alias( + "testsuite" + ), # TODO: filter by this before we aggregate + pl.col("flags") + .explode() + .unique() + .alias("flags"), # TODO: filter by this before we aggregate + pl.col("failing_commits").sum().alias("commits_where_fail"), + pl.col("last_duration").max().alias("last_duration"), + failure_rate_expr.alias("failure_rate"), + flake_rate_expr.alias("flake_rate"), + avg_duration_expr.alias("avg_duration"), + pl.col("pass_count").sum().alias("total_pass_count"), + pl.col("fail_count").sum().alias("total_fail_count"), + pl.col("flaky_fail_count").sum().alias("total_flaky_fail_count"), + pl.col("skip_count").sum().alias("total_skip_count"), + pl.col("updated_at").max().alias("updated_at"), + ) + + return table + + +def new_get_results( + repoid: int, + branch: str | None, + interval_start: int, + interval_end: int | None = None, +) -> pl.DataFrame | None: + storage_service = get_appropriate_storage_service(repoid) + key = rollup_blob_path(repoid, branch) + try: + with tempfile.TemporaryFile() as tmp: + metadata = {} + storage_service.read_file( + bucket_name=settings.GCS_BUCKET_NAME, + path=key, + file_obj=tmp, + metadata_container=metadata, + ) + + table = pl.scan_ipc(tmp) + + # filter start + start_date = date.today() - timedelta(days=interval_start) + table = table.filter(pl.col("timestamp_bin") >= start_date) + + # filter end + if interval_end is not None: + end_date = date.today() - timedelta(days=interval_end) + table = table.filter(pl.col("timestamp_bin") <= end_date) + + # aggregate + match metadata.get("version"): + case "1": + table = v1_agg_table(table) + case _: # no version is missding + table = no_version_agg_table(table) + + return table.collect() + except FileNotInStorageError: + return None From 9774f44dcd4a9dba4eb69d0a747879c375b5d73b Mon Sep 17 00:00:00 2001 From: Joseph Sawaya Date: Mon, 24 Mar 2025 13:05:17 -0400 Subject: [PATCH 2/2] feat: add ability to read new file format i'm introducing 2 things here: - the ability to read the output of the new ta cache rollups code in the new location in GCS - the ability to read multiple versions of the new rollup file - its probable that the "no version" file never exists in prod GCS but i'd like to establish this pattern in the code nonetheless i also add metrics --- docker-compose.yml | 2 -- docker/test.yml | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c7be9d701f..4794f568c1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,8 +45,6 @@ services: minio: image: minio/minio:latest command: server /export - ports: - - "${MINIO_PORT:-9000}:9000" environment: - MINIO_ACCESS_KEY=codecov-default-key - MINIO_SECRET_KEY=codecov-default-secret 
diff --git a/docker/test.yml b/docker/test.yml index 19492b14a3..155f06646a 100644 --- a/docker/test.yml +++ b/docker/test.yml @@ -23,6 +23,7 @@ services: access_key_id: codecov-default-key secret_access_key: codecov-default-secret verify_ssl: false + bucket: archive port: 9000 host: minio
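
The rollup read path in new_get_results is easiest to see end to end on a toy file. The sketch below is a minimal standalone script, not part of the diff: it writes a small Arrow IPC file using the "no version" column names from this PR, scans it lazily, drops bins older than the 30-day window that the interval_start filter targets, and computes the same run-weighted failure rate, flake rate, and average duration as no_version_agg_table. The file path and all sample values are made up for illustration, and the GCS download plus metadata version dispatch are left out.

import datetime as dt
import tempfile
from pathlib import Path

import polars as pl

# Toy branch rollup using the "no version" column names from this PR;
# the values themselves are invented for illustration.
today = dt.date.today()
rollup = pl.DataFrame(
    {
        "computed_name": ["test_a", "test_a", "test_b"],
        "timestamp_bin": [
            dt.datetime.combine(today - dt.timedelta(days=d), dt.time())
            for d in (40, 10, 5)
        ],
        "pass_count": [8, 2, 5],
        "fail_count": [2, 2, 0],
        "flaky_fail_count": [1, 0, 0],
        "avg_duration": [1.0, 3.0, 2.0],
    }
)

with tempfile.TemporaryDirectory() as tmpdir:
    path = Path(tmpdir) / "rollup.arrow"
    rollup.write_ipc(path)

    # Lazily scan the IPC file and keep only bins inside the last 30 days,
    # mirroring the interval_start filter in new_get_results.
    cutoff = dt.datetime.combine(today - dt.timedelta(days=30), dt.time())
    table = pl.scan_ipc(path).filter(pl.col("timestamp_bin") >= cutoff)

    runs = pl.col("pass_count") + pl.col("fail_count")
    out = (
        table.group_by("computed_name")
        .agg(
            (pl.col("fail_count").sum() / runs.sum()).alias("failure_rate"),
            (pl.col("flaky_fail_count").sum() / runs.sum()).alias("flake_rate"),
            # run-weighted mean, not a plain mean of the per-bin averages
            ((pl.col("avg_duration") * runs).sum() / runs.sum()).alias("avg_duration"),
        )
        .collect()
    )

    # The 40-day-old bin is dropped, so test_a keeps one bin:
    # failure_rate 2/4 = 0.5, flake_rate 0.0, avg_duration 3.0;
    # test_b: failure_rate 0.0, flake_rate 0.0, avg_duration 2.0.
    print(out.sort("computed_name"))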
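
The metrics added in utils/test_results.py wrap each implementation in a per-"impl" timer before delegating. shared.metrics.Summary is internal to this codebase; the snippet below reproduces the same labelled-timer pattern with prometheus_client directly, on the assumption that the two expose the same labels()/time() interface. The fetch_old/fetch_new helpers are placeholders rather than the real old_get_results/new_get_results.

from prometheus_client import Summary

# Assumed equivalent of shared.metrics.Summary: name, documentation, label names.
get_results_summary = Summary(
    "test_results_get_results",
    "Time it takes to download results from GCS",
    ["impl"],
)


def fetch_old() -> str:
    # placeholder for old_get_results(repoid, branch, ...)
    return "rollup from the legacy redis/GCS path"


def fetch_new() -> str:
    # placeholder for new_get_results(repoid, branch, ...)
    return "rollup from test_analytics/branch_rollups"


def get_results_demo(use_new_ta: bool) -> str:
    # Choose the implementation once, then time it under its own label so the
    # two code paths can be compared while the READ_NEW_TA rollout is partial.
    func, label = (fetch_new, "new") if use_new_ta else (fetch_old, "old")
    with get_results_summary.labels(label).time():
        return func()


print(get_results_demo(use_new_ta=True))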