Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 79 additions & 16 deletions tagbot/action/git.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
import subprocess

from datetime import datetime
from datetime import datetime, timezone
from tempfile import mkdtemp
from typing import Optional, cast
from urllib.parse import urlparse
Expand All @@ -10,6 +10,50 @@
from . import Abort


def parse_git_datetime(date_str: str) -> Optional[datetime]:
"""Parse Git date output into a naive UTC datetime.

Handles common Git formats and normalizes timezone offsets.
Returns None if parsing fails.
"""

def normalize_offset(s: str) -> str:
match = re.search(r"([+-]\d{2})(:?)(\d{2})$", s)
if match and not match.group(2):
return f"{s[:- len(match.group(0))]}{match.group(1)}:{match.group(3)}"
return s

cleaned = date_str.strip()
attempts = [cleaned, normalize_offset(cleaned)]
formats = ["%Y-%m-%d %H:%M:%S %z", "%a %b %d %H:%M:%S %Y %z"]

for candidate in attempts:
try:
dt = datetime.fromisoformat(candidate)
except ValueError:
dt = None
if dt:
offset = dt.utcoffset()
if offset:
dt -= offset
return dt.replace(tzinfo=None)
for fmt in formats:
try:
dt = datetime.strptime(candidate, fmt)
except ValueError:
continue
return dt.astimezone(timezone.utc).replace(tzinfo=None)

match = re.search(
r"(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[+-]\d{2}:?\d{2})",
cleaned,
)
if match:
return parse_git_datetime(normalize_offset(match.group(1)))

return None


class Git:
"""Provides access to a local Git repository."""

Expand Down Expand Up @@ -70,18 +114,36 @@ def command(self, *argv: str, repo: Optional[str] = "") -> str:
proc = subprocess.run(args, text=True, capture_output=True)
out = proc.stdout.strip()
if proc.returncode:
error_msg = f"Git command '{self._sanitize_command(cmd)}' failed"
err = proc.stderr.strip()
if out:
stdout = self._sanitize_command(out)
logger.error(f"stdout: {stdout}")
error_msg += f"\nstdout: {stdout}"
if proc.stderr:
stderr = self._sanitize_command(proc.stderr.strip())
logger.error(f"stderr: {stderr}")
error_msg += f"\nstderr: {stderr}"
raise Abort(error_msg)
logger.error(f"stdout: {self._sanitize_command(out)}")
if err:
logger.error(f"stderr: {self._sanitize_command(err)}")

detail = err or out
hint = self._hint_for_failure(detail)
message = f"Git command '{self._sanitize_command(cmd)}' failed"
if detail:
message = f"{message}: {self._sanitize_command(detail)}"
if hint:
message = f"{message} ({hint})"
raise Abort(message)
return out

def _hint_for_failure(self, detail: str) -> Optional[str]:
"""Return a user-facing hint for common git errors."""
lowered = detail.casefold()
if "permission to" in lowered and "denied" in lowered:
return "use a PAT with contents:write or a deploy key"
if "workflow" in lowered or "workflows" in lowered:
if "refusing" in lowered or "permission" in lowered:
return "provide workflow scope or avoid workflow changes"
if "publickey" in lowered or "permission denied (publickey)" in lowered:
return "configure SSH deploy key or switch to https with PAT"
if "bad credentials" in lowered or "authentication failed" in lowered:
return "token is invalid or lacks access"
return None
Comment on lines +146 to +158
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new _hint_for_failure method has multiple conditional branches for different error scenarios (permission denied, publickey, bad credentials), but the test suite only covers the workflow permission scenario. Consider adding tests for the other hint branches to ensure comprehensive test coverage.

Copilot uses AI. Check for mistakes.

def check(self, *argv: str, repo: Optional[str] = "") -> bool:
"""Run a Git command, but only return its success status."""
try:
Expand Down Expand Up @@ -177,9 +239,10 @@ def time_of_commit(self, sha: str, repo: str = "") -> datetime:
"""Get the time that a commit was made."""
# The format %cI is "committer date, strict ISO 8601 format".
date = self.command("show", "-s", "--format=%cI", sha, repo=repo)
dt = datetime.fromisoformat(date)
# Convert to UTC and remove time zone information.
offset = dt.utcoffset()
if offset:
dt -= offset
return dt.replace(tzinfo=None)
parsed = parse_git_datetime(date)
if not parsed:
logger.warning(
"Could not parse git date '%s', using current UTC", date.strip()
)
return datetime.utcnow()
return parsed
62 changes: 46 additions & 16 deletions tagbot/action/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from .. import logger
from . import TAGBOT_WEB, Abort, InvalidProject
from .changelog import Changelog
from .git import Git
from .git import Git, parse_git_datetime

