|
| 1 | +# This Source Code Form is subject to the terms of the Mozilla Public |
| 2 | +# License, v. 2.0. If a copy of the MPL was not distributed with this |
| 3 | +# file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 4 | +from __future__ import annotations |
| 5 | + |
| 6 | +import io |
| 7 | +import logging |
| 8 | +from pathlib import Path |
| 9 | +from time import sleep |
| 10 | + |
| 11 | +from flask import current_app |
| 12 | + |
| 13 | +from landoapi.hg import ( |
| 14 | + HgRepo, |
| 15 | +) |
| 16 | +from landoapi.models.revisions import Revision |
| 17 | +from landoapi.repos import repo_clone_subsystem |
| 18 | +from landoapi.storage import db |
| 19 | +from landoapi.landing_worker import LandingWorker |
| 20 | +from landoapi.phabricator import PhabricatorClient |
| 21 | + |
| 22 | +from mots.config import FileConfig |
| 23 | +from mots.directory import Directory |
| 24 | + |
| 25 | +logger = logging.getLogger(__name__) |
| 26 | + |
| 27 | + |
def get_conduit_data(method, **kwargs):
    """Return every page of data for a conduit API ``method`` call.

    Follows Phabricator's result cursor until no further pages remain,
    accumulating the ``data`` entries from each response.
    """
    phab = PhabricatorClient(
        current_app.config["PHABRICATOR_URL"],
        current_app.config["PHABRICATOR_UNPRIVILEGED_API_KEY"],
    )
    result = phab.call_conduit(method, **kwargs)
    data = list(result["data"])
    # Keep requesting pages while the server reports a continuation cursor.
    while result["cursor"]["after"]:
        result = phab.call_conduit(method, after=result["cursor"]["after"], **kwargs)
        data.extend(result["data"])
    return data
| 41 | + |
| 42 | + |
def get_revisions_list(statuses=None):
    """Fetch revisions with the given statuses from Phabricator.

    Args:
        statuses: list of revision status slugs to filter on; defaults to
            ``["needs-review", "accepted"]``.

    Returns:
        A list of dicts with keys ``revision_id``, ``diff_id`` and
        ``repo_name``. Revisions lacking a diff or repository PHID are
        skipped.
    """
    statuses = statuses or ["needs-review", "accepted"]
    revisions = get_conduit_data(
        "differential.revision.search",
        constraints={"statuses": statuses},
    )
    revisions = [
        {
            "revision_id": r["id"],
            "diff_phid": r["fields"]["diffPHID"],
            "repo_phid": r["fields"]["repositoryPHID"],
        }
        for r in revisions
        if r["fields"]["diffPHID"] and r["fields"]["repositoryPHID"]
    ]

    # Avoid issuing the follow-up searches with empty `phids` constraints
    # when there is nothing to resolve.
    if not revisions:
        return []

    # Resolve diff PHIDs to numeric diff IDs.
    diff_phids = [r["diff_phid"] for r in revisions]
    diffs = get_conduit_data(
        "differential.diff.search", constraints={"phids": diff_phids}
    )
    diff_map = {d["phid"]: d["id"] for d in diffs}

    # Resolve repository PHIDs to repository short names.
    repo_phids = [r["repo_phid"] for r in revisions]
    repos = get_conduit_data(
        "diffusion.repository.search", constraints={"phids": repo_phids}
    )
    repo_map = {d["phid"]: d["fields"]["shortName"] for d in repos}

    # Replace the intermediate PHIDs with the resolved identifiers.
    for r in revisions:
        r["diff_id"] = diff_map[r["diff_phid"]]
        r["repo_name"] = repo_map[r["repo_phid"]]
        del r["diff_phid"]
        del r["repo_phid"]

    return revisions
| 79 | + |
| 80 | + |
def parse_diff(diff):
    """Return the set of file paths affected by a unified diff.

    Args:
        diff: the full patch text as a single string.

    Returns:
        A set of repo-relative paths. Both the source and destination path
        of each ``diff`` header line are included, so renames contribute
        both the old and the new path.
    """
    file_paths = set()
    for line in diff.splitlines():
        # Only "diff ..." header lines name affected files; patch content
        # lines are prefixed with " ", "+" or "-" and never match.
        if not line.startswith("diff"):
            continue
        paths = line.split(" ")[2:]
        if len(paths) != 2:
            # Unexpected header shape (e.g. a path containing spaces);
            # skip it rather than raising and aborting the whole parse.
            continue
        for path in paths:
            # Drop the leading "a/" / "b/" prefix component.
            file_paths.add("/".join(path.split("/")[1:]))
    return file_paths
