This repository was archived by the owner on Jun 23, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathutils.py
More file actions
100 lines (81 loc) · 3.17 KB
/
utils.py
File metadata and controls
100 lines (81 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
Module containing util functions to serve pushing telemetry artifacts under
maven.mozilla.org
"""
import json
import mimetypes
import sys
import asyncio
import aiohttp
import boto3
from constants import MIME_MAP, CACHE_CONTROL_MAXAGE
def setup_mimetypes():
    """Register the extra extension/MIME-type pairs from ``MIME_MAP``.

    Initializes the standard-library ``mimetypes`` registry and then adds
    every ``extension -> MIME type`` entry of ``MIME_MAP`` to it, so that
    subsequent ``mimetypes.guess_type()`` calls can resolve those extensions.
    """
    mimetypes.init()
    # add_type() is called purely for its side effect, so a plain loop is used
    # instead of materializing a list of ``None`` results.
    for extension, mime_type in MIME_MAP.items():
        # add_type() expects the MIME type first and the extension second.
        mimetypes.add_type(mime_type, extension)
def load_json_or_yaml(string, is_path=False, file_type='json',
                      message="Failed to load %(file_type)s: %(exc)s"):
    """Parse JSON from a string or from a file on disk.

    Despite its name, this function only parses JSON.

    Args:
        string: the JSON text itself, or the path of a JSON file when
            ``is_path`` is true.
        is_path: when true, ``string`` is opened as a file and its contents
            are parsed; otherwise ``string`` is parsed directly.
        file_type: accepted for interface compatibility; currently unused.
        message: accepted for interface compatibility; currently unused.

    Returns:
        The decoded Python object.

    Raises:
        OSError: if ``is_path`` is true and the file cannot be opened.
        ValueError: if the content is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    if is_path:
        # Decode explicitly as UTF-8 rather than relying on the platform's
        # locale encoding, so non-ASCII content loads the same everywhere.
        with open(string, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    return json.loads(string)
async def _handle_asyncio_loop(async_main, context):
    """Run ``async_main(context)`` inside a fresh aiohttp client session.

    The session is stored on ``context.session`` before ``async_main`` is
    awaited, and the ``async with`` block closes it once ``async_main``
    finishes or raises.
    """
    async with aiohttp.ClientSession() as client_session:
        # Expose the session to everything that receives ``context``.
        context.session = client_session
        await async_main(context)
async def _process_future_exceptions(tasks, raise_at_first_error):
succeeded_results = []
error_results = []
if tasks:
await asyncio.wait(tasks)
for task in tasks:
exc = task.exception()
if exc:
if raise_at_first_error:
raise exc
else:
error_results.append(exc)
else:
succeeded_results.append(task.result())
return succeeded_results, error_results
async def raise_future_exceptions(tasks):
    """Await every task and return the list of their results.

    Delegates to ``_process_future_exceptions`` with
    ``raise_at_first_error=True``, so the first task exception found is
    raised instead of being collected.
    """
    outcome = await _process_future_exceptions(tasks, raise_at_first_error=True)
    # outcome is (succeeded_results, error_results); only the former is returned.
    return outcome[0]
async def put(context, url, headers, abs_filename, session=None):
    """PUT the contents of ``abs_filename`` to ``url``.

    Args:
        context: object whose ``session`` attribute is used when no explicit
            ``session`` is given.
        url: destination URL for the PUT request.
        headers: HTTP headers sent with the request.
        abs_filename: path of the file whose bytes form the request body.
        session: optional client session; falls back to ``context.session``
            when falsy.

    Returns:
        The response object of the PUT request.

    Raises:
        Exception: if the response status is neither 200 nor 204.
    """
    if not session:
        session = context.session
    accepted_statuses = (200, 204)
    with open(abs_filename, "rb") as payload:
        request = session.put(url, data=payload, headers=headers, compress=False)
        async with request as resp:
            print(f"put {abs_filename}: {resp.status}")
            body_text = await resp.text()
            # Echo the response body only when the server sent one.
            if body_text:
                print(body_text)
            if resp.status not in accepted_statuses:
                raise Exception(f"Bad status {resp.status}")
    return resp
async def upload_to_s3(context, s3_key, path):
    """Upload the file at ``path`` to S3 under ``s3_key`` via a presigned URL.

    The MIME type is guessed from ``path``; a presigned ``put_object`` URL is
    generated with boto3 and the file is then sent with ``put()`` unless
    ``context.dry_run`` is set.

    Args:
        context: object providing ``config`` (with a ``bucket_config``
            mapping), ``bucket`` (key into that mapping), ``dry_run`` and
            ``session``.
        s3_key: destination object key in the bucket.
        path: local path of the file to upload.

    Raises:
        Exception: if no MIME type can be guessed for ``path``, or if the
            upload performed by ``put()`` fails.
    """
    mime_type = mimetypes.guess_type(path)[0]
    if not mime_type:
        # Both fragments need the f-prefix; without it "{mime_type}" would be
        # emitted literally instead of the value that was actually returned.
        raise Exception(f"Unable to discover valid mime-type for path ({path}), "
                        f"mimetypes.guess_type() returned {mime_type}")

    bucket_config = context.config['bucket_config'][context.bucket]
    api_kwargs = {
        'Bucket': bucket_config['buckets']['telemetry'],
        'Key': s3_key,
        'ContentType': mime_type,
    }
    headers = {
        'Content-Type': mime_type,
        'Cache-Control': 'public, max-age=%d' % CACHE_CONTROL_MAXAGE,
    }
    creds = bucket_config['credentials']
    s3 = boto3.client('s3', aws_access_key_id=creds['id'], aws_secret_access_key=creds['key'],)
    url = s3.generate_presigned_url('put_object', api_kwargs, ExpiresIn=1800, HttpMethod='PUT')

    # FIXME: add proper logging
    print(f"upload_to_s3: {path} -> s3://{context.bucket}/{s3_key}")
    if not context.dry_run:
        await put(context, url, headers, path, session=context.session)