Skip to content

Commit de84213

Browse files
Add global rate limiter with coordinated backoff
Add a GitHubRateLimiter class for thread-safe API throttling; share a single GitHub client across all workers; proactively pause when the remaining quota drops below 100; coordinate all workers to pause together when a rate limit is hit; reduce MAX_CONCURRENCY from 10 to 5; remove the unused itertools and Mapping imports.
1 parent 0b46b42 commit de84213

File tree

1 file changed

+97
-23
lines changed

1 file changed

+97
-23
lines changed

gfi/populate.py

Lines changed: 97 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
3-
import itertools
43
import json
54
import random
65
import re
6+
import threading
77
import time
88
from collections import Counter
99
from concurrent.futures import ThreadPoolExecutor
1010
from operator import itemgetter
1111
from os import getenv, path
12-
from typing import TypedDict, Dict, Union, Sequence, Optional, Mapping
12+
from typing import TypedDict, Dict, Union, Sequence, Optional
1313

1414
import toml
1515

@@ -19,7 +19,7 @@
1919
from slugify import slugify
2020
from loguru import logger
2121

22-
MAX_CONCURRENCY = 10 # max number of requests to make to GitHub in parallel (reduced to avoid rate limits)
22+
MAX_CONCURRENCY = 5 # max number of requests to make to GitHub in parallel
2323
REPO_DATA_FILE = "data/repositories.toml"
2424
REPO_GENERATED_DATA_FILE = "data/generated.json"
2525
TAGS_GENERATED_DATA_FILE = "data/tags.json"
@@ -44,6 +44,72 @@ class RepoNotFoundException(Exception):
4444
"""Exception class for repo not found."""
4545

4646

47+
class GitHubRateLimiter:
    """Thread-safe rate limiter for GitHub API requests.

    A single instance is shared by every worker thread.  Each API call is
    preceded by :meth:`acquire`, which serializes access behind a lock,
    enforces a minimum interval between requests, and proactively pauses when
    the remaining quota is low.  A worker that still receives a
    ``ForbiddenError`` calls :meth:`report_rate_limit_hit` so that all other
    workers pause together until the quota resets.
    """

    def __init__(self, client, requests_per_second=1.0):
        """Initialize the limiter.

        :param client: logged-in GitHub client; used only to query rate_limit().
        :param requests_per_second: target steady-state request rate.
        """
        self._client = client
        self._lock = threading.Lock()
        self._min_interval = 1.0 / requests_per_second
        self._last_request_time = 0.0  # wall-clock time of the last request
        self._remaining = None  # last known remaining quota (None = unknown)
        self._reset_time = None  # epoch seconds at which the quota resets
        self._paused_until = 0.0  # epoch seconds until which everyone pauses

    def acquire(self):
        """Block until it's safe to make an API request.

        Sleeping while holding the lock is deliberate: it makes every other
        worker queue up behind the pause, which is exactly the coordinated
        backoff we want.
        """
        with self._lock:
            # Coordinated pause (set by report_rate_limit_hit or by the
            # low-quota branch below).  Sample the clock exactly once so the
            # sleep duration can never go negative if the deadline passes
            # between two reads — time.sleep() raises ValueError on negatives.
            now = time.time()
            if now < self._paused_until:
                wait_time = self._paused_until - now
                logger.info("Waiting {:.0f}s for rate limit reset", wait_time)
                time.sleep(wait_time)
                # Quota data from before the pause is stale; force a refresh.
                self._remaining = None

            # Refresh rate limit info when unknown or every 100 requests.
            if self._remaining is None or self._remaining % 100 == 0:
                self._update_rate_limit()

            # Proactive pause once fewer than 100 requests remain.  If the
            # reset timestamp is unknown we simply keep throttling on the
            # minimum interval instead of guessing a pause length.
            if self._remaining is not None and self._remaining < 100:
                if self._reset_time:
                    # +5s of slack so we wake up after the server-side reset.
                    wait_time = max(0.0, self._reset_time - time.time() + 5)
                    if wait_time > 0:
                        logger.warning("Low quota ({}). Pausing {:.0f}s", self._remaining, wait_time)
                        self._paused_until = time.time() + wait_time
                        time.sleep(wait_time)
                        self._remaining = None  # refreshed on the next acquire

            # Enforce the minimum interval between consecutive requests.
            elapsed = time.time() - self._last_request_time
            if elapsed < self._min_interval:
                time.sleep(self._min_interval - elapsed)

            self._last_request_time = time.time()
            if self._remaining:
                self._remaining -= 1

    def _update_rate_limit(self):
        """Best-effort refresh of the cached core-API quota from the server."""
        try:
            info = self._client.rate_limit()['resources']['core']
            self._remaining = info['remaining']
            self._reset_time = info['reset']
            logger.debug("Rate limit: {}/{}", self._remaining, info['limit'])
        except Exception as e:
            # Never let a failed quota check take a worker down; we fall back
            # to throttling on the minimum interval alone.
            logger.warning("Failed to check rate limit: {}", e)

    def report_rate_limit_hit(self):
        """Call when ForbiddenError received - triggers coordinated pause."""
        with self._lock:
            self._update_rate_limit()
            if self._reset_time:
                # Never pause for less than a minute, even if the reset
                # timestamp claims the quota is already (or nearly) back.
                wait_time = max(60, self._reset_time - time.time() + 5)
            else:
                wait_time = 60
            self._paused_until = time.time() + wait_time
            self._remaining = 0
            logger.warning("Rate limit hit. All workers pause for {:.0f}s", wait_time)
