|
9 | 9 |
|
10 | 10 | from athenian.api import metadata |
11 | 11 | from athenian.api.async_utils import gather |
12 | | -from athenian.api.db import Database |
| 12 | +from athenian.api.db import Database, dialect_specific_insert |
13 | 13 | from athenian.api.models.persistentdata.models import HealthMetric |
14 | 14 | from athenian.api.models.state.models import AccountGitHubAccount |
15 | 15 | from athenian.api.precompute.context import PrecomputeContext |
@@ -113,10 +113,84 @@ async def _record_pending_fetch_metrics( |
113 | 113 | ) -> None: |
114 | 114 | log = logging.getLogger(f"{metadata.__package__}._record_pending_fetch_metrics") |
115 | 115 | inserted = [] |
116 | | - # df_pending_prs, df_pending_commits = await gather() |
| 116 | + now = datetime.now(timezone.utc) |
| 117 | + pending_prs_rows, pending_branches_rows, _ = await gather( |
| 118 | + mdb.fetch_all( |
| 119 | + """ |
| 120 | + SELECT edges.acc_id, (edges.value - nodes.value) AS diff |
| 121 | + FROM ( |
| 122 | + SELECT |
| 123 | + re.acc_id, count(*) AS value |
| 124 | + FROM github.node_repository_edge_pullrequests re |
| 125 | + JOIN github.account_repos pa |
| 126 | + ON re.parent_id = pa.repo_graph_id AND re.acc_id = pa.acc_id |
| 127 | + GROUP BY re.acc_id |
| 128 | + ) AS edges JOIN ( |
| 129 | + SELECT pr.acc_id, count(DISTINCT pr.database_id) AS value |
| 130 | + FROM github.node_pullrequest pr |
| 131 | + WHERE |
| 132 | + EXISTS ( |
| 133 | + SELECT 1 FROM github.graph_nodes prn |
| 134 | + WHERE prn.acc_id = pr.acc_id AND prn.node_id = pr.graph_id AND NOT prn.deleted) |
| 135 | + AND |
| 136 | + EXISTS ( |
| 137 | + SELECT 1 FROM github.account_repos pa |
| 138 | + WHERE pr.repository_id = pa.repo_graph_id AND pr.acc_id = pa.acc_id) |
| 139 | + GROUP BY pr.acc_id |
| 140 | + ) AS nodes |
| 141 | + ON edges.acc_id = nodes.acc_id; |
| 142 | + """, |
| 143 | + ), |
| 144 | + mdb.fetch_all( |
| 145 | + """ |
| 146 | + SELECT edges.acc_id, (edges.value - nodes.value) AS diff |
| 147 | + FROM ( |
| 148 | + SELECT |
| 149 | + re.acc_id, count(*) AS value |
| 150 | + FROM github.node_repository_edge_refs re |
| 151 | + JOIN github.account_repos pa |
| 152 | + ON re.parent_id = pa.repo_graph_id AND re.acc_id = pa.acc_id |
| 153 | + GROUP BY re.acc_id |
| 154 | + ) AS edges JOIN ( |
| 155 | + SELECT ref.acc_id, count(*) AS value |
| 156 | + FROM github.node_ref ref |
| 157 | + WHERE EXISTS ( |
| 158 | + SELECT 1 FROM github.graph_nodes prn |
| 159 | + WHERE prn.acc_id = ref.acc_id AND prn.node_id = ref.graph_id AND not prn.deleted) |
| 160 | + AND EXISTS ( |
| 161 | + SELECT 1 FROM github.account_repos pa |
| 162 | + WHERE pa.acc_id = ref.acc_id AND pa.repo_graph_id = ref.repository_id) |
| 163 | + GROUP BY ref.acc_id |
| 164 | + ) AS nodes |
| 165 | + ON edges.acc_id = nodes.acc_id; |
| 166 | + """, |
| 167 | + ), |
| 168 | + acc_id_map_task, |
| 169 | + ) |
| 170 | + acc_id_map = acc_id_map_task.result() |
| 171 | + for name, rows in ( |
| 172 | + ("pending_prs", pending_prs_rows), |
| 173 | + ("pending_branches", pending_branches_rows), |
| 174 | + ): |
| 175 | + for row in rows: |
| 176 | + acc, diff = row |
| 177 | + try: |
| 178 | + acc = acc_id_map[acc] |
| 179 | + except KeyError: |
| 180 | + continue |
| 181 | + inserted.append( |
| 182 | + HealthMetric( |
| 183 | + account_id=acc, |
| 184 | + created_at=now, |
| 185 | + name=name, |
| 186 | + value=diff, |
| 187 | + ).explode(with_primary_keys=True), |
| 188 | + ) |
117 | 189 | if inserted: |
118 | | - log.info("inserting %d data inconsistency records", len(inserted)) |
119 | | - await rdb.execute_many(insert(HealthMetric), inserted) |
| 190 | + log.info("inserting %d pending fetch records", len(inserted)) |
| 191 | + await rdb.execute_many( |
| 192 | + (await dialect_specific_insert(rdb))(HealthMetric).on_conflict_do_nothing(), inserted, |
| 193 | + ) |
120 | 194 |
|
121 | 195 |
|
122 | 196 | async def _fetch_acc_id_map(sdb: Database) -> dict[int, int]: |
|
0 commit comments