forked from luci/luci-py
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase_handler.py
More file actions
executable file
·288 lines (226 loc) · 8.79 KB
/
base_handler.py
File metadata and controls
executable file
·288 lines (226 loc) · 8.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
#!/usr/bin/env python
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base handler class for all mapreduce handlers."""
# pylint: disable=protected-access
# pylint: disable=g-bad-name
# pylint: disable=g-import-not-at-top
import httplib
import logging
try:
import json
except ImportError:
import simplejson as json
try:
from mapreduce import pipeline_base
except ImportError:
pipeline_base = None
try:
# Check if the full cloudstorage package exists. The stub part is in runtime.
import cloudstorage
if hasattr(cloudstorage, "_STUB"):
cloudstorage = None
except ImportError:
cloudstorage = None
from google.appengine.ext import webapp
from mapreduce import errors
from mapreduce import json_util
from mapreduce import model
from mapreduce import parameters
class Error(Exception):
  """Root of the exception hierarchy defined by this module."""
class BadRequestPathError(Error):
  """Raised when a handler is reached through a request path it cannot use."""
class TaskQueueHandler(webapp.RequestHandler):
  """Base class for handlers intended to be run only from the task queue.

  Sub-classes should implement
  1. the 'handle' method for all POST requests.
  2. '_preprocess' method for decoding or validations before handle.
  3. '_drop_gracefully' method if _preprocess fails and the task has to
     be dropped.

  In Python27 runtime, webapp2 will automatically replace webapp.
  """

  _DEFAULT_USER_AGENT = "App Engine Python MR"

  def __init__(self, *args, **kwargs):
    # webapp framework invokes initialize after __init__.
    # webapp2 framework invokes initialize within __init__.
    # Python27 runtime swap webapp with webapp2 underneath us.
    # Since initialize will conditionally change this field,
    # it needs to be set before calling super's __init__.
    self._preprocess_success = False
    super(TaskQueueHandler, self).__init__(*args, **kwargs)
    # Only configure retry params when the full cloudstorage package is
    # available (the module-level import sets it to None otherwise).
    if cloudstorage:
      cloudstorage.set_default_retry_params(
          cloudstorage.RetryParams(
              min_retries=5,
              max_retries=10,
              urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC,
              save_access_token=True,
              _user_agent=self._DEFAULT_USER_AGENT))

  def initialize(self, request, response):
    """Initialize.

    1. call webapp init.
    2. check request is indeed from taskqueue.
    3. check the task has not been retried too many times.
    4. run handler specific processing logic.
    5. run error handling logic if processing failed.

    Only when step 4 succeeds is _preprocess_success set to True, which is
    what allows post() to invoke handle().

    Args:
      request: a webapp.Request instance.
      response: a webapp.Response instance.
    """
    super(TaskQueueHandler, self).initialize(request, response)

    # Check request is from taskqueue.
    if "X-AppEngine-QueueName" not in self.request.headers:
      logging.error(self.request.headers)
      logging.error("Task queue handler received non-task queue request")
      self.response.set_status(
          403, message="Task queue handler received non-task queue request")
      return

    # Check task has not been retried too many times.
    attempts = self.task_retry_count() + 1
    if attempts > parameters.config.TASK_MAX_ATTEMPTS:
      # .get(): the task-name header is not guaranteed to be present, and a
      # KeyError here would prevent the task from being dropped.
      logging.error(
          "Task %s has been attempted %s times. Dropping it permanently.",
          self.request.headers.get("X-AppEngine-TaskName"),
          attempts)
      self._drop_gracefully()
      return

    try:
      self._preprocess()
      self._preprocess_success = True
    # pylint: disable=bare-except
    except:
      self._preprocess_success = False
      # logging.exception records the traceback of the preprocess failure.
      # .get() keeps a missing header from raising inside this handler,
      # which would skip _drop_gracefully().
      logging.exception(
          "Preprocess task %s failed. Dropping it permanently.",
          self.request.headers.get("X-AppEngine-TaskName"))
      self._drop_gracefully()

  def post(self):
    """Run handle() only if initialize() preprocessed the task successfully."""
    if self._preprocess_success:
      self.handle()

  def handle(self):
    """To be implemented by subclasses."""
    raise NotImplementedError()

  def _preprocess(self):
    """Preprocess.

    This method is called after webapp initialization code has been run
    successfully. It can thus access self.request, self.response and so on.
    """
    pass

  def _drop_gracefully(self):
    """Drop task gracefully.

    When preprocess failed, this method is called before the task is dropped.
    """
    pass

  def task_retry_count(self):
    """Number of times this task has been retried.

    Read from the X-AppEngine-TaskExecutionCount header; 0 when absent.
    """
    return int(self.request.headers.get("X-AppEngine-TaskExecutionCount", 0))

  def retry_task(self):
    """Ask taskqueue to retry this task.

    Even though raising an exception can cause a task retry, it
    will flood logs with highly visible ERROR logs. Handlers should use
    this method to perform controlled task retries. Only raise exceptions
    for those that deserve ERROR log entries.
    """
    self.response.set_status(httplib.SERVICE_UNAVAILABLE, "Retry task")
    self.response.clear()
class JsonHandler(webapp.RequestHandler):
  """Base class for JSON handlers for user interface.

  Sub-classes should implement the 'handle' method. They should put their
  response data in the 'self.json_response' dictionary. Any exceptions raised
  by the sub-class implementation will be sent in a JSON response with the
  name of the error_class and the error_message.
  """

  def __init__(self, *args, **kwargs):
    """Initializer.

    Accepts keyword arguments as well as positional ones, mirroring
    TaskQueueHandler, and forwards both to the framework's RequestHandler.
    """
    super(JsonHandler, self).__init__(*args, **kwargs)
    self.json_response = {}

  def base_path(self):
    """Base path for all mapreduce-related urls.

    JSON handlers are mapped to /base_path/command/command_name thus they
    require special treatment.

    Raises:
      BadRequestPathError: if the path does not end with "/command".

    Returns:
      The base path.
    """
    path = self.request.path
    base_path = path[:path.rfind("/")]
    if not base_path.endswith("/command"):
      raise BadRequestPathError(
          "Json handlers should have /command path prefix")
    return base_path[:base_path.rfind("/")]

  def _handle_wrapper(self):
    """The helper method for handling JSON Post and Get requests.

    Rejects requests lacking "X-Requested-With: XMLHttpRequest" with a 403,
    runs handle(), converts any exception into an error_class/error_message
    payload, and writes self.json_response serialized as JSON.
    """
    # NOTE(review): presumably a guard against cross-site requests, since
    # browsers only add this header from script.
    if self.request.headers.get("X-Requested-With") != "XMLHttpRequest":
      logging.error("Got JSON request with no X-Requested-With header")
      self.response.set_status(
          403, message="Got JSON request with no X-Requested-With header")
      return

    self.json_response.clear()
    try:
      self.handle()
    except errors.MissingYamlError:
      logging.debug("Could not find 'mapreduce.yaml' file.")
      self.json_response.clear()
      self.json_response["error_class"] = "Notice"
      self.json_response["error_message"] = "Could not find 'mapreduce.yaml'"
    # pylint: disable=broad-except
    except Exception as e:
      logging.exception("Error in JsonHandler, returning exception.")
      # TODO(user): Include full traceback here for the end-user.
      self.json_response.clear()
      self.json_response["error_class"] = e.__class__.__name__
      self.json_response["error_message"] = str(e)

    self.response.headers["Content-Type"] = "text/javascript"
    try:
      output = json.dumps(self.json_response, cls=json_util.JsonEncoder)
    # pylint: disable=broad-except
    except Exception:
      logging.exception("Could not serialize to JSON")
      self.response.set_status(500, message="Could not serialize to JSON")
      return
    else:
      self.response.out.write(output)

  def handle(self):
    """To be implemented by sub-classes."""
    raise NotImplementedError()
class PostJsonHandler(JsonHandler):
  """JSON handler served over HTTP POST."""

  def post(self):
    """Route a POST request through the shared JSON request wrapper."""
    self._handle_wrapper()
class GetJsonHandler(JsonHandler):
  """JSON handler served over HTTP GET."""

  def get(self):
    """Route a GET request through the shared JSON request wrapper."""
    self._handle_wrapper()
class HugeTaskHandler(TaskQueueHandler):
  """Task queue handler whose parameters come from a decoded HugeTask payload.

  Construction is inherited unchanged from TaskQueueHandler.
  """

  class _RequestWrapper(object):
    """Request proxy that serves parameters from a decoded HugeTask payload.

    get()/set() operate on the result of model.HugeTask.decode_payload;
    any other attribute lookup is forwarded to the wrapped request.
    """

    def __init__(self, request):
      self._request = request
      self._params = model.HugeTask.decode_payload(request)

    def get(self, name, default=""):
      """Return decoded parameter `name`, falling back to `default`."""
      decoded = self._params
      return decoded.get(name, default)

    def set(self, name, value):
      """Store `value` under `name` in the decoded parameters."""
      decoded = self._params
      decoded[name] = value

    def __getattr__(self, name):
      # Python only calls this for names not found on the wrapper itself,
      # so everything except get/set/_request/_params reaches the request.
      wrapped = self._request
      return getattr(wrapped, name)

  def _preprocess(self):
    """Swap self.request for a wrapper over the decoded HugeTask payload."""
    wrapper = self._RequestWrapper(self.request)
    self.request = wrapper
# Backward-compatibility alias: the pipeline base class when the optional
# mapreduce.pipeline_base module imported successfully, otherwise None.
PipelineBase = pipeline_base.PipelineBase if pipeline_base else None