-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
150 lines (121 loc) · 5.1 KB
/
api.py
File metadata and controls
150 lines (121 loc) · 5.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# api.py
import asyncio
import logging
import time
import aiohttp
from tqdm import tqdm
from config import Config
from exceptions import APIError
class RateLimiter:
    """Token-bucket rate limiter for asyncio code.

    The bucket starts full with ``rate`` tokens and refills continuously at
    ``rate / interval`` tokens per second, never exceeding ``rate``. Each
    ``acquire()`` consumes one token, waiting first when none is available.

    Args:
        rate: Maximum number of acquisitions allowed per ``interval``
            (also the burst size). Must be positive.
        interval: Length of the rate window in seconds. Must be positive.

    Raises:
        ValueError: If ``rate`` or ``interval`` is not positive.
    """

    def __init__(self, rate, interval):
        if rate <= 0 or interval <= 0:
            raise ValueError("rate and interval must both be positive")
        self.rate = rate
        self.interval = interval
        # Tokens currently available. May go negative: a negative value is
        # the number of tokens already promised to callers that are waiting.
        self.allowance = rate
        # Monotonic clock so wall-clock adjustments cannot create or destroy
        # tokens.
        self.last_check = time.monotonic()

    async def acquire(self):
        """Consume one token, sleeping until it is available if necessary."""
        now = time.monotonic()
        elapsed = now - self.last_check
        self.last_check = now

        tokens_per_second = self.rate / self.interval
        self.allowance = min(
            self.rate, self.allowance + elapsed * tokens_per_second
        )

        # Reserve the token *before* awaiting. Everything above runs without
        # yielding to the event loop, so concurrent callers each see the
        # debt left by earlier ones and queue up behind them instead of
        # racing on shared state.
        self.allowance -= 1
        if self.allowance < 0:
            # Convert the token deficit into seconds; the time slept is
            # credited back as refill on the next call.
            await asyncio.sleep(-self.allowance / tokens_per_second)
