1111import re
1212import sys
1313import time
14- from datetime import datetime , timezone
14+ from dataclasses import dataclass
15+ from datetime import datetime , timedelta , timezone
1516from typing import Iterator , Optional
1617from urllib .parse import parse_qs , urlparse
1718
2223
2324BUG_RE = re .compile (r"\b(?:bug|b=)\s*#?(\d+)\b" , re .I )
2425
logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class AccessToken:
    """GitHub App installation access token paired with its expiry time."""

    token: str
    # Timezone-aware when built via from_api_response(); the cache check
    # compares it against an aware UTC "now".
    expires_at: datetime

    @classmethod
    def from_api_response(cls, data: dict) -> "AccessToken":
        """
        Build a token from the decoded JSON body of the access-token endpoint.

        Args:
            data: Mapping with a "token" string and an ISO 8601 "expires_at" string

        Returns:
            AccessToken whose expires_at is always timezone-aware

        Raises:
            KeyError: If "token" or "expires_at" is missing
            ValueError: If "expires_at" is not a parseable ISO 8601 string
        """
        token = data["token"]
        raw_expiry = data["expires_at"]
        if not isinstance(raw_expiry, str):
            raise ValueError(f"expected a string, got {raw_expiry!r}")
        # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11
        if raw_expiry[-1:] in ("Z", "z"):
            raw_expiry = f"{raw_expiry[:-1]}+00:00"
        expires_at = datetime.fromisoformat(raw_expiry)
        if expires_at.tzinfo is None:
            # A naive value cannot be compared with the aware "now" used by the
            # cache check; assume UTC (NOTE(review): confirm against the API docs).
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        return cls(token=token, expires_at=expires_at)


# Installation ID -> most recently issued access token for that installation
access_token_cache: dict[int, AccessToken] = {}
# "owner/repo" -> installation ID
repo_installation_cache: dict[str, int] = {}

# A cached token is only reused if it remains valid for at least this long
_TOKEN_EXPIRY_MARGIN = timedelta(seconds=60)


def _is_rate_limited(resp: "requests.Response") -> bool:
    """Return True if resp is a 403 whose X-RateLimit-Remaining header is 0."""
    if resp.status_code != 403:
        return False
    try:
        return int(resp.headers.get("X-RateLimit-Remaining", "1")) == 0
    except ValueError:
        # A malformed header is treated as "not rate limited" so the caller
        # reports the 403 itself instead of crashing on the header
        return False


def _send_with_rate_limit_retry(send, url: str) -> "requests.Response":
    """Call send(url); if the response is rate limited, wait and retry once."""
    resp = send(url)
    if _is_rate_limited(resp):
        sleep_for_rate_limit(resp)
        resp = send(url)
    return resp


def _reusable_token(installation_id: int) -> Optional[str]:
    """Return the cached token for installation_id if it is not about to expire."""
    cached = access_token_cache.get(installation_id)
    if cached is None:
        return None
    if cached.expires_at <= datetime.now(timezone.utc) + _TOKEN_EXPIRY_MARGIN:
        return None
    logger.info(
        f"Reusing cached access token for installation {installation_id}, "
        f"expires at {cached.expires_at}"
    )
    return cached.token


def _fetch_installation_id(
    session: "requests.Session", repo: str, github_api_url: str
) -> int:
    """Look up the GitHub App installation ID for repo."""
    resp = _send_with_rate_limit_retry(
        session.get, f"{github_api_url}/repos/{repo}/installation"
    )
    if resp.status_code != 200:
        raise RuntimeError(
            f"Failed to get GitHub App installation for {repo}: "
            f"{resp.status_code}: {resp.text}"
        )
    try:
        return resp.json()["id"]
    except (requests.exceptions.JSONDecodeError, KeyError, TypeError) as e:
        raise RuntimeError(
            f"Failed to parse installation response for {repo}: {e}: {resp.text}"
        ) from e


