|
1 | 1 | import re |
2 | | -import urllib.parse |
3 | 2 | from datetime import timedelta |
4 | | -from json import JSONDecodeError |
5 | 3 | from types import TracebackType |
6 | 4 | from typing import Any, BinaryIO, Iterator, Type |
7 | 5 | from urllib.parse import urlencode |
|
12 | 10 | from .config import * |
13 | 11 | # To preserve backwards compatibility (as these definitions were previously in this module) |
14 | 12 | from .credentials_provider import * |
15 | | -from .errors import DatabricksError, error_mapper |
16 | | -from .errors.private_link import _is_private_link_redirect |
| 13 | +from .errors import DatabricksError, get_api_error |
| 14 | +from .logger import RoundTrip |
17 | 15 | from .oauth import retrieve_token |
18 | 16 | from .retries import retried |
19 | 17 |
|
@@ -262,134 +260,23 @@ def _perform(self, |
262 | 260 | auth=auth, |
263 | 261 | stream=raw, |
264 | 262 | timeout=self._http_timeout_seconds) |
265 | | - try: |
266 | | - self._record_request_log(response, raw=raw or data is not None or files is not None) |
267 | | - if not response.ok: # internally calls response.raise_for_status() |
268 | | - # TODO: experiment with traceback pruning for better readability |
269 | | - # See https://stackoverflow.com/a/58821552/277035 |
270 | | - payload = response.json() |
271 | | - raise self._make_nicer_error(response=response, **payload) from None |
272 | | - # Private link failures happen via a redirect to the login page. From a requests-perspective, the request |
273 | | - # is successful, but the response is not what we expect. We need to handle this case separately. |
274 | | - if _is_private_link_redirect(response): |
275 | | - raise self._make_nicer_error(response=response) from None |
276 | | - return response |
277 | | - except requests.exceptions.JSONDecodeError: |
278 | | - message = self._make_sense_from_html(response.text) |
279 | | - if not message: |
280 | | - message = response.reason |
281 | | - raise self._make_nicer_error(response=response, message=message) from None |
282 | | - |
283 | | - @staticmethod |
284 | | - def _make_sense_from_html(txt: str) -> str: |
285 | | - matchers = [r'<pre>(.*)</pre>', r'<title>(.*)</title>'] |
286 | | - for attempt in matchers: |
287 | | - expr = re.compile(attempt, re.MULTILINE) |
288 | | - match = expr.search(txt) |
289 | | - if not match: |
290 | | - continue |
291 | | - return match.group(1).strip() |
292 | | - return txt |
293 | | - |
294 | | - def _make_nicer_error(self, *, response: requests.Response, **kwargs) -> DatabricksError: |
295 | | - status_code = response.status_code |
296 | | - message = kwargs.get('message', 'request failed') |
297 | | - is_http_unauthorized_or_forbidden = status_code in (401, 403) |
298 | | - is_too_many_requests_or_unavailable = status_code in (429, 503) |
299 | | - if is_http_unauthorized_or_forbidden: |
300 | | - message = self._cfg.wrap_debug_info(message) |
301 | | - if is_too_many_requests_or_unavailable: |
302 | | - kwargs['retry_after_secs'] = self._parse_retry_after(response) |
303 | | - kwargs['message'] = message |
304 | | - return error_mapper(response, kwargs) |
305 | | - |
306 | | - def _record_request_log(self, response: requests.Response, raw=False): |
| 263 | + self._record_request_log(response, raw=raw or data is not None or files is not None) |
| 264 | + error = get_api_error(response) |
| 265 | + if error is not None: |
| 266 | + status_code = response.status_code |
| 267 | + is_http_unauthorized_or_forbidden = status_code in (401, 403) |
| 268 | + is_too_many_requests_or_unavailable = status_code in (429, 503) |
| 269 | + if is_http_unauthorized_or_forbidden: |
| 270 | + error.message = self._cfg.wrap_debug_info(error.message) |
| 271 | + if is_too_many_requests_or_unavailable: |
| 272 | + error.retry_after_secs = self._parse_retry_after(response) |
| 273 | + raise error from None |
| 274 | + return response |
| 275 | + |
| 276 | + def _record_request_log(self, response: requests.Response, raw: bool = False) -> None: |
307 | 277 | if not logger.isEnabledFor(logging.DEBUG): |
308 | 278 | return |
309 | | - request = response.request |
310 | | - url = urllib.parse.urlparse(request.url) |
311 | | - query = '' |
312 | | - if url.query: |
313 | | - query = f'?{urllib.parse.unquote(url.query)}' |
314 | | - sb = [f'{request.method} {urllib.parse.unquote(url.path)}{query}'] |
315 | | - if self._cfg.debug_headers: |
316 | | - if self._cfg.host: |
317 | | - sb.append(f'> * Host: {self._cfg.host}') |
318 | | - for k, v in request.headers.items(): |
319 | | - sb.append(f'> * {k}: {self._only_n_bytes(v, self._debug_truncate_bytes)}') |
320 | | - if request.body: |
321 | | - sb.append("> [raw stream]" if raw else self._redacted_dump("> ", request.body)) |
322 | | - sb.append(f'< {response.status_code} {response.reason}') |
323 | | - if raw and response.headers.get('Content-Type', None) != 'application/json': |
324 | | - # Raw streams with `Transfer-Encoding: chunked` do not have `Content-Type` header |
325 | | - sb.append("< [raw stream]") |
326 | | - elif response.content: |
327 | | - sb.append(self._redacted_dump("< ", response.content)) |
328 | | - logger.debug("\n".join(sb)) |
329 | | - |
330 | | - @staticmethod |
331 | | - def _mask(m: Dict[str, any]): |
332 | | - for k in m: |
333 | | - if k in {'bytes_value', 'string_value', 'token_value', 'value', 'content'}: |
334 | | - m[k] = "**REDACTED**" |
335 | | - |
336 | | - @staticmethod |
337 | | - def _map_keys(m: Dict[str, any]) -> List[str]: |
338 | | - keys = list(m.keys()) |
339 | | - keys.sort() |
340 | | - return keys |
341 | | - |
342 | | - @staticmethod |
343 | | - def _only_n_bytes(j: str, num_bytes: int = 96) -> str: |
344 | | - diff = len(j.encode('utf-8')) - num_bytes |
345 | | - if diff > 0: |
346 | | - return f"{j[:num_bytes]}... ({diff} more bytes)" |
347 | | - return j |
348 | | - |
349 | | - def _recursive_marshal_dict(self, m, budget) -> dict: |
350 | | - out = {} |
351 | | - self._mask(m) |
352 | | - for k in sorted(m.keys()): |
353 | | - raw = self._recursive_marshal(m[k], budget) |
354 | | - out[k] = raw |
355 | | - budget -= len(str(raw)) |
356 | | - return out |
357 | | - |
358 | | - def _recursive_marshal_list(self, s, budget) -> list: |
359 | | - out = [] |
360 | | - for i in range(len(s)): |
361 | | - if i > 0 >= budget: |
362 | | - out.append("... (%d additional elements)" % (len(s) - len(out))) |
363 | | - break |
364 | | - raw = self._recursive_marshal(s[i], budget) |
365 | | - out.append(raw) |
366 | | - budget -= len(str(raw)) |
367 | | - return out |
368 | | - |
369 | | - def _recursive_marshal(self, v: any, budget: int) -> any: |
370 | | - if isinstance(v, dict): |
371 | | - return self._recursive_marshal_dict(v, budget) |
372 | | - elif isinstance(v, list): |
373 | | - return self._recursive_marshal_list(v, budget) |
374 | | - elif isinstance(v, str): |
375 | | - return self._only_n_bytes(v, self._debug_truncate_bytes) |
376 | | - else: |
377 | | - return v |
378 | | - |
379 | | - def _redacted_dump(self, prefix: str, body: str) -> str: |
380 | | - if len(body) == 0: |
381 | | - return "" |
382 | | - try: |
383 | | - # Unmarshal body into primitive types. |
384 | | - tmp = json.loads(body) |
385 | | - max_bytes = 96 |
386 | | - if self._debug_truncate_bytes > max_bytes: |
387 | | - max_bytes = self._debug_truncate_bytes |
388 | | - # Re-marshal body taking redaction and character limit into account. |
389 | | - raw = self._recursive_marshal(tmp, max_bytes) |
390 | | - return "\n".join([f'{prefix}{line}' for line in json.dumps(raw, indent=2).split("\n")]) |
391 | | - except JSONDecodeError: |
392 | | - return f'{prefix}[non-JSON document of {len(body)} bytes]' |
| 279 | + logger.debug(RoundTrip(response, self._cfg.debug_headers, self._debug_truncate_bytes, raw).generate()) |
393 | 280 |
|
394 | 281 |
|
395 | 282 | class StreamingResponse(BinaryIO): |
|
0 commit comments