|
| 1 | +import json |
| 2 | +import logging |
| 3 | +from http.client import HTTPResponse |
| 4 | +from typing import Dict, Union |
| 5 | +from urllib.error import HTTPError |
| 6 | +from urllib.request import Request, urlopen |
| 7 | + |
| 8 | +from slack.errors import SlackRequestError |
| 9 | +from .webhook_response import WebhookResponse |
| 10 | +from ..web import convert_bool_to_0_or_1, get_user_agent |
| 11 | +from ..web.classes.blocks import Block |
| 12 | + |
| 13 | + |
| 14 | +class WebhookClient: |
| 15 | + logger = logging.getLogger(__name__) |
| 16 | + |
| 17 | + def __init__( |
| 18 | + self, url: str, default_headers: Dict[str, str] = {}, |
| 19 | + ): |
| 20 | + """urllib-based API client. |
| 21 | + :param default_headers: request headers to add to all requests |
| 22 | + """ |
| 23 | + self.url = url |
| 24 | + self.default_headers = default_headers |
| 25 | + |
| 26 | + def send( |
| 27 | + self, body: Dict[str, any], additional_headers: Dict[str, str] = {}, |
| 28 | + ) -> WebhookResponse: |
| 29 | + """Performs a Slack API request and returns the result. |
| 30 | + :param url: a complete URL (e.g., https://hooks.slack.com/XXX) |
| 31 | + :param json_body: json data structure (it's still a dict at this point), |
| 32 | + if you give this argument, body_params and files will be skipped |
| 33 | + :param body_params: form params |
| 34 | + :param additional_headers: request headers to append |
| 35 | + :return: API response |
| 36 | + """ |
| 37 | + |
| 38 | + body = convert_bool_to_0_or_1(body) |
| 39 | + self._parse_blocks(body) |
| 40 | + if self.logger.level <= logging.DEBUG: |
| 41 | + self.logger.debug( |
| 42 | + f"Slack API Request - url: {self.url}, " |
| 43 | + f"body: {body}, " |
| 44 | + f"additional_headers: {additional_headers}" |
| 45 | + ) |
| 46 | + |
| 47 | + request_headers = self._build_request_headers( |
| 48 | + has_json=json is not None, additional_headers=additional_headers, |
| 49 | + ) |
| 50 | + args = { |
| 51 | + "headers": request_headers, |
| 52 | + "body": body, |
| 53 | + } |
| 54 | + return self._perform_http_request(url=self.url, args=args) |
| 55 | + |
| 56 | + def _perform_http_request( |
| 57 | + self, *, url: str, args: Dict[str, Dict[str, any]] |
| 58 | + ) -> WebhookResponse: |
| 59 | + """Performs an HTTP request and parses the response. |
| 60 | + :param url: a complete URL (e.g., https://www.slack.com/api/chat.postMessage) |
| 61 | + :param args: args has "headers", "data", "params", and "json" |
| 62 | + "headers": Dict[str, str] |
| 63 | + "params": Dict[str, str], |
| 64 | + "json": Dict[str, any], |
| 65 | + :return: a tuple (HTTP response and its body) |
| 66 | + """ |
| 67 | + headers = args["headers"] |
| 68 | + body = json.dumps(args["body"]).encode("utf-8") |
| 69 | + headers["Content-Type"] = "application/json;charset=utf-8" |
| 70 | + |
| 71 | + try: |
| 72 | + if url.lower().startswith("http"): |
| 73 | + req = Request(method="POST", url=url, data=body, headers=headers) |
| 74 | + else: |
| 75 | + raise SlackRequestError(f"Invalid URL detected: {url}") |
| 76 | + resp: HTTPResponse = urlopen(req) |
| 77 | + charset = resp.headers.get_content_charset() or "utf-8" |
| 78 | + return WebhookResponse( |
| 79 | + url=self.url, |
| 80 | + status_code=resp.status, |
| 81 | + body=resp.read().decode(charset), |
| 82 | + headers=resp.headers, |
| 83 | + ) |
| 84 | + except HTTPError as e: |
| 85 | + charset = e.headers.get_content_charset() or "utf-8" |
| 86 | + resp = WebhookResponse( |
| 87 | + url=self.url, |
| 88 | + status_code=e.code, |
| 89 | + body=e.read().decode(charset), |
| 90 | + headers=e.headers, |
| 91 | + ) |
| 92 | + if e.code == 429: |
| 93 | + resp.headers["Retry-After"] = resp.headers["retry-after"] |
| 94 | + return resp |
| 95 | + |
| 96 | + except Exception as err: |
| 97 | + self.logger.error(f"Failed to send a request to Slack API server: {err}") |
| 98 | + raise err |
| 99 | + |
| 100 | + def _build_request_headers( |
| 101 | + self, has_json: bool, additional_headers: dict, |
| 102 | + ): |
| 103 | + headers = { |
| 104 | + "User-Agent": get_user_agent(), |
| 105 | + "Content-Type": "application/x-www-form-urlencoded;charset=utf-8", |
| 106 | + } |
| 107 | + headers.update(self.default_headers) |
| 108 | + if additional_headers: |
| 109 | + headers.update(additional_headers) |
| 110 | + if has_json: |
| 111 | + headers.update({"Content-Type": "application/json;charset=utf-8"}) |
| 112 | + return headers |
| 113 | + |
| 114 | + @staticmethod |
| 115 | + def _parse_blocks(body): |
| 116 | + blocks = body.get("blocks", None) |
| 117 | + |
| 118 | + def to_dict(b: Union[Dict, Block]): |
| 119 | + if isinstance(b, Block): |
| 120 | + return b.to_dict() |
| 121 | + return b |
| 122 | + |
| 123 | + if blocks is not None and isinstance(blocks, list): |
| 124 | + dict_blocks = [to_dict(b) for b in blocks] |
| 125 | + body.update({"blocks": dict_blocks}) |
0 commit comments