Skip to content

Commit 5373bc1

Browse files
authored
feat: add a new tool called scopes (#793)
It's a bit like dorny/paths-filter, but: * it takes only scopes filter from a file * support only python pattern language (no regex) https://docs.python.org/3/library/pathlib.html#pathlib-pattern-language The found scopes are: * printed to stdout * add to GITHUB_OUTPUTS prefix by scope_ Fixes MRGFY-5884
1 parent ce0ba06 commit 5373bc1

File tree

7 files changed

+934
-60
lines changed

7 files changed

+934
-60
lines changed

mergify_cli/ci/cli.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from mergify_cli import utils
44
from mergify_cli.ci import detector
55
from mergify_cli.ci.junit_processing import cli as junit_processing_cli
6+
from mergify_cli.ci.scopes import cli as scopes_cli
67

78

89
class JUnitFile(click.Path):
@@ -187,3 +188,24 @@ async def junit_process( # noqa: PLR0913
187188
tests_target_branch=tests_target_branch,
188189
files=files,
189190
)
191+
192+
193+
@ci.command(
194+
help="""Give the list scope impacted by changed files""",
195+
short_help="""Give the list scope impacted by changed files""",
196+
)
197+
@click.option(
198+
"--config",
199+
"config_path",
200+
required=True,
201+
type=click.Path(exists=True),
202+
default=".mergify-ci.yml",
203+
help="Path to YAML config file.",
204+
)
205+
def scopes(
206+
config_path: str,
207+
) -> None:
208+
try:
209+
scopes_cli.detect(config_path=config_path)
210+
except scopes_cli.ConfigInvalidError as e:
211+
raise click.ClickException(str(e)) from e

mergify_cli/ci/scopes/__init__.py

Whitespace-only changes.

mergify_cli/ci/scopes/cli.py

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
from __future__ import annotations
2+
3+
import json
4+
import os
5+
import pathlib
6+
import subprocess
7+
import typing
8+
9+
import click
10+
import pydantic
11+
import yaml
12+
13+
14+
if typing.TYPE_CHECKING:
15+
from collections import abc
16+
17+
18+
SCOPE_PREFIX = "scope_"
19+
SCOPE_NAME_RE = r"^[A-Za-z0-9_-]+$"
20+
21+
22+
class ConfigInvalidError(Exception):
23+
pass
24+
25+
26+
class ScopeConfig(pydantic.BaseModel):
27+
include: tuple[str, ...] = pydantic.Field(default_factory=tuple)
28+
exclude: tuple[str, ...] = pydantic.Field(default_factory=tuple)
29+
30+
31+
ScopeName = typing.Annotated[
32+
str,
33+
pydantic.StringConstraints(pattern=SCOPE_NAME_RE, min_length=1),
34+
]
35+
36+
37+
class Config(pydantic.BaseModel):
38+
scopes: dict[ScopeName, ScopeConfig]
39+
40+
@classmethod
41+
def from_dict(cls, data: dict[str, typing.Any] | typing.Any) -> Config: # noqa: ANN401
42+
try:
43+
return cls.model_validate(data)
44+
except pydantic.ValidationError as e:
45+
raise ConfigInvalidError(e)
46+
47+
@classmethod
48+
def from_yaml(cls, path: str) -> Config:
49+
with pathlib.Path(path).open(encoding="utf-8") as f:
50+
try:
51+
data = yaml.safe_load(f) or {}
52+
except yaml.YAMLError as e:
53+
raise ConfigInvalidError(e)
54+
55+
return cls.from_dict(data)
56+
57+
58+
def _run(cmd: list[str]) -> str:
59+
try:
60+
return subprocess.check_output(cmd, text=True, encoding="utf-8").strip()
61+
except subprocess.CalledProcessError as e:
62+
msg = f"Command failed: {' '.join(cmd)}\n{e}"
63+
raise click.ClickException(msg) from e
64+
65+
66+
class MergeQueuePullRequest(typing.TypedDict):
67+
number: int
68+
69+
70+
class MergeQueueBatchFailed(typing.TypedDict):
71+
draft_pr_number: int
72+
checked_pull_request: list[int]
73+
74+
75+
class MergeQueueMetadata(typing.TypedDict):
76+
checking_base_sha: str
77+
pull_requests: list[MergeQueuePullRequest]
78+
previous_failed_batches: list[MergeQueueBatchFailed]
79+
80+
81+
def _yaml_docs_from_fenced_blocks(body: str) -> MergeQueueMetadata | None:
82+
lines = []
83+
found = False
84+
for line in body.splitlines():
85+
if line.startswith("```yaml"):
86+
found = True
87+
elif found:
88+
if line.startswith("```"):
89+
break
90+
lines.append(line)
91+
if lines:
92+
return typing.cast("MergeQueueMetadata", yaml.safe_load("\n".join(lines)))
93+
return None
94+
95+
96+
def _detect_base_from_merge_queue_payload(ev: dict[str, typing.Any]) -> str | None:
97+
pr = ev.get("pull_request")
98+
if not isinstance(pr, dict):
99+
return None
100+
title = pr.get("title") or ""
101+
if not isinstance(title, str):
102+
return None
103+
if not title.startswith("merge-queue: "):
104+
return None
105+
body = pr.get("body") or ""
106+
content = _yaml_docs_from_fenced_blocks(body)
107+
if content:
108+
return content["checking_base_sha"]
109+
return None
110+
111+
112+
def _detect_base_from_event(ev: dict[str, typing.Any]) -> str | None:
113+
pr = ev.get("pull_request")
114+
if isinstance(pr, dict):
115+
sha = pr.get("base", {}).get("sha")
116+
if isinstance(sha, str) and sha:
117+
return sha
118+
return None
119+
120+
121+
def detect_base() -> str:
122+
event_path = os.environ.get("GITHUB_EVENT_PATH")
123+
event: dict[str, typing.Any] | None = None
124+
if event_path and pathlib.Path(event_path).is_file():
125+
try:
126+
with pathlib.Path(event_path).open("r", encoding="utf-8") as f:
127+
event = json.load(f)
128+
except FileNotFoundError:
129+
event = None
130+
131+
if event is not None:
132+
# 0) merge-queue PR override
133+
mq_sha = _detect_base_from_merge_queue_payload(event)
134+
if mq_sha:
135+
return mq_sha
136+
137+
# 1) standard event payload
138+
event_sha = _detect_base_from_event(event)
139+
if event_sha:
140+
return event_sha
141+
142+
# 2) base ref (e.g., PR target branch)
143+
base_ref = os.environ.get("GITHUB_BASE_REF")
144+
if base_ref:
145+
return base_ref
146+
147+
msg = (
148+
"Could not detect base SHA. Ensure checkout has sufficient history "
149+
"(e.g., actions/checkout with fetch-depth: 0) or provide GITHUB_EVENT_PATH / GITHUB_BASE_REF."
150+
)
151+
raise click.ClickException(
152+
msg,
153+
)
154+
155+
156+
def git_changed_files(base: str) -> list[str]:
157+
# Committed changes only between base_sha and HEAD.
158+
# Includes: Added (A), Copied (C), Modified (M), Renamed (R), Type-changed (T), Deleted (D)
159+
# Excludes: Unmerged (U), Unknown (X), Broken (B)
160+
out = _run(
161+
["git", "diff", "--name-only", "--diff-filter=ACMRTD", f"{base}...HEAD"],
162+
)
163+
return [line for line in out.splitlines() if line]
164+
165+
166+
def match_scopes(
167+
config: Config,
168+
files: abc.Iterable[str],
169+
) -> tuple[set[str], dict[str, list[str]]]:
170+
scopes_hit: set[str] = set()
171+
per_scope: dict[str, list[str]] = {s: [] for s in config.scopes}
172+
for f in files:
173+
# NOTE(sileht): we use pathlib.full_match to support **, as fnmatch does not
174+
p = pathlib.PurePosixPath(f)
175+
for scope, scope_config in config.scopes.items():
176+
if not scope_config.include and not scope_config.exclude:
177+
continue
178+
179+
# Check if file matches any include
180+
if scope_config.include:
181+
matches_positive = any(
182+
p.full_match(pat) for pat in scope_config.include
183+
)
184+
else:
185+
matches_positive = True
186+
187+
# Check if file matches any exclude
188+
matches_negative = any(p.full_match(pat) for pat in scope_config.exclude)
189+
190+
# File matches the scope if it matches positive patterns and doesn't match negative patterns
191+
if matches_positive and not matches_negative:
192+
scopes_hit.add(scope)
193+
per_scope[scope].append(f)
194+
return scopes_hit, {k: v for k, v in per_scope.items() if v}
195+
196+
197+
def maybe_write_github_outputs(
198+
all_scopes: abc.Iterable[str],
199+
scopes_hit: set[str],
200+
) -> None:
201+
gha = os.environ.get("GITHUB_OUTPUT")
202+
if not gha:
203+
return
204+
with pathlib.Path(gha).open("a", encoding="utf-8") as fh:
205+
for s in sorted(all_scopes):
206+
key = f"{SCOPE_PREFIX}{s}"
207+
val = "true" if s in scopes_hit else "false"
208+
fh.write(f"{key}={val}\n")
209+
210+
211+
def detect(config_path: str) -> None:
212+
cfg = Config.from_yaml(config_path)
213+
base = detect_base()
214+
changed = git_changed_files(base)
215+
scopes_hit, per_scope = match_scopes(cfg, changed)
216+
217+
click.echo(f"Base: {base}")
218+
if scopes_hit:
219+
click.echo("Scopes touched:")
220+
for s in sorted(scopes_hit):
221+
click.echo(f"- {s}")
222+
if os.environ.get("ACTIONS_STEP_DEBUG") == "true":
223+
for f in sorted(per_scope.get(s, [])):
224+
click.echo(f" {f}")
225+
else:
226+
click.echo("No scopes matched.")
227+
228+
maybe_write_github_outputs(cfg.scopes.keys(), scopes_hit)

mergify_cli/tests/ci/scopes/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)