class PivotalTrackerAPI:
    """Async HTTP client for the Pivotal Tracker API.

    The base URL and API token are read from ``Config``. Every request is
    throttled by a shared ``RateLimiter`` and capped by a semaphore that
    bounds the number of in-flight requests.

    Use it as an async context manager so the ``aiohttp`` session is created
    and closed for you::

        async with PivotalTrackerAPI() as api:
            projects = await api.get_all_projects()
    """

    def __init__(self):
        self.base_url = Config.API_BASE_URL
        self.headers = {
            "X-TrackerToken": Config.API_TOKEN,
            "Content-Type": "application/json",
        }
        # Created in __aenter__; None means "not inside `async with`".
        self.session = None
        self.rate_limiter = RateLimiter(rate=6, interval=5)  # 1.2 requests per second
        self.global_semaphore = asyncio.Semaphore(4)  # Max 4 concurrent requests

    async def __aenter__(self):
        """Open the HTTP session and return this client."""
        self.session = aiohttp.ClientSession(headers=self.headers)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session, if one was opened."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    def _require_session(self):
        """Return the open session or raise if the client was not entered."""
        if self.session is None:
            raise RuntimeError(
                "PivotalTrackerAPI has no open session; "
                "use it inside 'async with PivotalTrackerAPI() as api:'"
            )
        return self.session

    async def _request(self, method, endpoint, **kwargs):
        """Send one throttled request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            endpoint: Path appended to ``base_url`` (may carry a query string).
            **kwargs: Forwarded unchanged to ``session.request``.

        Raises:
            RuntimeError: If the client is used outside ``async with``.
            APIError: If the response has an error HTTP status.
        """
        # Fail fast, before a concurrency slot or rate-limit token is spent.
        session = self._require_session()
        url = f"{self.base_url}/{endpoint}"
        async with self.global_semaphore:
            await self.rate_limiter.acquire()
            try:
                async with session.request(method, url, **kwargs) as response:
                    response.raise_for_status()
                    return await response.json()
            except aiohttp.ClientResponseError as e:
                logging.error("API request failed: %s", e)
                # Chain the cause so the HTTP status stays reachable.
                raise APIError(f"API request failed: {e}") from e

    async def get_all_projects(self):
        """Fetch the ``projects`` collection."""
        return await self._request("GET", "projects")

    async def get_project(self, project_id):
        """Fetch a single project by id."""
        return await self._request("GET", f"projects/{project_id}")

    async def get_stories(self, project_id):
        """Fetch every story of a project, following pagination."""
        return await self._paginate(f"projects/{project_id}/stories")

    async def get_comments(self, project_id, story_id):
        """Fetch a story's comments with an explicit field selection."""
        return await self._request(
            "GET",
            f"projects/{project_id}/stories/{story_id}/comments?fields=id,story_id,text,person_id,created_at,updated_at,file_attachments",
        )

    async def get_blockers(self, project_id, story_id):
        """Fetch a story's blockers."""
        return await self._request(
            "GET", f"projects/{project_id}/stories/{story_id}/blockers"
        )

    async def get_tasks(self, project_id, story_id):
        """Fetch a story's tasks in a single request.

        This method used to be defined twice in the class; the later,
        non-paginated definition silently replaced the paginated one, so
        that is the behavior kept here.
        """
        return await self._request(
            "GET", f"projects/{project_id}/stories/{story_id}/tasks"
        )

    async def get_labels(self, project_id):
        """Fetch a project's labels."""
        return await self._request("GET", f"projects/{project_id}/labels")

    async def get_epics(self, project_id):
        """Fetch a project's epics."""
        return await self._request("GET", f"projects/{project_id}/epics")

    async def get_project_memberships(self, project_id):
        """Fetch a project's memberships."""
        return await self._request("GET", f"projects/{project_id}/memberships")

    async def get_project_current_velocity(self, project_id):
        """Fetch the project restricted to its ``current_velocity`` field."""
        return await self._request(
            "GET", f"projects/{project_id}?fields=current_velocity"
        )

    async def get_iterations(self, project_id):
        """Fetch every iteration of a project, following pagination."""
        return await self._paginate(
            f"projects/{project_id}/iterations?fields=number,start,finish,kind,velocity,team_strength,stories"
        )

    async def _paginate(self, endpoint, params=None, page_size=100):
        """Collect all pages of ``endpoint`` using ``limit``/``offset`` paging.

        Args:
            endpoint: Path passed to ``_request`` for every page.
            params: Extra query parameters. The mapping is copied, never
                mutated; any ``limit``/``offset`` keys in it are overridden.
            page_size: Number of items requested per page.

        Returns:
            The concatenated list of items. If a response is not a list,
            that response is returned as-is (single object, or an endpoint
            that does not paginate).

        Raises:
            ValueError: If ``page_size`` is less than 1.
        """
        if page_size < 1:
            # A non-positive page size could never satisfy the stop condition.
            raise ValueError("page_size must be at least 1")

        # Work on a copy so the caller's dict is left untouched.
        query = dict(params) if params else {}
        query["limit"] = page_size
        query["offset"] = 0

        all_items = []
        # Label the bar with the resource name only, without any query string.
        resource = endpoint.split("/")[-1].split("?")[0]
        # The context manager closes the bar even when a request raises.
        with tqdm(desc=f"Fetching {resource} ", unit="items") as pbar:
            while True:
                page = await self._request("GET", endpoint, params=query)
                if not isinstance(page, list):
                    return page
                all_items.extend(page)
                pbar.update(len(page))
                # NOTE(review): a short page is taken to be the last one. If
                # the server caps `limit` below page_size this stops early --
                # confirm against the API's pagination rules.
                if len(page) < page_size:
                    return all_items
                query["offset"] += page_size

    async def download_file(self, url):
        """Download a file and return its raw bytes.

        Args:
            url: Path appended to ``https://www.pivotaltracker.com``.

        Raises:
            RuntimeError: If the client is used outside ``async with``.
            APIError: If the response has an error HTTP status.
        """
        session = self._require_session()
        async with self.global_semaphore:
            await self.rate_limiter.acquire()
            try:
                async with session.get(
                    f"https://www.pivotaltracker.com{url}"
                ) as response:
                    response.raise_for_status()
                    return await response.read()
            except aiohttp.ClientResponseError as e:
                logging.error("File download failed: %s", e)
                raise APIError(f"File download failed: {e}") from e