|
1 | 1 | import re |
2 | 2 | import subprocess |
3 | 3 |
|
4 | | -from datetime import datetime |
| 4 | +from datetime import datetime, timezone |
5 | 5 | from tempfile import mkdtemp |
6 | 6 | from typing import Optional, cast |
7 | 7 | from urllib.parse import urlparse |
|
10 | 10 | from . import Abort |
11 | 11 |
|
12 | 12 |
|
| 13 | +def parse_git_datetime( |
| 14 | + date_str: str, _depth: int = 0, _max_depth: int = 2 |
| 15 | +) -> Optional[datetime]: |
| 16 | + """Parse Git date output into a naive UTC datetime. |
| 17 | +
|
| 18 | + Handles common Git formats and normalizes timezone offsets. |
| 19 | + Returns None if parsing fails. |
| 20 | + """ |
| 21 | + |
| 22 | + def normalize_offset(s: str) -> str: |
| 23 | + match = re.search(r"([+-]\d{2})(:?)(\d{2})$", s) |
| 24 | + if match and not match.group(2): |
| 25 | + # Build prefix separately to avoid an overly long formatted line |
| 26 | + prefix = s[: -len(match.group(0))] |
| 27 | + return f"{prefix}{match.group(1)}:{match.group(3)}" |
| 28 | + return s |
| 29 | + |
| 30 | + cleaned = date_str.strip() |
| 31 | + attempts = [cleaned, normalize_offset(cleaned)] |
| 32 | + formats = ["%Y-%m-%d %H:%M:%S %z", "%a %b %d %H:%M:%S %Y %z"] |
| 33 | + |
| 34 | + for candidate in attempts: |
| 35 | + try: |
| 36 | + dt = datetime.fromisoformat(candidate) |
| 37 | + except ValueError: |
| 38 | + dt = None |
| 39 | + if dt: |
| 40 | + offset = dt.utcoffset() |
| 41 | + if offset: |
| 42 | + dt -= offset |
| 43 | + return dt.replace(tzinfo=None) |
| 44 | + for fmt in formats: |
| 45 | + try: |
| 46 | + dt = datetime.strptime(candidate, fmt) |
| 47 | + except ValueError: |
| 48 | + continue |
| 49 | + return dt.astimezone(timezone.utc).replace(tzinfo=None) |
| 50 | + |
| 51 | + match = re.search( |
| 52 | + r"(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[+-]\d{2}:?\d{2})", |
| 53 | + cleaned, |
| 54 | + ) |
| 55 | + if match: |
| 56 | + candidate = normalize_offset(match.group(1)) |
| 57 | + # Prevent infinite recursion: only recurse if normalization changed |
| 58 | + # the matched string and we haven't exceeded a small depth cap. |
| 59 | + if ( |
| 60 | + candidate != match.group(1) |
| 61 | + and candidate != date_str |
| 62 | + and _depth < _max_depth |
| 63 | + ): |
| 64 | + return parse_git_datetime(candidate, _depth + 1, _max_depth) |
| 65 | + return None |
| 66 | + |
| 67 | + return None |
| 68 | + |
| 69 | + |
13 | 70 | class Git: |
14 | 71 | """Provides access to a local Git repository.""" |
15 | 72 |
|
@@ -70,18 +127,36 @@ def command(self, *argv: str, repo: Optional[str] = "") -> str: |
70 | 127 | proc = subprocess.run(args, text=True, capture_output=True) |
71 | 128 | out = proc.stdout.strip() |
72 | 129 | if proc.returncode: |
73 | | - error_msg = f"Git command '{self._sanitize_command(cmd)}' failed" |
| 130 | + err = proc.stderr.strip() |
74 | 131 | if out: |
75 | | - stdout = self._sanitize_command(out) |
76 | | - logger.error(f"stdout: {stdout}") |
77 | | - error_msg += f"\nstdout: {stdout}" |
78 | | - if proc.stderr: |
79 | | - stderr = self._sanitize_command(proc.stderr.strip()) |
80 | | - logger.error(f"stderr: {stderr}") |
81 | | - error_msg += f"\nstderr: {stderr}" |
82 | | - raise Abort(error_msg) |
| 132 | + logger.error(f"stdout: {self._sanitize_command(out)}") |
| 133 | + if err: |
| 134 | + logger.error(f"stderr: {self._sanitize_command(err)}") |
| 135 | + |
| 136 | + detail = err or out |
| 137 | + hint = self._hint_for_failure(detail) |
| 138 | + message = f"Git command '{self._sanitize_command(cmd)}' failed" |
| 139 | + if detail: |
| 140 | + message = f"{message}: {self._sanitize_command(detail)}" |
| 141 | + if hint: |
| 142 | + message = f"{message} ({hint})" |
| 143 | + raise Abort(message) |
83 | 144 | return out |
84 | 145 |
|
| 146 | + def _hint_for_failure(self, detail: str) -> Optional[str]: |
| 147 | + """Return a user-facing hint for common git errors.""" |
| 148 | + lowered = detail.casefold() |
| 149 | + if "permission to" in lowered and "denied" in lowered: |
| 150 | + return "use a PAT with contents:write or a deploy key" |
| 151 | + if "workflow" in lowered or "workflows" in lowered: |
| 152 | + if "refusing" in lowered or "permission" in lowered: |
| 153 | + return "provide workflow scope or avoid workflow changes" |
| 154 | + if "publickey" in lowered or "permission denied (publickey)" in lowered: |
| 155 | + return "configure SSH deploy key or switch to https with PAT" |
| 156 | + if "bad credentials" in lowered or "authentication failed" in lowered: |
| 157 | + return "token is invalid or lacks access" |
| 158 | + return None |
| 159 | + |
85 | 160 | def check(self, *argv: str, repo: Optional[str] = "") -> bool: |
86 | 161 | """Run a Git command, but only return its success status.""" |
87 | 162 | try: |
@@ -177,9 +252,10 @@ def time_of_commit(self, sha: str, repo: str = "") -> datetime: |
177 | 252 | """Get the time that a commit was made.""" |
178 | 253 | # The format %cI is "committer date, strict ISO 8601 format". |
179 | 254 | date = self.command("show", "-s", "--format=%cI", sha, repo=repo) |
180 | | - dt = datetime.fromisoformat(date) |
181 | | - # Convert to UTC and remove time zone information. |
182 | | - offset = dt.utcoffset() |
183 | | - if offset: |
184 | | - dt -= offset |
185 | | - return dt.replace(tzinfo=None) |
| 255 | + parsed = parse_git_datetime(date) |
| 256 | + if not parsed: |
| 257 | + logger.warning( |
| 258 | + "Could not parse git date '%s', using current UTC", date.strip() |
| 259 | + ) |
| 260 | + return datetime.now(timezone.utc).replace(tzinfo=None) |
| 261 | + return parsed |
0 commit comments