112+
47113
def parse_github_url(url: str) -> dict:
48114
"""Take the GitHub repo URL and return a tuple with owner login and repo name."""
49115
match = GH_URL_PATTERN.search(url)
@@ -60,36 +126,37 @@ class RepositoryIdentifier(TypedDict):
60126
RepositoryInfo = Dict["str", Union[str, int, Sequence]]
61127

62128

63-
def get_repository_info(identifier: RepositoryIdentifier) -> Optional[RepositoryInfo]:
129+
def get_repository_info(
130+
identifier: RepositoryIdentifier,
131+
client,
132+
rate_limiter: GitHubRateLimiter
133+
) -> Optional[RepositoryInfo]:
64134
"""Get the relevant information needed for the repository from its owner login and name."""
65135
owner, name = identifier["owner"], identifier["name"]
66136

67137
logger.info("Getting info for {}/{}", owner, name)
68138

69-
# create a logged in GitHub client
70-
client = login(token=getenv("GH_ACCESS_TOKEN"))
71-
72139
max_retries = 3
73140

74141
for attempt in range(max_retries):
75142
try:
143+
rate_limiter.acquire()
76144
repository = client.repository(owner, name)
77145
# Don't find issues inside archived repos.
78146
if repository.archived:
79147
return None
80148

81-
good_first_issues = set(
82-
itertools.chain.from_iterable(
83-
repository.issues(
84-
labels=label,
85-
state=ISSUE_STATE,
86-
number=ISSUE_LIMIT,
87-
sort=ISSUE_SORT,
88-
direction=ISSUE_SORT_DIRECTION,
89-
)
90-
for label in ISSUE_LABELS
149+
good_first_issues = set()
150+
for label in ISSUE_LABELS:
151+
rate_limiter.acquire()
152+
issues_for_label = repository.issues(
153+
labels=label,
154+
state=ISSUE_STATE,
155+
number=ISSUE_LIMIT,
156+
sort=ISSUE_SORT,
157+
direction=ISSUE_SORT_DIRECTION,
91158
)
92-
)
159+
good_first_issues.update(issues_for_label)
93160
logger.info("\t found {} good first issues", len(good_first_issues))
94161
# check if repo has at least one good first issue
95162
if good_first_issues and repository.language:
@@ -126,11 +193,10 @@ def get_repository_info(identifier: RepositoryIdentifier) -> Optional[Repository
126193
return None
127194

128195
except exceptions.ForbiddenError:
196+
rate_limiter.report_rate_limit_hit()
129197
if attempt < max_retries - 1:
130-
wait_time = 60 * (attempt + 1) # 60s, 120s, 180s
131-
logger.warning("Rate limited on {}/{}. Waiting {}s before retry...",
132-
owner, name, wait_time)
133-
time.sleep(wait_time)
198+
logger.warning("Rate limited on {}/{}. Retrying after coordinated pause...",
199+
owner, name)
134200
else:
135201
logger.error("Rate limit exceeded after {} retries: {}/{}",
136202
max_retries, owner, name)
@@ -175,8 +241,16 @@ def get_repository_info(identifier: RepositoryIdentifier) -> Optional[Repository
175241
# shuffle the order of the repositories
176242
random.shuffle(repositories)
177243

244+
# Create shared client and rate limiter
245+
client = login(token=getenv("GH_ACCESS_TOKEN"))
246+
rate_limiter = GitHubRateLimiter(client, requests_per_second=1.0)
247+
248+
# Wrapper to pass shared dependencies
249+
def process_repo(identifier):
250+
return get_repository_info(identifier, client, rate_limiter)
251+
178252
with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
179-
results = executor.map(get_repository_info, repositories)
253+
results = executor.map(process_repo, repositories)
180254

181255
# filter out repositories with valid data and increment tag counts
182256
for result in results:

0 commit comments

Comments
 (0)