| 93 | + |
| 94 | + |
def sync_revisions():
    """Check and update local database with available revisions.

    For each revision fetched from Phabricator: skip it if the exact
    (revision, diff) pair is already stored, update the stored diff ID if
    only the revision exists, otherwise create a new record and store its
    patch.
    """
    revisions = get_revisions_list()
    logger.debug(f"Processing {len(revisions)} revisions...")
    for r in revisions:
        logger.debug(f"Processing {r}...")
        # Nothing to do when this exact (revision, diff) pair is stored.
        existing = Revision.query.filter(
            Revision.revision_id == r["revision_id"],
            Revision.diff_id == r["diff_id"],
        ).first()
        if existing is not None:
            logger.debug(f"{r} already exists in DB, skipping.")
            continue

        # The revision exists with a different diff: update it in place.
        # NOTE: materialize the row with .first() — assigning diff_id on the
        # Query object (as the previous code did) never touched the DB row.
        revision = Revision.query.filter(
            Revision.revision_id == r["revision_id"]
        ).first()
        if revision is not None:
            logger.debug(f"{r} already exists in DB, updating diff ID.")
            revision.diff_id = r["diff_id"]
            db.session.add(revision)
            db.session.commit()
            continue

        logger.debug(f"Creating {r} in DB.")
        revision = Revision(**r)

        # Download and store the patch diff in the DB.
        revision.store_patch()

        db.session.add(revision)
        db.session.commit()
    # TODO: identify stale revisions (e.g. when a repo has been updated and the
    # parsed state of the revision is no longer relevant, e.g. check hash.)
| 126 | + |
| 127 | + |
class RevisionWorker(LandingWorker):
    """A worker that pre-processes revisions.

    This worker continuously synchronises revisions with the remote Phabricator API
    and runs all applicable checks and processes on each revision, if needed.

    TODO: this should extend an abstract worker class, not landing worker.
    """

    # Names of the per-revision processing steps; each entry maps to a
    # `process_<name>` method on this class.
    processes = [
        "mots",
    ]

    def start(self):
        """Run the worker loop: sync revisions, then process the stale ones."""
        logger.info("Revision worker starting")
        logger.info(
            f"{len(self.applicable_repos)} applicable repos: {self.applicable_repos}"
        )
        self.running = True

        while self.running:
            sync_revisions()

            # Fetch revisions flagged as needing (re-)processing. Use .is_()
            # rather than `== True` for an explicit SQL "IS TRUE" comparison.
            revisions = Revision.query.filter(Revision.is_stale.is_(True))
            if not revisions.count():
                # Nothing to do; back off briefly to avoid hammering the DB.
                sleep(1)
                continue
            for revision in revisions:
                logger.info(
                    "Running mots checks on revision", extra={"id": revision.id}
                )
                for process in self.processes:
                    getattr(self, f"process_{process}")(revision)

    def process_mots(self, revision):
        """Run mots ownership queries for `revision` and store the result.

        Queries the mots directory for the revision's touched paths both
        before and after applying its patch, merges the two results into
        ``revision.data["mots"]`` and clears the stale flag.
        """
        repo = repo_clone_subsystem.repos[revision.repo_name]
        hgrepo = HgRepo(
            str(repo_clone_subsystem.repo_paths[revision.repo_name]),
            config=repo.config_override,
        )
        # checkout repo, pull & update
        with hgrepo.for_pull():
            hgrepo.update_repo(repo.pull_path)

        # Load the mots.yaml config from the clone's working directory.
        mots_config = FileConfig(Path(hgrepo.path) / "mots.yaml")
        mots_directory = Directory(mots_config)

        # CHECK query before applying patch, and again after.
        mots_directory.load()
        paths = parse_diff(revision.patch)
        query = {"pre": mots_directory.query(*paths)}

        with hgrepo.for_pull():
            hgrepo.update_repo(repo.pull_path)
            try:
                hgrepo.apply_patch(io.BytesIO(revision.patch.encode("utf-8")))
            except Exception as e:
                # Possible merge conflict, skip for now...
                logger.error(e)
                return
            # hg_cmd = ["diff", "-c", "tip"]  # TODO: replace this with rev id
            # hg_out = hgrepo.run_hg(hg_cmd)

            # Reload directory with new patch, then re-query the same paths.
            mots_directory.load(full_paths=True)
            query["post"] = mots_directory.query(*paths)

        # Merge the pre/post results so both previous and new owners appear.
        query_result = query["pre"] + query["post"]
        revision.data = {
            "mots": {
                "modules": [m.serialize() for m in query_result.modules],
                "owners": [o.real_name for o in query_result.owners],
                "peers": [p.real_name for p in query_result.peers],
                "paths": query_result.paths,
                "rejected_paths": query_result.rejected_paths,
            }
        }
        revision.is_stale = False
        db.session.commit()
0 commit comments