|
| 1 | +import hashlib |
| 2 | +import logging |
| 3 | +import re |
| 4 | +from typing import TYPE_CHECKING, Any |
| 5 | +from xml.etree.ElementTree import Element |
| 6 | + |
| 7 | +from app.library.cache import Cache |
| 8 | +from app.library.Tasks import Task, TaskFailure, TaskItem, TaskResult |
| 9 | +from app.library.Utils import extract_info, get_archive_id |
| 10 | + |
| 11 | +from ._base_handler import BaseHandler |
| 12 | + |
| 13 | +if TYPE_CHECKING: |
| 14 | + from xml.etree.ElementTree import Element |
| 15 | + |
# Module-level logger, named after this module.
LOG: logging.Logger = logging.getLogger(name=__name__)
# Single Cache instance created at import time and shared by the code in this module.
CACHE: Cache = Cache()
| 18 | + |
| 19 | + |
class RssGenericHandler(BaseHandler):
    """
    Task handler for generic RSS/Atom feeds.

    A task URL is accepted when it matches ``FEED_PATTERN``. The feed is downloaded, its entries are
    reduced to ``url``/``title``/``published`` dictionaries, and every entry for which an archive ID
    can be determined is returned as a ``TaskItem``.
    """

    # Matches URLs ending in ".rss"/".atom" (optionally followed by a query string), or URLs that
    # contain the literal marker "handler=rss" anywhere (e.g. in the query string or fragment).
    FEED_PATTERN: re.Pattern[str] = re.compile(
        r"\.(rss|atom)(\?.*)?$|handler=rss",
        re.IGNORECASE,
    )

    # Namespace prefixes used by the ElementTree lookups in the parsing helpers.
    _NS: dict[str, str] = {
        "atom": "http://www.w3.org/2005/Atom",
        "media": "http://search.yahoo.com/mrss/",
    }

    @staticmethod
    def can_handle(task: Task) -> bool:
        """
        Tell whether the task URL looks like an RSS/Atom feed.

        Args:
            task (Task): The task whose URL is checked.

        Returns:
            bool: True when ``parse`` recognizes the task URL, False otherwise.

        """
        LOG.debug(f"'{task.name}': Checking if task URL is parsable RSS feed: {task.url}")
        return RssGenericHandler.parse(task.url) is not None

    @staticmethod
    def _text(elem: Element | None) -> str:
        """
        Return the whitespace-stripped text of an element.

        Args:
            elem (Element | None): The element to read, or None when the lookup found nothing.

        Returns:
            str: The stripped text, or an empty string when the element or its text is missing.

        """
        if elem is None or not elem.text:
            return ""

        # Pretty-printed feeds commonly wrap values in newlines/indentation.
        return elem.text.strip()

    @staticmethod
    def _atom_entry_url(entry: Element) -> str:
        """
        Return the link of an Atom entry.

        Args:
            entry (Element): An ``atom:entry`` element.

        Returns:
            str: The ``href`` of the ``rel="alternate"`` link, else of the first link, else "".

        """
        ns: dict[str, str] = RssGenericHandler._NS

        link: Element | None = entry.find("atom:link[@rel='alternate']", ns)
        if link is None:
            link = entry.find("atom:link", ns)

        if link is None:
            return ""

        return (link.get("href") or "").strip()

    @staticmethod
    def _rss_item_url(item: Element) -> str:
        """
        Return the link of an RSS item.

        The sources are tried in order: ``<link>`` text, ``<media:content url>``, ``<enclosure url>``.

        Args:
            item (Element): An RSS ``item`` element.

        Returns:
            str: The first non-empty URL found, or "" when the item carries none.

        """
        link_text: str = RssGenericHandler._text(item.find("link"))
        if link_text:
            return link_text

        for candidate in (item.find("media:content", RssGenericHandler._NS), item.find("enclosure")):
            if candidate is None:
                continue

            href: str = (candidate.get("url") or "").strip()
            if href:
                return href

        return ""

    @staticmethod
    def _parse_entries(task: Task, root: Element) -> list[dict[str, str]]:
        """
        Convert a parsed feed document into raw entry dictionaries.

        The document is treated as Atom when the root has ``atom:entry`` children; otherwise every
        ``item`` element found anywhere in the document is treated as an RSS item. Entries without
        a URL are logged and skipped.

        Args:
            task (Task): The task being processed (used for log messages only).
            root (Element): The root element of the parsed feed.

        Returns:
            list[dict[str, str]]: One dictionary per usable entry with 'url', 'title' and 'published' keys.

        """
        ns: dict[str, str] = RssGenericHandler._NS

        nodes: list[Element] = root.findall("atom:entry", ns)
        if nodes:
            LOG.debug(f"'{task.name}': Detected Atom feed format with {len(nodes)} entries")
            label = "Atom entry"
            url_of = RssGenericHandler._atom_entry_url
            title_tag, date_tag = "atom:title", "atom:published"
        else:
            nodes = root.findall(".//item")
            LOG.debug(f"'{task.name}': Detected RSS feed format with {len(nodes)} items")
            label = "RSS item"
            url_of = RssGenericHandler._rss_item_url
            title_tag, date_tag = "title", "pubDate"

        items: list[dict[str, str]] = []
        for node in nodes:
            url: str = url_of(node)
            if not url:
                LOG.warning(f"'{task.name}': {label} missing URL. Skipping.")
                continue

            items.append(
                {
                    "url": url,
                    "title": RssGenericHandler._text(node.find(title_tag, ns)),
                    "published": RssGenericHandler._text(node.find(date_tag, ns)),
                }
            )

        return items

    @staticmethod
    async def _get(
        task: Task,
        params: dict,
        parsed: dict[str, str],
    ) -> tuple[str, list[dict[str, str]], int]:
        """
        Fetch the feed and return raw entries.

        Exceptions raised by the request, by ``raise_for_status()`` or by the XML parser are not
        caught here and propagate to the caller.

        Args:
            task (Task): The task containing the feed URL.
            params (dict): The ytdlp options.
            parsed (dict): The parsed URL components (contains 'url' key).

        Returns:
            tuple[str, list[dict[str, str]], int]: The feed URL, list of entry dictionaries, and entry count.

        """
        # defusedxml is imported lazily, only when a feed is actually fetched.
        from defusedxml.ElementTree import fromstring

        feed_url: str = parsed["url"]
        LOG.debug(f"'{task.name}': Fetching RSS/Atom feed from {feed_url}")

        response = await RssGenericHandler.request(url=feed_url, ytdlp_opts=params)
        response.raise_for_status()

        root: Element = fromstring(response.text)
        items: list[dict[str, str]] = RssGenericHandler._parse_entries(task, root)

        # The count only covers entries that had a URL, i.e. exactly the returned items.
        return feed_url, items, len(items)

    @staticmethod
    def _resolve_archive_id(task: Task, params: dict, url: str) -> str | None:
        """
        Determine the archive ID for an entry URL.

        The static lookup (``get_archive_id``) is tried first. When it yields nothing, the module
        cache is consulted; a cached falsy value is treated as a previously recorded failure.
        Otherwise ``extract_info`` is called and the outcome (ID or failure) is stored in the cache.

        Args:
            task (Task): The task being processed (its name is part of the cache key and log messages).
            params (dict): The ytdlp options passed to ``extract_info``.
            url (str): The entry URL.

        Returns:
            str | None: The archive ID, or None when it could not be determined.

        """
        # Try to get static archive ID first
        archive_id: str | None = get_archive_id(url=url).get("archive_id")
        if archive_id:
            return archive_id

        cache_key: str = hashlib.sha256(f"{task.name}-{url}".encode()).hexdigest()

        if CACHE.has(cache_key):
            archive_id = CACHE.get(cache_key)
            if not archive_id:
                LOG.debug(f"'{task.name}': Cached failure for URL '{url}'. Skipping.")
                return None

            return archive_id

        LOG.warning(
            f"'{task.name}': Unable to generate static archive ID for '{url}' in feed. "
            "Doing real request to fetch yt-dlp archive ID."
        )

        # NOTE(review): extract_info is called synchronously although the caller is a coroutine.
        # If it performs blocking I/O this stalls the event loop for every uncached entry —
        # confirm, and consider offloading once its thread-safety is known.
        info = extract_info(
            config=params,
            url=url,
            no_archive=True,
            no_log=True,
        )

        if not info:
            LOG.error(f"'{task.name}': Failed to extract info for URL '{url}' to generate archive ID. Skipping.")
            CACHE.set(cache_key, None)
            return None

        if not info.get("id") or not info.get("extractor_key"):
            LOG.error(f"'{task.name}': Incomplete info extracted for URL '{url}' to generate archive ID. Skipping.")
            CACHE.set(cache_key, None)
            return None

        archive_id = f"{str(info.get('extractor_key', '')).lower()} {info.get('id')}"
        CACHE.set(cache_key, archive_id)

        return archive_id

    @staticmethod
    async def extract(task: Task) -> TaskResult | TaskFailure:
        """
        Extract items from an RSS/Atom feed.

        Entries without a URL, or for which no archive ID can be determined, are left out of the result.

        Args:
            task (Task): The task containing the feed URL.

        Returns:
            TaskResult | TaskFailure: Extraction result with parsed items or failure information.

        """
        parsed: dict[str, str] | None = RssGenericHandler.parse(task.url)
        if not parsed:
            return TaskFailure(message="Unrecognized RSS/Atom feed URL.")

        params: dict = task.get_ytdlp_opts().get_all()

        try:
            feed_url, items, real_count = await RssGenericHandler._get(task, params, parsed)
        except Exception as exc:
            # Any fetch/parse problem is reported as a task failure instead of being raised.
            LOG.exception(exc)
            return TaskFailure(message="Failed to fetch RSS/Atom feed.", error=str(exc))

        task_items: list[TaskItem] = []

        for entry in items:
            if not (url := entry.get("url")):
                continue

            archive_id: str | None = RssGenericHandler._resolve_archive_id(task, params, url)
            if not archive_id:
                continue

            # Any extra keys on the entry are forwarded as item metadata next to 'published'.
            metadata: dict[str, Any] = {k: v for k, v in entry.items() if k not in {"url", "title", "published"}}

            task_items.append(
                TaskItem(
                    url=url,
                    title=entry.get("title"),
                    archive_id=archive_id,
                    metadata={"published": entry.get("published"), **metadata},
                )
            )

        return TaskResult(
            items=task_items,
            metadata={"feed_url": feed_url, "entry_count": real_count},
        )

    @staticmethod
    def parse(url: str) -> dict[str, str] | None:
        """
        Parse URL for valid RSS/Atom feed.

        Args:
            url (str): The URL to parse.

        Returns:
            dict[str, str] | None: A dictionary with 'url' key if valid RSS/Atom feed, None otherwise.

        """
        # Guard against None/non-string/empty input before running the regex.
        if not isinstance(url, str) or not url:
            return None

        return {"url": url} if RssGenericHandler.FEED_PATTERN.search(url) else None

    @staticmethod
    def tests() -> list[tuple[str, bool]]:
        """
        Test cases for the URL parser.

        Returns:
            list[tuple[str, bool]]: A list of tuples containing the URL and expected result.

        """
        return [
            ("https://www.example.com/test.rss", True),
            ("https://www.example.com/test.atom", True),
            ("https://www.example.com/test.atom#handler=rss", True),
            ("https://www.example.com/test.atom?handler=rss", True),
            ("https://www.example.com/feed.rss?version=2.0", True),
            ("https://www.example.com/test.xml", False),
            ("https://www.example.com/channel/UC_x5XG1OV2P6uZZ5FSM9Ttw", False),
            ("https://www.example.com/playlist?list=PLBCF2DAC6FFB574DE", False),
            ("https://www.example.com/user/SomeUser", False),
            ("https://example.com/feed.ATOM", True),
            ("https://example.com/feed.RSS", True),
        ]
0 commit comments