Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 84 additions & 9 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import re
import sys
import time
from datetime import datetime, timezone
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional
from urllib.parse import parse_qs, urlparse

Expand All @@ -23,6 +24,79 @@
BUG_RE = re.compile(r"\b(?:bug|b=)\s*#?(\d+)\b", re.I)


@dataclass(frozen=True)
class _AccessToken:
token: str
expires_at: datetime


_access_token_cache: Optional[_AccessToken] = None


def get_installation_access_token(
jwt: str,
repo: str,
github_api_url: Optional[str] = None,
) -> str:
"""
Get a GitHub App installation access token, returning a cached one if still valid.

Uses the JWT to look up the installation for the given repo, then exchanges
it for an installation access token (valid for 1 hour). The token is cached
and reused until it has less than 60 seconds remaining.

Args:
jwt: GitHub App JWT (stored in GITHUB_TOKEN env var)
repo: Repository in "owner/repo" format, used to look up the installation
github_api_url: Optional custom GitHub API URL (for testing/mocking)

Returns:
Installation access token string
"""
global _access_token_cache
logger = logging.getLogger(__name__)

now = datetime.now(timezone.utc)
if (
_access_token_cache is not None
and _access_token_cache.expires_at > now + timedelta(seconds=60)
):
return _access_token_cache.token

logger.info("Fetching new GitHub App installation access token")
api_base = github_api_url or "https://api.github.com"
headers = {
"Authorization": f"Bearer {jwt}",
"Accept": "application/vnd.github+json",
}

resp = requests.get(f"{api_base}/repos/{repo}/installation", headers=headers)
if resp.status_code != 200:
raise SystemExit(
f"Failed to get GitHub App installation for {repo}: "
f"{resp.status_code}: {resp.text}"
)
installation_id = resp.json()["id"]

resp = requests.post(
f"{api_base}/app/installations/{installation_id}/access_tokens",
headers=headers,
)
if resp.status_code != 201:
raise SystemExit(
f"Failed to get installation access token: "
f"{resp.status_code}: {resp.text}"
)

data = resp.json()
_access_token_cache = _AccessToken(
token=data["token"],
expires_at=datetime.fromisoformat(data["expires_at"]),
)
logger.info(f"Obtained new access token, expires at {_access_token_cache.expires_at}")
return _access_token_cache.token


def setup_logging() -> None:
"""Configure logging for the ETL process."""
logging.basicConfig(
Expand Down Expand Up @@ -479,11 +553,10 @@ def main() -> int:

logger.info("Starting GitHub ETL process with chunked processing")

github_token = os.environ.get("GITHUB_TOKEN")
if not github_token:
github_jwt = os.environ.get("GITHUB_TOKEN") or None
if not github_jwt:
logger.warning(
"Warning: No token provided. You will hit very low rate "
+ "limits and private repos won't work."
"GITHUB_TOKEN not set; proceeding without authentication (suitable for testing only)"
)

# Read BigQuery configuration
Expand All @@ -495,7 +568,7 @@ def main() -> int:
if not bigquery_dataset:
raise SystemExit("Environment variable BIGQUERY_DATASET is required")

# Setup GitHub session
# Setup GitHub session (auth header set per-repo using installation access token)
session = requests.Session()
session.headers.update(
{
Expand All @@ -504,9 +577,6 @@ def main() -> int:
}
)

if github_token:
session.headers["Authorization"] = f"Bearer {github_token}"

# Support custom GitHub API URL for testing/mocking
github_api_url = os.environ.get("GITHUB_API_URL")
if github_api_url:
Expand Down Expand Up @@ -538,6 +608,11 @@ def main() -> int:
total_processed = 0

for repo in github_repos:
# Get (or refresh) the installation access token before processing each repo
if github_jwt:
access_token = get_installation_access_token(github_jwt, repo, github_api_url)
session.headers["Authorization"] = f"Bearer {access_token}"

for chunk_count, chunk in enumerate(
extract_pull_requests(
session, repo, chunk_size=100, github_api_url=github_api_url
Expand Down
Loading