GitlabClient: Any = None
GitlabUnknown: Any = None
Expand Down Expand Up @@ -809,12 +809,10 @@ def _build_commit_datetime_cache(self, shas: List[str]) -> None:
if len(parts) == 2:
commit_sha, iso_date = parts
if commit_sha in sha_set:
# Parse ISO 8601 date and convert to UTC without timezone
dt = datetime.fromisoformat(iso_date)
offset = dt.utcoffset()
if offset:
dt = dt - offset
dt = dt.replace(tzinfo=None)
dt = parse_git_datetime(iso_date)
if not dt:
logger.debug("Could not parse git log date '%s'", iso_date)
continue
self.__commit_datetimes[commit_sha] = dt
found += 1
if found >= len(uncached):
Expand Down Expand Up @@ -1480,15 +1478,40 @@ def create_release(self, version: str, sha: str, is_latest: bool = True) -> None
# Use make_latest=False for backfilled old releases to avoid marking them
# as the "Latest" release on GitHub
make_latest_str = "true" if is_latest else "false"
self._repo.create_git_release(
version_tag,
version_tag,
log,
target_commitish=target,
draft=self._draft,
make_latest=make_latest_str,
generate_release_notes=(self._changelog_format == "github"),
)

def _release_already_exists(exc: GithubException) -> bool:
data = getattr(exc, "data", {}) or {}
for err in data.get("errors", []):
if isinstance(err, dict) and err.get("code") == "already_exists":
return True
return "already exists" in str(exc)

try:
self._repo.create_git_release(
version_tag,
version_tag,
log,
target_commitish=target,
draft=self._draft,
make_latest=make_latest_str,
generate_release_notes=(self._changelog_format == "github"),
)
except GithubException as e:
if e.status == 422 and _release_already_exists(e):
logger.info(f"Release for tag {version_tag} already exists, skipping")
return
if e.status == 403 and "resource not accessible" in str(e).lower():
logger.error(
"Release creation blocked: token lacks required permissions. "
"Use a PAT with contents:write (and workflows if tagging "
"workflow changes)."
)
if e.status == 401:
logger.error(
"Release creation failed: bad credentials. Refresh the token or "
"use a PAT with repo scope."
)
raise
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new error handling for 401 and 403 status codes in the create_release method lacks test coverage. While these branches log helpful error messages before re-raising the exception, they should be tested to ensure the error messages are logged correctly and the exception is properly re-raised.

Copilot uses AI. Check for mistakes.
logger.info(f"GitHub release {version_tag} created successfully")