def _fetch_access_token(
    session: "requests.Session", installation_id: int, github_api_url: str
) -> AccessToken:
    """Exchange the app JWT carried by session for a new installation token."""
    logger.info(
        f"Fetching new GitHub App installation access token for installation {installation_id}"
    )
    resp = _send_with_rate_limit_retry(
        session.post,
        f"{github_api_url}/app/installations/{installation_id}/access_tokens",
    )
    if resp.status_code != 201:
        raise RuntimeError(
            f"Failed to get installation access token: {resp.status_code}: {resp.text}"
        )

    try:
        data = resp.json()
    except requests.exceptions.JSONDecodeError as e:
        raise RuntimeError(
            f"Failed to parse access token response: {e}: {resp.text}"
        ) from e
    try:
        return AccessToken.from_api_response(data)
    except KeyError as e:
        raise RuntimeError(
            f"Unexpected access token response structure, missing key {e}: {resp.text}"
        ) from e
    except ValueError as e:
        raise RuntimeError(
            f"Invalid expires_at format in access token response: {e}"
        ) from e


def get_installation_access_token(
    jwt: str,
    repo: str,
    github_api_url: str,
) -> str:
    """
    Get a GitHub App installation access token, returning a cached one if still valid.

    Uses the JWT to look up the installation for the given repo, then exchanges
    it for an installation access token. Tokens are cached per installation ID so
    that repos sharing an installation reuse the same token, while repos on
    different installations each get their own. The repo->installation ID mapping
    is also cached for the lifetime of the process. A cached token is reused only
    while it has more than 60 seconds of validity left. No HTTP session is opened
    when both caches can answer the request.

    Args:
        jwt: GitHub App JWT (stored in GITHUB_TOKEN env var)
        repo: Repository in "owner/repo" format, used to look up the installation
        github_api_url: GitHub API base URL

    Returns:
        Installation access token string

    Raises:
        RuntimeError: If either API call returns an unexpected status code or a
            response body that cannot be parsed
    """
    installation_id = repo_installation_cache.get(repo)
    if installation_id is not None:
        token = _reusable_token(installation_id)
        if token is not None:
            return token

    # The context manager closes the session's connection pool on every exit path
    with requests.Session() as session:
        session.headers.update(
            {
                "Authorization": f"Bearer {jwt}",
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
            }
        )

        if installation_id is None:
            installation_id = _fetch_installation_id(session, repo, github_api_url)
            repo_installation_cache[repo] = installation_id
            # Another repo on the same installation may already have cached a token
            token = _reusable_token(installation_id)
            if token is not None:
                return token

        access_token = _fetch_access_token(session, installation_id, github_api_url)

    access_token_cache[installation_id] = access_token
    logger.info(f"Obtained new access token, expires at {access_token.expires_at}")
    return access_token.token
