|
| 1 | +# 05.07.24 |
| 2 | + |
| 3 | +import re |
| 4 | +import logging |
| 5 | + |
| 6 | + |
| 7 | +# External libraries |
| 8 | +import httpx |
| 9 | +import jsbeautifier |
| 10 | +from bs4 import BeautifulSoup |
| 11 | + |
| 12 | + |
| 13 | +# Internal utilities |
| 14 | +from StreamingCommunity.Util.config_json import config_manager |
| 15 | +from StreamingCommunity.Util.headers import get_userAgent |
| 16 | + |
| 17 | + |
| 18 | +# Variable |
| 19 | +MAX_TIMEOUT = config_manager.get_int("REQUESTS", "timeout") |
| 20 | + |
| 21 | + |
| 22 | +class VideoSource: |
| 23 | + STAYONLINE_BASE_URL = "https://stayonline.pro" |
| 24 | + MIXDROP_BASE_URL = "https://mixdrop.sb" |
| 25 | + |
| 26 | + def __init__(self, url: str): |
| 27 | + self.url = url |
| 28 | + self.redirect_url: str | None = None |
| 29 | + self._init_headers() |
| 30 | + |
| 31 | + def _init_headers(self) -> None: |
| 32 | + """Initialize the base headers used for requests.""" |
| 33 | + self.headers = { |
| 34 | + 'origin': self.STAYONLINE_BASE_URL, |
| 35 | + 'user-agent': get_userAgent(), |
| 36 | + } |
| 37 | + |
| 38 | + def _get_mixdrop_headers(self) -> dict: |
| 39 | + """Get headers specifically for MixDrop requests.""" |
| 40 | + return { |
| 41 | + 'referer': 'https://mixdrop.club/', |
| 42 | + 'user-agent': get_userAgent() |
| 43 | + } |
| 44 | + |
| 45 | + def get_redirect_url(self) -> str: |
| 46 | + """Extract the stayonline redirect URL from the initial page.""" |
| 47 | + try: |
| 48 | + response = httpx.get(self.url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT) |
| 49 | + response.raise_for_status() |
| 50 | + soup = BeautifulSoup(response.text, "html.parser") |
| 51 | + |
| 52 | + for link in soup.find_all('a'): |
| 53 | + href = link.get('href') |
| 54 | + if href and 'stayonline' in href: |
| 55 | + self.redirect_url = href |
| 56 | + logging.info(f"Redirect URL: {self.redirect_url}") |
| 57 | + return self.redirect_url |
| 58 | + |
| 59 | + raise ValueError("Stayonline URL not found") |
| 60 | + |
| 61 | + except Exception as e: |
| 62 | + logging.error(f"Error getting redirect URL: {e}") |
| 63 | + raise |
| 64 | + |
| 65 | + def get_link_id(self) -> str: |
| 66 | + """Extract the link ID from the redirect page.""" |
| 67 | + if not self.redirect_url: |
| 68 | + raise ValueError("Redirect URL not set. Call get_redirect_url first.") |
| 69 | + |
| 70 | + try: |
| 71 | + response = httpx.get(self.redirect_url, headers=self.headers, follow_redirects=True, timeout=MAX_TIMEOUT) |
| 72 | + response.raise_for_status() |
| 73 | + soup = BeautifulSoup(response.text, "html.parser") |
| 74 | + |
| 75 | + for script in soup.find_all('script'): |
| 76 | + match = re.search(r'var\s+linkId\s*=\s*"([^"]+)"', script.text) |
| 77 | + if match: |
| 78 | + return match.group(1) |
| 79 | + |
| 80 | + raise ValueError("LinkId not found") |
| 81 | + |
| 82 | + except Exception as e: |
| 83 | + logging.error(f"Error getting link ID: {e}") |
| 84 | + raise |
| 85 | + |
| 86 | + def get_final_url(self, link_id: str) -> str: |
| 87 | + """Get the final URL using the link ID.""" |
| 88 | + try: |
| 89 | + self.headers['referer'] = f'{self.STAYONLINE_BASE_URL}/l/{link_id}/' |
| 90 | + data = {'id': link_id, 'ref': ''} |
| 91 | + |
| 92 | + response = httpx.post(f'{self.STAYONLINE_BASE_URL}/ajax/linkView.php', headers=self.headers, data=data, timeout=MAX_TIMEOUT) |
| 93 | + response.raise_for_status() |
| 94 | + return response.json()['data']['value'] |
| 95 | + |
| 96 | + except Exception as e: |
| 97 | + logging.error(f"Error getting final URL: {e}") |
| 98 | + raise |
| 99 | + |
| 100 | + def _extract_video_id(self, final_url: str) -> str: |
| 101 | + """Extract video ID from the final URL.""" |
| 102 | + parts = final_url.split('/') |
| 103 | + if len(parts) < 5: |
| 104 | + raise ValueError("Invalid final URL format") |
| 105 | + return parts[4] |
| 106 | + |
| 107 | + def _extract_delivery_url(self, script_text: str) -> str: |
| 108 | + """Extract delivery URL from beautified JavaScript.""" |
| 109 | + beautified = jsbeautifier.beautify(script_text) |
| 110 | + for line in beautified.splitlines(): |
| 111 | + if 'MDCore.wurl' in line: |
| 112 | + url = line.split('= ')[1].strip('"').strip(';') |
| 113 | + return f"https:{url}" |
| 114 | + raise ValueError("Delivery URL not found in script") |
| 115 | + |
| 116 | + def get_playlist(self) -> str: |
| 117 | + """ |
| 118 | + Execute the entire flow to obtain the final video URL. |
| 119 | + Returns: |
| 120 | + str: The final video delivery URL |
| 121 | + """ |
| 122 | + self.get_redirect_url() |
| 123 | + link_id = self.get_link_id() |
| 124 | + |
| 125 | + final_url = self.get_final_url(link_id) |
| 126 | + video_id = self._extract_video_id(final_url) |
| 127 | + |
| 128 | + response = httpx.get( |
| 129 | + f'{self.MIXDROP_BASE_URL}/e/{video_id}', |
| 130 | + headers=self._get_mixdrop_headers(), |
| 131 | + timeout=MAX_TIMEOUT |
| 132 | + ) |
| 133 | + response.raise_for_status() |
| 134 | + soup = BeautifulSoup(response.text, "html.parser") |
| 135 | + |
| 136 | + script_text = next( |
| 137 | + (script.text for script in soup.find_all('script') |
| 138 | + if "eval" in str(script.text)), |
| 139 | + None |
| 140 | + ) |
| 141 | + |
| 142 | + if not script_text: |
| 143 | + raise ValueError("Required script not found") |
| 144 | + |
| 145 | + return self._extract_delivery_url(script_text).replace('"', '') |
0 commit comments