def _check_rate_limit(self) -> None:
Expand Down Expand Up @@ -1523,6 +1546,13 @@ def handle_error(self, e: Exception, *, raise_abort: bool = True) -> None:
logger.warning("GitHub returned a 5xx error code")
logger.info(trace)
allowed = True
elif e.status == 401:
logger.error(
"GitHub returned 401 Bad credentials. Verify that your token "
"is valid and has access to the repository and registry."
)
internal = False
allowed = False
elif e.status == 403:
self._check_rate_limit()
logger.error(
Expand Down
33 changes: 22 additions & 11 deletions test/action/test_changelog.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import textwrap

from datetime import datetime, timedelta, timezone
from unittest.mock import Mock
from unittest.mock import Mock, patch

import yaml

Expand All @@ -12,7 +12,12 @@
from tagbot.action.repo import Repo


def _changelog(*, template="", ignore=set(), subdir=None):
@patch("tagbot.action.repo.Github")
def _changelog(mock_gh, *, template="", ignore=set(), subdir=None):
mock_gh_instance = Mock()
mock_gh.return_value = mock_gh_instance
mock_repo = Mock()
mock_gh_instance.get_repo.return_value = mock_repo
r = Repo(
repo="",
registry="",
Expand All @@ -31,6 +36,10 @@ def _changelog(*, template="", ignore=set(), subdir=None):
branch=None,
subdir=subdir,
)
# Mock get_all_tags to return empty list (tests override as needed)
r.get_all_tags = Mock(return_value=[])
# Mock _build_tags_cache to return empty dict
r._build_tags_cache = Mock(return_value={})
return r._changelog


Expand All @@ -41,12 +50,16 @@ def test_slug():

def test_previous_release():
c = _changelog()
tags = ["ignore", "v1.2.4-ignore", "v1.2.3", "v1.2.2", "v1.0.2", "v1.0.10"]
tags = [
"ignore",
"v1.2.4-ignore",
"v1.2.3",
"v1.2.2",
"v1.0.2",
"v1.0.10",
]
c._repo.get_all_tags = Mock(return_value=tags)
# Mock get_release to return a minimal release-like object
c._repo._repo.get_release = Mock(
side_effect=lambda tag: type("obj", (object,), {"tag_name": tag})()
)
c._repo._repo.get_release = Mock(side_effect=lambda t: Mock(tag_name=t))
assert c._previous_release("v1.0.0") is None
assert c._previous_release("v1.0.2") is None
rel = c._previous_release("v1.2.5")
Expand Down Expand Up @@ -95,10 +108,7 @@ def test_previous_release_subdir():
"Foo-v2.0.0",
]
c._repo.get_all_tags = Mock(return_value=tags)
# Mock get_release to return a minimal release-like object
c._repo._repo.get_release = Mock(
side_effect=lambda tag: type("obj", (object,), {"tag_name": tag})()
)
c._repo._repo.get_release = Mock(side_effect=lambda t: Mock(tag_name=t))
assert c._previous_release("Foo-v1.0.0") is None
assert c._previous_release("Foo-v1.0.2") is None
rel = c._previous_release("Foo-v1.2.5")
Expand Down Expand Up @@ -249,6 +259,7 @@ def test_collect_data():
c = _changelog()
c._repo._repo = Mock(full_name="A/B.jl", html_url="https://github.com/A/B.jl")
c._repo._project = Mock(return_value="B")
c._repo.is_version_yanked = Mock(return_value=False)
c._previous_release = Mock(
side_effect=[
Mock(tag_name="v1.2.2", created_at=datetime.now(timezone.utc)),
Expand Down
23 changes: 22 additions & 1 deletion test/action/test_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def test_command(run):
run.return_value.configure_mock(stderr="err\n", returncode=1)
with pytest.raises(Abort) as exc_info:
g.command("d")
assert "stderr: err" in str(exc_info.value)
assert str(exc_info.value) == "Git command 'git -C dir d' failed: err"


def test_check():
Expand Down Expand Up @@ -165,3 +165,24 @@ def test_time_of_commit():
g = _git(command="2019-12-22T12:49:26+07:00")
assert g.time_of_commit("a") == datetime(2019, 12, 22, 5, 49, 26)
g.command.assert_called_with("show", "-s", "--format=%cI", "a", repo="")


@patch("subprocess.run")
def test_command_includes_hint(run):
g = Git("", "Foo/Bar", "", "user", "email")
g._Git__dir = "dir"
run.return_value.configure_mock(
stdout="",
stderr="refusing to allow a GitHub App to update workflow",
returncode=1,
)
with pytest.raises(Abort) as exc_info:
g.command("push", "origin", "v1")
assert "workflow" in str(exc_info.value)
assert "provide workflow scope" in str(exc_info.value)


def test_time_of_commit_fallback_formats():
g = _git(command=["2019-12-22 12:49:26 +0000", "Mon Dec 23 12:00:00 2024 +0000"])
assert g.time_of_commit("a") == datetime(2019, 12, 22, 12, 49, 26)
assert g.time_of_commit("b") == datetime(2024, 12, 23, 12, 0, 0)
17 changes: 17 additions & 0 deletions test/action/test_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,23 @@ def test_create_release_skips_existing():
r._repo.create_git_release.assert_called()


def test_create_release_handles_existing_release_error():
r = _repo(user="user", email="email")
r._commit_sha_of_release_branch = Mock(return_value=None)
r._git.create_tag = Mock()
r._repo = Mock(default_branch="default")
r._repo.get_releases = Mock(return_value=[])
r._changelog.get = Mock(return_value="l")
r._repo.create_git_release.side_effect = GithubException(
422, {"errors": [{"code": "already_exists"}]}, {}
)

r.create_release("v1.0.0", "abc123")

r._git.create_tag.assert_called_once()
r._repo.create_git_release.assert_called_once()


def test_create_release_subdir():
r = _repo(user="user", email="email", subdir="path/to/Foo.jl")
r._commit_sha_of_release_branch = Mock(return_value="a")
Expand Down