139+
25140
26141def setup_logging () -> None :
27142 """Configure logging for the ETL process."""
@@ -37,7 +152,7 @@ def extract_pull_requests(
37152 session : requests .Session ,
38153 repo : str ,
39154 chunk_size : int = 100 ,
40- github_api_url : Optional [ str ] = None ,
155+ github_api_url : str = "https://api.github.com" ,
41156) -> Iterator [list [dict ]]:
42157 """
43158 Extract data from GitHub repositories in chunks.
@@ -48,17 +163,14 @@ def extract_pull_requests(
48163 session: Authenticated requests session
49164 repo: GitHub repository name
50165 chunk_size: Number of PRs to yield per chunk (default: 100)
51- github_api_url: Optional custom GitHub API URL (for testing/mocking)
166+ github_api_url: GitHub API base URL
52167
53168 Yields:
54169 List of pull request dictionaries (up to chunk_size items)
55170 """
56- logger = logging .getLogger (__name__ )
57171 logger .info ("Starting data extraction from GitHub repositories" )
58172
59- # Support custom API URL for mocking/testing
60- api_base = github_api_url or "https://api.github.com"
61- base_url = f"{ api_base } /repos/{ repo } /pulls"
173+ base_url = f"{ github_api_url } /repos/{ repo } /pulls"
62174 params : dict = {
63175 "state" : "all" ,
64176 "per_page" : chunk_size ,
@@ -132,7 +244,7 @@ def extract_commits(
132244 session : requests .Session ,
133245 repo : str ,
134246 pr_number : int ,
135- github_api_url : Optional [ str ] = None ,
247+ github_api_url : str ,
136248) -> list [dict ]:
137249 """
138250 Extract commits and files for a specific pull request.
@@ -141,16 +253,13 @@ def extract_commits(
141253 session: Authenticated requests session
142254 repo: GitHub repository name
143255 pr_number: Pull request number
144- github_api_url: Optional custom GitHub API URL (for testing/mocking)
256+ github_api_url: GitHub API base URL
145257 Returns:
146258 List of commit dictionaries for the pull request
147259 """
148- logger = logging .getLogger (__name__ )
149260 logger .info (f"Extracting commits for PR #{ pr_number } " )
150261
151- # Support custom API URL for mocking/testing
152- api_base = github_api_url or "https://api.github.com"
153- commits_url = f"{ api_base } /repos/{ repo } /pulls/{ pr_number } /commits"
262+ commits_url = f"{ github_api_url } /repos/{ repo } /pulls/{ pr_number } /commits"
154263
155264 logger .info (f"Commits URL: { commits_url } " )
156265
@@ -159,7 +268,7 @@ def extract_commits(
159268 commits = resp .json ()
160269 for commit in commits :
161270 commit_sha = commit .get ("sha" )
162- commit_url = f"{ api_base } /repos/{ repo } /commits/{ commit_sha } "
271+ commit_url = f"{ github_api_url } /repos/{ repo } /commits/{ commit_sha } "
163272 commit_data = github_get (session , commit_url ).json ()
164273 commit ["files" ] = commit_data .get ("files" , [])
165274
@@ -171,7 +280,7 @@ def extract_reviewers(
171280 session : requests .Session ,
172281 repo : str ,
173282 pr_number : int ,
174- github_api_url : Optional [ str ] = None ,
283+ github_api_url : str ,
175284) -> list [dict ]:
176285 """
177286 Extract reviewers for a specific pull request.
@@ -180,16 +289,13 @@ def extract_reviewers(
180289 session: Authenticated requests session
181290 repo: GitHub repository name
182291 pr_number: Pull request number
183- github_api_url: Optional custom GitHub API URL (for testing/mocking)
292+ github_api_url: GitHub API base URL
184293 Returns:
185294 List of reviewer dictionaries for the pull request
186295 """
187- logger = logging .getLogger (__name__ )
188296 logger .info (f"Extracting reviewers for PR #{ pr_number } " )
189297
190- # Support custom API URL for mocking/testing
191- api_base = github_api_url or "https://api.github.com"
192- reviewers_url = f"{ api_base } /repos/{ repo } /pulls/{ pr_number } /reviews"
298+ reviewers_url = f"{ github_api_url } /repos/{ repo } /pulls/{ pr_number } /reviews"
193299
194300 logger .info (f"Reviewers URL: { reviewers_url } " )
195301
@@ -203,7 +309,7 @@ def extract_comments(
203309 session : requests .Session ,
204310 repo : str ,
205311 pr_number : int ,
206- github_api_url : Optional [ str ] = None ,
312+ github_api_url : str ,
207313) -> list [dict ]:
208314 """
209315 Extract comments for a specific pull request.
@@ -212,16 +318,13 @@ def extract_comments(
212318 session: Authenticated requests session
213319 repo: GitHub repository name
214320 pr_number: Pull request number
215- github_api_url: Optional custom GitHub API URL (for testing/mocking)
321+ github_api_url: GitHub API base URL
216322 Returns:
217323 List of comment dictionaries for the pull request
218324 """
219- logger = logging .getLogger (__name__ )
220325 logger .info (f"Extracting comments for PR #{ pr_number } " )
221326
222- # Support custom API URL for mocking/testing
223- api_base = github_api_url or "https://api.github.com"
224- comments_url = f"{ api_base } /repos/{ repo } /issues/{ pr_number } /comments"
327+ comments_url = f"{ github_api_url } /repos/{ repo } /issues/{ pr_number } /comments"
225328
226329 logger .info (f"Comments URL: { comments_url } " )
227330
@@ -286,7 +389,6 @@ def transform_data(raw_data: list[dict], repo: str) -> dict:
286389 Returns:
287390 List of transformed pull requests, commits, reviewers, and comments ready for BigQuery
288391 """
289- logger = logging .getLogger (__name__ )
290392 logger .info (f"Starting data transformation for { len (raw_data )} PRs" )
291393
292394 transformed_data : dict = {
@@ -417,7 +519,6 @@ def load_data(
417519 transformed_data: Dictionary containing tables ('pull_requests',
418520 'commits', 'reviewers', 'comments') mapped to lists of row dictionaries
419521 """
420- logger = logging .getLogger (__name__ )
421522
422523 if not transformed_data :
423524 logger .warning ("No data to load, skipping" )
@@ -466,27 +567,34 @@ def main() -> int:
466567 4. Repeat until no more data
467568 """
468569 setup_logging ()
469- logger = logging .getLogger (__name__ )
570+ try :
571+ return _main ()
572+ except RuntimeError as e :
573+ logger .error (str (e ))
574+ return 1
575+
470576
577+ def _main () -> int :
471578 logger .info ("Starting GitHub ETL process with chunked processing" )
472579
473- github_token = os .environ .get ("GITHUB_TOKEN" )
474- if not github_token :
580+ github_jwt = os .environ .get ("GITHUB_TOKEN" ) or None
581+ if not github_jwt :
475582 logger .warning (
476- "Warning: No token provided. You will hit very low rate "
477- + "limits and private repos won't work. "
583+ "GITHUB_TOKEN (expected to be a GitHub App JWT, not a personal access token) "
584+ "is not set; proceeding without authentication (suitable for testing only) "
478585 )
479586
480587 # Read BigQuery configuration
481588 bigquery_project = os .environ .get ("BIGQUERY_PROJECT" )
482589 bigquery_dataset = os .environ .get ("BIGQUERY_DATASET" )
483590
484591 if not bigquery_project :
485- raise SystemExit ("Environment variable BIGQUERY_PROJECT is required" )
592+ raise RuntimeError ("Environment variable BIGQUERY_PROJECT is required" )
486593 if not bigquery_dataset :
487- raise SystemExit ("Environment variable BIGQUERY_DATASET is required" )
594+ raise RuntimeError ("Environment variable BIGQUERY_DATASET is required" )
488595
489- # Setup GitHub session
596+ # Setup GitHub session; the Authorization header is updated before each repo using
597+ # an installation access token (which may be cached)
490598 session = requests .Session ()
491599 session .headers .update (
492600 {
@@ -495,12 +603,8 @@ def main() -> int:
495603 }
496604 )
497605
498- if github_token :
499- session .headers ["Authorization" ] = f"Bearer { github_token } "
500-
501- # Support custom GitHub API URL for testing/mocking
502- github_api_url = os .environ .get ("GITHUB_API_URL" )
503- if github_api_url :
606+ github_api_url = os .environ .get ("GITHUB_API_URL" , "https://api.github.com" )
607+ if os .environ .get ("GITHUB_API_URL" ):
504608 logger .info (f"Using custom GitHub API URL: { github_api_url } " )
505609
506610 # Setup BigQuery client
@@ -522,13 +626,20 @@ def main() -> int:
522626 if github_repos_str :
523627 github_repos = github_repos_str .split ("," )
524628 else :
525- raise SystemExit (
629+ raise RuntimeError (
526630 "Environment variable GITHUB_REPOS is required (format: 'owner/repo,owner/repo')"
527631 )
528632
529633 total_processed = 0
530634
531635 for repo in github_repos :
636+ # Get (or refresh) the installation access token before processing each repo
637+ if github_jwt :
638+ access_token = get_installation_access_token (
639+ github_jwt , repo , github_api_url
640+ )
641+ session .headers ["Authorization" ] = f"Bearer { access_token } "
642+
532643 for chunk_count , chunk in enumerate (
533644 extract_pull_requests (
534645 session , repo , chunk_size = 100 , github_api_url = github_api_url
0 commit comments