|
| 1 | +import copy |
| 2 | +import logging |
| 3 | +import re |
| 4 | +import traceback |
| 5 | +from functools import reduce |
| 6 | +from typing import List, Set |
| 7 | +from urllib.parse import urljoin, urlparse, ParseResult, urlsplit, urlunparse |
| 8 | + |
| 9 | +import html2text as ht |
| 10 | +import requests |
| 11 | +from bs4 import BeautifulSoup |
| 12 | + |
# Turn off urllib3's warnings process-wide. Fork.fork() below sends requests
# with verify=False; presumably this is here to silence the insecure-request
# warning that would otherwise accompany each of those calls.
requests.packages.urllib3.disable_warnings()
| 14 | + |
| 15 | + |
class ChildLink:
    """A link found on a crawled page, together with the tag it came from.

    Attributes:
        url: Target address of the link.
        tag: A deep copy of the originating tag object (``None`` for the
            root link, which has no tag).
    """

    def __init__(self, url, tag):
        # Deep-copy so the stored tag is independent of the object passed in.
        tag_snapshot = copy.deepcopy(tag)
        self.url, self.tag = url, tag_snapshot
| 20 | + |
| 21 | + |
class ForkManage:
    """Recursively crawl pages starting from ``base_url``.

    Each visited page is fetched through ``Fork`` and reported to a
    caller-supplied handler; the links found on the page are then followed
    until the remaining depth is exhausted.
    """

    def __init__(self, base_url: str, selector_list: List[str]):
        """Remember the start URL and the selectors handed to every ``Fork``."""
        self.base_url = base_url
        self.selector_list = selector_list

    def fork(self, level: int, exclude_link_url: Set[str], fork_handler):
        """Start the crawl at ``base_url``.

        Args:
            level: Remaining depth; a negative value visits nothing.
            exclude_link_url: URLs (without trailing slash) to skip. The set
                is updated in place with every URL that gets visited.
            fork_handler: Called as ``fork_handler(child_link, response)``
                for each visited page.
        """
        root_link = ChildLink(self.base_url, None)
        self.fork_child(root_link, self.selector_list, level, exclude_link_url, fork_handler)

    @staticmethod
    def _visit_key(url: str) -> str:
        """Return ``url`` without a single trailing slash, used for de-duplication."""
        return url[:-1] if url.endswith('/') else url

    @staticmethod
    def fork_child(child_link: ChildLink, selector_list: List[str], level: int, exclude_link_url: Set[str],
                   fork_handler):
        """Visit ``child_link`` and recurse into the links found on its page.

        The link's ``url`` is rewritten in place with its fragment removed.
        Nothing happens when ``level`` is negative or the URL is already in
        ``exclude_link_url``.
        """
        if level < 0:
            return
        child_link.url = remove_fragment(child_link.url)
        current_key = ForkManage._visit_key(child_link.url)
        if current_key in exclude_link_url:
            return
        # Mark as visited before fetching so the page cannot be re-entered
        # through a link that points back to it.
        exclude_link_url.add(current_key)
        response = Fork(child_link.url, selector_list).fork()
        fork_handler(child_link, response)
        for next_link in response.child_link_list:
            if ForkManage._visit_key(next_link.url) in exclude_link_url:
                continue
            ForkManage.fork_child(next_link, selector_list, level - 1, exclude_link_url, fork_handler)
| 46 | + |
| 47 | + |
def remove_fragment(url: str) -> str:
    """Return ``url`` with its ``#fragment`` part removed.

    The URL is parsed and re-assembled, so every other component (scheme,
    host, path, params, query) is kept.

    Args:
        url: Absolute or relative URL.

    Returns:
        The re-assembled URL without a fragment.
    """
    parsed_url = urlparse(url)
    # ParseResult components are strings; "no fragment" is the empty string,
    # not None.
    return urlunparse(parsed_url._replace(fragment=''))
