-
Notifications
You must be signed in to change notification settings - Fork 356
Expand file tree
/
Copy pathtasks.py
More file actions
232 lines (212 loc) · 12.1 KB
/
tasks.py
File metadata and controls
232 lines (212 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import asyncio
from http.client import HTTPException
import logging
import time
from asgiref.sync import async_to_sync, sync_to_async
from boaapi.boa_client import BoaClient, BoaException
from boaapi.status import CompilerStatus, ExecutionStatus
from urllib import request
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from addons.boa import settings as boa_settings
from addons.boa.boa_error_code import BoaErrorCode
from framework import sentry
from framework.celery_tasks import app as celery_app
from osf.models import OSFUser, NotificationTypeEnum
from osf.utils.fields import ensure_str, ensure_bytes
from website import settings as osf_settings
# Module-level logger for this file; log records are named after the module (``__name__``).
logger = logging.getLogger(__name__)
@celery_app.task(name='addons.boa.tasks.submit_to_boa')
def submit_to_boa(host, username, password, user_guid, project_guid,
                  query_dataset, query_file_name, file_size, file_full_path,
                  query_download_url, output_upload_url):
    """
    Celery entry point that runs a Boa query on behalf of an OSF user.

    Fetches the Boa query file, submits it to the Boa API, waits for the job to finish and
    uploads the job output back to OSF, sending a success or failure email notification.
    All of that work lives in ``submit_to_boa_async()``; this task only drives that
    coroutine to completion synchronously and returns whatever it returns.

    Notes:

    * The caller is responsible for validating every parameter.
    * ``query_download_url`` and ``output_upload_url`` must both be WB URLs because:
      * this keeps the number of requests between OSF and WB down;
      * authentication is passed securely via the request headers.
    * Running asyncio inside Celery is tricky; background discussion:
      https://stackoverflow.com/questions/39815771/how-to-combine-celery-with-asyncio
    """
    run_boa_job = async_to_sync(submit_to_boa_async)
    return run_boa_job(
        host, username, password, user_guid, project_guid,
        query_dataset, query_file_name, file_size, file_full_path,
        query_download_url, output_upload_url,
    )
async def submit_to_boa_async(host, username, password, user_guid, project_guid,
                              query_dataset, query_file_name, file_size, file_full_path,
                              query_download_url, output_upload_url):
    """
    Download Boa query file, submit it to Boa API, wait for Boa to finish the job
    and upload result output to OSF. Send success / failure email notifications.

    A couple of notes:

    * This is the async function that must be wrapped with ``async_to_sync`` by the caller.
    * See notes in ``submit_to_boa()`` for details.

    :return: ``BoaErrorCode.NO_ERROR`` on success. Otherwise the ``BoaErrorCode`` of the first
        failure, which has already been passed to ``handle_boa_error()`` (logged, sent to
        Sentry and notified to the user).
    """
    logger.debug('>>>>>>>> Task begins')
    user = await sync_to_async(OSFUser.objects.get)(guids___id=user_guid)
    cookie_value = (await sync_to_async(user.get_or_create_cookie)()).decode()
    project_url = f'{osf_settings.DOMAIN}{project_guid}/'
    output_file_name = _build_output_file_name(query_file_name)

    async def report_error(message, code, **extra_context):
        # Every failure is reported through ``handle_boa_error()`` with the same user / project
        # context; ``extra_context`` carries the optional fields (file_size, output_file_name, job_id).
        await sync_to_async(handle_boa_error)(message, code, user.username, user.fullname,
                                              project_url, file_full_path,
                                              query_file_name=query_file_name, **extra_context)
        return code

    if file_size > boa_settings.MAX_SUBMISSION_SIZE:
        message = (f'Boa query file too large to submit: user=[{user_guid}], project=[{project_guid}], '
                   f'file_name=[{query_file_name}], file_size=[{file_size}], '
                   f'full_path=[{file_full_path}], url=[{query_download_url}] ...')
        return await report_error(message, BoaErrorCode.FILE_TOO_LARGE_ERROR, file_size=file_size)

    logger.debug(f'Downloading Boa query file: user=[{user_guid}], project=[{project_guid}], '
                 f'file_name=[{query_file_name}], full_path=[{file_full_path}], url=[{query_download_url}] ...')
    download_request = request.Request(query_download_url)
    download_request.add_header('Cookie', f'{osf_settings.COOKIE_NAME}={cookie_value}')
    try:
        # Close the response (and release its connection) as soon as the body has been read.
        with request.urlopen(download_request) as download_response:
            boa_query = ensure_str(download_response.read())
    except (ValueError, HTTPError, URLError, HTTPException, OSError):
        # ``OSError`` also covers socket-level failures while reading the body (e.g. a
        # connection reset or timeout) that urllib does not wrap in ``URLError``.
        message = (f'Failed to download Boa query file: user=[{user_guid}], project=[{project_guid}], '
                   f'file_name=[{query_file_name}], full_path=[{file_full_path}], url=[{query_download_url}] ...')
        return await report_error(message, BoaErrorCode.UNKNOWN)
    logger.info('Boa query successfully downloaded.')
    logger.debug(f'Boa query:\n########\n{boa_query}\n########')

    client = BoaClient(endpoint=host)
    logger.debug('Boa client opened.')
    logger.debug(f'Checking Boa credentials: boa_username=[{username}], boa_host=[{host}] ...')
    try:
        client.login(username, password)
    except BoaException:
        # Don't call `client.close()`, since it will fail with `BoaException` if `client.login()` fails
        message = f'Boa login failed: boa_username=[{username}], boa_host=[{host}]!'
        return await report_error(message, BoaErrorCode.AUTHN_ERROR)
    logger.info('Boa login completed.')

    # From here on the client is logged in, so close it on every exit path,
    # including exceptions that are not handled below.
    try:
        logger.debug(f'Retrieving Boa dataset: dataset=[{query_dataset}] ...')
        try:
            dataset = client.get_dataset(query_dataset)
        except BoaException:
            message = f'Failed to retrieve or verify the target Boa dataset: dataset=[{query_dataset}]!'
            return await report_error(message, BoaErrorCode.UNKNOWN)
        logger.info('Boa dataset retrieved.')

        logger.debug(f'Submitting the query to Boa API: boa_host=[{host}], dataset=[{query_dataset}] ...')
        try:
            boa_job = client.query(boa_query, dataset)
        except BoaException:
            message = f'Failed to submit the query to Boa API: boa_host=[{host}], dataset=[{query_dataset}]!'
            return await report_error(message, BoaErrorCode.UNKNOWN)
        # A monotonic clock keeps the elapsed-time check correct even if the system clock is adjusted.
        start_time = time.monotonic()
        logger.info('Query successfully submitted.')

        logger.debug(f'Waiting for job to finish: job_id=[{str(boa_job.id)}] ...')
        try:
            while boa_job.is_running():
                if time.monotonic() - start_time > boa_settings.MAX_JOB_WAITING_TIME:
                    message = f'Boa job did not complete in time: job_id=[{str(boa_job.id)}]!'
                    return await report_error(message, BoaErrorCode.JOB_TIME_OUT_ERROR, job_id=boa_job.id)
                logger.debug(f'Boa job still running, waiting {boa_settings.REFRESH_JOB_INTERVAL}s: '
                             f'job_id=[{str(boa_job.id)}] ...')
                await asyncio.sleep(boa_settings.REFRESH_JOB_INTERVAL)
                # Refresh after the wait so the next check works with the newest job state.
                boa_job.refresh()
            # The last in-loop refresh may predate the job finishing; refresh once more so the
            # compiler / execution status checked below is as current as possible.
            boa_job.refresh()
        except BoaException:
            message = f'Failed to check the status of Boa job: job_id=[{str(boa_job.id)}]!'
            return await report_error(message, BoaErrorCode.UNKNOWN, job_id=boa_job.id)

        if boa_job.compiler_status is CompilerStatus.ERROR:
            message = f'Boa job failed with compile error: job_id=[{str(boa_job.id)}]!'
            return await report_error(message, BoaErrorCode.QUERY_ERROR, job_id=boa_job.id)
        if boa_job.exec_status is ExecutionStatus.ERROR:
            message = f'Boa job failed with execution error: job_id=[{str(boa_job.id)}]!'
            return await report_error(message, BoaErrorCode.QUERY_ERROR, job_id=boa_job.id)
        try:
            boa_job_output = boa_job.output()
        except BoaException:
            message = f'Boa job output is not available: job_id=[{str(boa_job.id)}]!'
            return await report_error(message, BoaErrorCode.OUTPUT_ERROR, job_id=boa_job.id)
        logger.info('Boa job finished.')
        logger.debug(f'Boa job output: job_id=[{str(boa_job.id)}]\n########\n{boa_job_output}\n########')
    finally:
        client.close()
        logger.debug('Boa client closed.')

    logger.debug(f'Uploading Boa query output to OSF: name=[{output_file_name}], upload_url=[{output_upload_url}] ...')
    try:
        upload_request = request.Request(_add_query_param(output_upload_url, 'name', output_file_name))
        upload_request.method = 'PUT'
        upload_request.data = ensure_bytes(boa_job_output)
        upload_request.add_header('Cookie', f'{osf_settings.COOKIE_NAME}={cookie_value}')
        # Only success / failure matters; close the response right away to release the connection.
        request.urlopen(upload_request).close()
    except (ValueError, HTTPError, URLError, HTTPException, OSError) as e:
        message = (f'Failed to upload query output file to OSF: '
                   f'name=[{output_file_name}], user=[{user_guid}], url=[{output_upload_url}]!')
        error_code = BoaErrorCode.UPLOAD_ERROR_OTHER
        if isinstance(e, HTTPError):
            message += f', http_error=[{e.code}: {e.reason}]'
            if e.code == 409:
                error_code = BoaErrorCode.UPLOAD_ERROR_CONFLICT
        return await report_error(message, error_code, output_file_name=output_file_name, job_id=boa_job.id)
    logger.info('Successfully uploaded query output to OSF.')
    logger.debug('Task ends <<<<<<<<')
    # Emit through ``sync_to_async`` like the failure path in ``handle_boa_error()``: this is
    # synchronous code (presumably touching the database), which Django does not allow to run
    # directly inside the event loop that ``async_to_sync`` drives.
    await sync_to_async(_emit_boa_job_complete)(user, {
        'user_fullname': user.fullname,
        'query_file_name': query_file_name,
        'query_file_full_path': file_full_path,
        'output_file_name': output_file_name,
        'job_id': boa_job.id,
        'project_url': project_url,
        'boa_job_list_url': boa_settings.BOA_JOB_LIST_URL,
        'boa_support_email': boa_settings.BOA_SUPPORT_EMAIL,
        'osf_support_email': osf_settings.OSF_SUPPORT_EMAIL,
    })
    return BoaErrorCode.NO_ERROR


def _build_output_file_name(query_file_name):
    """Return the name under which the Boa job output is uploaded.

    Only a trailing ``.boa`` extension is replaced by ``boa_settings.OUTPUT_FILE_SUFFIX``;
    other occurrences of ``.boa`` inside the name (e.g. ``my.boa.v2.boa``) are kept. A name
    without that extension gets the suffix appended, so that (for a non-empty suffix) the
    output name always differs from the query file name.
    """
    query_extension = '.boa'
    base_name = query_file_name
    if base_name.endswith(query_extension):
        base_name = base_name[:-len(query_extension)]
    return f'{base_name}{boa_settings.OUTPUT_FILE_SUFFIX}'


def _add_query_param(url, name, value):
    """Return ``url`` with the URL-encoded ``name=value`` pair appended to its query string.

    ``url`` is expected to already carry a query string (``&`` is used then); ``?`` is used
    when it does not.
    """
    separator = '&' if '?' in url else '?'
    return f'{url}{separator}{urlencode({name: value})}'


def _emit_boa_job_complete(user, event_context):
    """Send the "Boa job complete" notification to ``user`` (synchronous; see the caller)."""
    NotificationTypeEnum.ADDONS_BOA_JOB_COMPLETE.instance.emit(
        user=user,
        event_context=event_context,
    )
def handle_boa_error(message, code, username, fullname, project_url, query_file_full_path,
                     query_file_name=None, file_size=None, output_file_name=None, job_id=None):
    """Handle Boa and WB API errors: log them, report them to Sentry and send the failure email.

    :param message: human-readable description of the failure; it is logged, sent to Sentry
        and included in the notification context.
    :param code: the ``BoaErrorCode`` describing the failure.
    :param username: used as the notification's ``destination_address``.
    :param fullname: the user's full name, shown in the notification.
    :param project_url: URL of the OSF project the query belongs to.
    :param query_file_full_path: full path of the Boa query file.
    :param query_file_name: optional name of the Boa query file.
    :param file_size: optional query file size (relevant for the file-too-large error).
    :param output_file_name: optional name of the output file (relevant for upload errors).
    :param job_id: optional Boa job id.
    :return: ``code``, unchanged.
    """
    logger.error(message)
    try:
        sentry.log_message(message, skip_session=True)
    except Exception:
        # Sentry reporting is best-effort and must not prevent the user notification below,
        # but its failure should still leave a trace (with traceback) in the logs.
        logger.warning('Failed to report Boa error to Sentry.', exc_info=True)
    NotificationTypeEnum.ADDONS_BOA_JOB_FAILURE.instance.emit(
        destination_address=username,
        event_context={
            'user_fullname': fullname,
            'code': code,
            'query_file_name': query_file_name,
            'file_size': file_size,
            'message': message,
            'max_file_size': boa_settings.MAX_SUBMISSION_SIZE,
            'query_file_full_path': query_file_full_path,
            'output_file_name': output_file_name,
            'job_id': job_id,
            # MAX_JOB_WAITING_TIME is compared against elapsed seconds in the task, hence / 3600.
            'max_job_wait_hours': boa_settings.MAX_JOB_WAITING_TIME / 3600,
            'project_url': project_url,
            'boa_job_list_url': boa_settings.BOA_JOB_LIST_URL,
            'boa_support_email': boa_settings.BOA_SUPPORT_EMAIL,
            'osf_support_email': osf_settings.OSF_SUPPORT_EMAIL,
        },
    )
    return code