|
| 1 | +from datetime import datetime, timedelta |
| 2 | +import jwt |
| 3 | +import os |
| 4 | +import re |
| 5 | +import requests |
| 6 | +import time |
| 7 | +from gitconsensusservice import app |
| 8 | + |
| 9 | + |
| 10 | +def get_jwt(): |
| 11 | + if not os.path.isfile(app.config['GITHUB_PRIVATE_KEY']): |
| 12 | + raise ValueError('Github Application Key not present') |
| 13 | + |
| 14 | + with open(app.config['GITHUB_PRIVATE_KEY'], 'r') as keyfile: |
| 15 | + private_key = keyfile.read() |
| 16 | + |
| 17 | + now = int(time.time()) |
| 18 | + payload = { |
| 19 | + # issued at time |
| 20 | + "iat": now, |
| 21 | + # JWT expiration time (10 minute maximum) |
| 22 | + "exp": now + (10 * 60), |
| 23 | + # GitHub App's identifier |
| 24 | + "iss": app.config['GITHUB_APP_ID'] |
| 25 | + } |
| 26 | + return jwt.encode(payload, private_key, algorithm='RS256').decode("utf-8") |
| 27 | + |
| 28 | + |
| 29 | +def request(url, method='GET'): |
| 30 | + if method == 'GET': |
| 31 | + requestfunc = requests.get |
| 32 | + elif method == 'POST': |
| 33 | + requestfunc = requests.post |
| 34 | + apptoken = get_jwt() |
| 35 | + |
| 36 | + headers = { |
| 37 | + 'Authorization': 'Bearer %s' % (apptoken,), |
| 38 | + 'Accept': 'application/vnd.github.machine-man-preview+json' |
| 39 | + } |
| 40 | + response = requestfunc('https://api.github.com/%s' % (url,), headers=headers) |
| 41 | + response.raise_for_status() |
| 42 | + retobj = response.json() |
| 43 | + if 'Link' in response.headers: |
| 44 | + regex = r"\<https://api.github.com/([^>]*)\>; rel=\"([a-z]*)\"" |
| 45 | + groups = re.findall(regex, response.headers['Link']) |
| 46 | + for group in groups: |
| 47 | + if group[1] == 'next': |
| 48 | + nextresults = request(group[1]) |
| 49 | + retobj += nextresults |
| 50 | + break |
| 51 | + return retobj |
| 52 | + |
| 53 | + |
| 54 | +def get_app(): |
| 55 | + return request('app') |
| 56 | + |
| 57 | + |
| 58 | +def get_installations(): |
| 59 | + installs_url = 'app/installations' |
| 60 | + installations = request(installs_url) |
| 61 | + return [i['id'] for i in installations] |
| 62 | + |
| 63 | + |
| 64 | +def get_installation(installation_id): |
| 65 | + return request('app/installations/%s' % (installation_id,)) |
| 66 | + |
| 67 | + |
| 68 | +tokens = {} |
| 69 | + |
| 70 | + |
| 71 | +def get_installation_token(installation_id): |
| 72 | + if installation_id in tokens: |
| 73 | + expiration = tokens[installation_id]['expires_at'] |
| 74 | + testtime = datetime.now() - timedelta(minutes=3) |
| 75 | + exptime = datetime.strptime(expiration, "%Y-%m-%dT%H:%M:%SZ") |
| 76 | + if exptime < testtime: |
| 77 | + return tokens[installation_id]['token'] |
| 78 | + |
| 79 | + url = "installations/%s/access_tokens" % (installation_id,) |
| 80 | + tokens[installation_id] = request(url, 'POST') |
| 81 | + return tokens[installation_id]['token'] |
0 commit comments