| 53 | + |
| 54 | + |
class Fork:
    """Fetch one page, make its links absolute and extract content and links.

    Attributes:
        base_fork_url: Normalised page URL without trailing slash; a query
            string, if any, is kept.
        selector_list: Non-empty selectors used to pick the content nodes.
        urlparse: Parsed form of ``base_fork_url``.
        base_url: ``scheme://host`` part of ``base_fork_url``.
    """

    # Seconds a single HTTP request may wait before giving up. Without a
    # timeout, requests can block indefinitely on an unresponsive host.
    REQUEST_TIMEOUT = 60

    class Response:
        """Result of fetching one page."""

        def __init__(self, content: str, child_link_list: List[ChildLink], status, message: str):
            self.content = content
            self.child_link_list = child_link_list
            self.status = status
            self.message = message

        @staticmethod
        def success(html_content: str, child_link_list: List[ChildLink]):
            """Build a response with status 200 and an empty message."""
            return Fork.Response(html_content, child_link_list, 200, '')

        @staticmethod
        def error(message: str):
            """Build a response with status 500, no content and no links."""
            return Fork.Response('', [], 500, message)

    def __init__(self, base_fork_url: str, selector_list: List[str]):
        """Normalise ``base_fork_url`` and drop empty selectors.

        Args:
            base_fork_url: URL of the page to fetch; its fragment is ignored.
            selector_list: Selectors such as ``.cls``, ``#id`` or a tag name;
                ``None`` and empty entries are discarded.
        """
        base_fork_url = remove_fragment(base_fork_url)
        parsed = urlsplit(base_fork_url)
        query = parsed.query
        # Normalise the path with the query detached. Appending "/" to a URL
        # that still carries "?query" would put the slash inside the query and
        # make urljoin drop the last path segment.
        url_without_query = parsed._replace(query='').geturl()
        directory_url = url_without_query if url_without_query.endswith("/") else url_without_query + '/'
        # Resolving "." against the directory form collapses dot segments; the
        # trailing slash it leaves behind is stripped.
        self.base_fork_url = urljoin(directory_url, '.')[:-1]
        if query:
            self.base_fork_url = self.base_fork_url + '?' + query
        self.selector_list = [selector for selector in selector_list if selector is not None and len(selector) > 0]
        self.urlparse = urlparse(self.base_fork_url)
        self.base_url = ParseResult(scheme=self.urlparse.scheme, netloc=self.urlparse.netloc, path='', params='',
                                    query='',
                                    fragment='').geturl()

    def get_child_link_list(self, bf: BeautifulSoup):
        """Return the ``<a>`` links of ``bf`` that live under ``base_fork_url``.

        Args:
            bf: Parsed page.

        Returns:
            A list of ``ChildLink`` whose URL starts with ``base_fork_url``.
        """
        # base_fork_url is literal text, so it is escaped: characters such as
        # "?", "." or "(" would otherwise be read as regex syntax.
        pattern = "^((?!(http:|https:|tel:/|#|mailto:|javascript:))|" + re.escape(self.base_fork_url) + "|/).*"
        link_list = bf.find_all(name='a', href=re.compile(pattern))
        result = []
        for link in link_list:
            href = link.get('href')
            url = href if href.startswith(self.base_url) else self.base_url + href
            # Filter before constructing ChildLink, which deep-copies the tag.
            if url.startswith(self.base_fork_url):
                result.append(ChildLink(url, link))
        return result

    def get_content_html(self, bf: BeautifulSoup):
        """Return the HTML of the nodes selected by ``selector_list``.

        With no selectors the whole document is returned. A selector starting
        with ``.`` filters by class, one starting with ``#`` by id, anything
        else by tag name. All filters are passed to a single ``find_all``
        call, and a later selector of the same kind replaces an earlier one.
        """
        if not self.selector_list:
            return str(bf)
        params = {}
        for selector in self.selector_list:
            if selector.startswith('.'):
                params['class_'] = selector.replace('.', '')
            elif selector.startswith("#"):
                params['id'] = selector.replace("#", "")
            else:
                params['name'] = selector
        return "\n".join(str(row) for row in bf.find_all(**params))

    @staticmethod
    def reset_url(tag, field, base_fork_url):
        """Rewrite the relative URL in ``tag[field]`` to an absolute one, in place.

        Args:
            tag: Mapping-like tag object holding the attribute.
            field: Attribute name, e.g. ``href`` or ``src``.
            base_fork_url: URL of the page the tag was found on.
        """
        field_value: str = tag[field]
        if field_value.startswith("/"):
            # Site-absolute path: keep only scheme and host of the page URL.
            # NOTE(review): a protocol-relative value ("//host/x") also takes
            # this branch and is treated as a path - confirm that is intended.
            result = urlparse(base_fork_url)
            result_url = ParseResult(scheme=result.scheme, netloc=result.netloc, path=field_value, params='', query='',
                                     fragment='').geturl()
        else:
            # Page-relative path: append it to the page URL and let urljoin
            # collapse any "." / ".." segments.
            result_url = urljoin(
                base_fork_url + '/' + (field_value if field_value.endswith('/') else field_value + '/'),
                ".")
        result_url = result_url[:-1] if result_url.endswith('/') else result_url
        tag[field] = result_url

    def reset_beautiful_soup(self, bf: BeautifulSoup):
        """Make every relative ``href`` and ``src`` in ``bf`` absolute.

        The document is modified in place and also returned.
        """
        # Matches attribute values that do not start with one of the listed
        # schemes or with "#".
        relative_value = re.compile('^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*')
        for field in ('href', 'src'):
            for tag in bf.find_all(**{field: relative_value}):
                self.reset_url(tag, field, self.base_fork_url)
        return bf

    @staticmethod
    def get_beautiful_soup(response):
        """Decode ``response`` and parse it with the ``html.parser`` backend.

        The response's declared encoding is used unless it is missing or
        ``ISO-8859-1``, in which case the detected encoding is used instead.
        If the page declares a different charset in a ``<meta charset>`` tag,
        the body is decoded again with that charset when possible.
        """
        encoding = response.encoding if response.encoding is not None and response.encoding != 'ISO-8859-1' else response.apparent_encoding
        if not encoding:
            # Detection can yield nothing (e.g. for an empty body); decoding
            # with None would raise TypeError.
            encoding = 'utf-8'
        html_content = response.content.decode(encoding)
        beautiful_soup = BeautifulSoup(html_content, "html.parser")
        meta_list = beautiful_soup.find_all('meta')
        charset_list = [meta.attrs.get('charset') for meta in meta_list if
                        meta.attrs is not None and 'charset' in meta.attrs]
        if len(charset_list) > 0:
            charset = charset_list[0]
            if charset != encoding:
                try:
                    html_content = response.content.decode(charset)
                except Exception as e:
                    # Best effort: keep the first decoding if the declared
                    # charset is unknown or does not fit the bytes.
                    logging.getLogger("max_kb").error(f'{e}')
                return BeautifulSoup(html_content, "html.parser")
        return beautiful_soup

    def fork(self):
        """Fetch ``base_fork_url`` and return its text content and child links.

        Returns:
            ``Fork.Response.success`` with the selected content converted by
            html2text and the links under ``base_fork_url``; or
            ``Fork.Response.error`` when the status code is not 200 or when
            fetching/decoding raises.
        """
        try:

            headers = {
                'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36'
            }

            logging.getLogger("max_kb").info(f'fork:{self.base_fork_url}')
            response = requests.get(self.base_fork_url, verify=False, headers=headers,
                                    timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                logging.getLogger("max_kb").error(f"url: {self.base_fork_url} code:{response.status_code}")
                return Fork.Response.error(f"url: {self.base_fork_url} code:{response.status_code}")
            bf = self.get_beautiful_soup(response)
        except Exception as e:
            logging.getLogger("max_kb_error").error(f'{str(e)}:{traceback.format_exc()}')
            return Fork.Response.error(str(e))
        bf = self.reset_beautiful_soup(bf)
        link_list = self.get_child_link_list(bf)
        content = self.get_content_html(bf)
        r = ht.html2text(content)
        return Fork.Response.success(r, link_list)
| 173 | + |
| 174 | + |
def handler(base_url, response: Fork.Response):
    """Example crawl callback: print the link's URL, its tag text and the page content.

    Args:
        base_url: The visited link object; its ``url`` and ``tag`` attributes
            are read. The tag text is printed as ``None`` when ``tag`` is falsy.
        response: Fetch result whose ``content`` is printed.
    """
    url = base_url.url
    tag_text = base_url.tag.text if base_url.tag else None
    print(url, tag_text, response.content)
| 177 | + |
| 178 | +# ForkManage('https://bbs.fit2cloud.com/c/de/6', ['.md-content']).fork(3, set(), handler) |
0 commit comments