|
1 | 1 | #!/usr/bin/python3 |
2 | | - |
| 2 | +import argparse |
3 | 3 | import datetime |
| 4 | +import os |
| 5 | +import re |
| 6 | +from concurrent.futures import ThreadPoolExecutor, as_completed |
4 | 7 |
|
5 | 8 | import requests |
6 | 9 | from bs4 import BeautifulSoup |
7 | | -from cli import get_args |
8 | | -from config import * |
9 | | -from scraper import Scraper |
| 10 | +from fake_useragent import FakeUserAgent |
| 11 | +from tqdm import tqdm |
| 12 | + |
# Randomized User-Agent generator so requests don't all advertise "python-requests".
ua = FakeUserAgent()

# Default download destination (can be overridden with -f/--folder).
FOLDER = os.path.expanduser("~") + '/dox/med'
# "identity" disables compressed transfer encodings — presumably so the
# Content-Length header matches the byte count fed to the progress bars
# (TODO confirm against the download code).
HEADERS = {
    "User-Agent": ua.random,
    "Accept-Encoding": "identity"
    }
# ANSI-colored "::" prefix used for all user-facing prompts/messages.
DECOR = ' \033[34;1m::\033[0m '
# File-name substrings searched for when collecting downloadable material.
EXTENSIONS = ["pdf", "ppt", "doc"]
| 22 | + |
| 23 | + |
def get_args(argv=None):
    """Parse command-line options for the msc-mu.com lecture downloader.

    Args:
        argv: Optional list of argument strings to parse. Defaults to
            ``None``, which makes argparse read ``sys.argv[1:]`` — i.e.
            the original no-argument behavior is unchanged. Passing an
            explicit list makes the parser reusable and testable.

    Returns:
        argparse.Namespace with attributes ``category``, ``course``,
        ``folder``, ``default_folder`` and ``verbose``.
    """
    parser = argparse.ArgumentParser(
        description='API to download lectures off msc-mu.com')
    parser.add_argument(
        '-t', '--category', type=int, metavar='', help='to specify category number'
    )
    parser.add_argument(
        '-c', '--course', type=int, metavar='', help='to specify course number'
    )
    parser.add_argument(
        '-f', '--folder', type=str, metavar='', help='to specify destination folder'
    )
    parser.add_argument(
        '-d', '--default-folder', action='store_true', help='Use default folder'
    )
    parser.add_argument(
        '-v', '--verbose', action='store_true', help='Increase Verbosity'
    )
    return parser.parse_args(argv)
| 43 | + |
| 44 | + |
| 45 | + |
| 46 | + |
class Scraper:
    """Scrapes msc-mu.com for course material and downloads it with a thread pool.

    Interactive prompts are used for anything not supplied via the CLI
    ``args`` namespace (category, course, destination folder).
    """

    # Total files successfully downloaded. NOTE: updated only from the main
    # thread inside download_from_dict — worker threads must not touch it
    # (a `+=` from several threads is a racy read-modify-write).
    downloaded_count = 0

    def __init__(self, args):
        """Store the parsed CLI args and open one shared requests session."""
        self.args = args
        # One session reused for every request (connection keep-alive, cookies).
        self.session = requests.session()

    def choose_category(self):
        """Return the level URL for the chosen category.

        Uses ``--category`` when given; otherwise prints a menu and prompts.
        Invalid input re-prompts via recursion.
        """
        categories = [
            [1, 'Sha3\'af', 'https://msc-mu.com/level/18'],
            [2, 'Athar', 'https://msc-mu.com/level/17'],
            [3, 'Rou7', 'https://msc-mu.com/level/16'],
            [4, 'Wateen', 'https://msc-mu.com/level/15'],
            [5, 'Nabed', 'https://msc-mu.com/level/14'],
            [6, 'Wareed', 'https://msc-mu.com/level/13'],
            [7, 'Minors', 'https://msc-mu.com/level/10'],
            [8, 'Majors', 'https://msc-mu.com/level/9']
        ]
        if self.args.category:
            category_url = categories[self.args.category - 1][2]
            print(DECOR + 'Searching', categories[self.args.category - 1][1] + '\'s category...')
            return category_url
        print('\n')
        for category in categories:
            print(str(category[0]) + ') ' + category[1])
        selected_category = input('\n' + DECOR + 'Choose a category.\n\n>> ')
        try:
            selected_category = int(selected_category)
            for category in categories:
                if selected_category == category[0]:
                    print('\n' + DECOR + 'Searching', category[1] + '\'s category...\n')
                    return category[2]
            # Number didn't match any menu entry: treat like bad input.
            # (ValueError is also what int() raises for non-numeric text,
            # so one narrow except handles both cases — no broad Exception.)
            raise ValueError(selected_category)
        except ValueError:
            print('\n' + DECOR + 'Invalid Input\n')
            return self.choose_category()

    def find_courses(self, url):
        """Fetch a category page and return ``[index, name, site_number]`` rows.

        Each ``h6`` on the page is a course title; the course number is pulled
        from the enclosing card's ``href="https://msc-mu.com/courses/<n>"``.
        """
        page = self.session.get(url, headers=HEADERS)
        doc = BeautifulSoup(page.text, 'html.parser')
        subject = doc.find_all('h6')
        courses = []
        for x, i in enumerate(subject):
            parent = i.parent.parent.parent
            course_number = re.findall('href="https://msc-mu.com/courses/(.*)">', parent.decode())[0]
            course_name = i.string.strip()
            courses.append([x + 1, course_name, course_number])
        return courses

    def choose_course(self, courses):
        """Return the site course number (str) the user picked.

        Uses ``--course`` when given; otherwise prints a menu and prompts.
        """
        if self.args.course:
            course_number = str(courses[self.args.course - 1][2])
            print(DECOR + 'Alright, ', courses[self.args.course - 1][1])
            return course_number
        for course in courses:
            print(str(course[0]) + ') ' + course[1])
        selected_course = input('\n' + DECOR + 'Which course would you like to download?\n\n>> ')
        try:
            selected_course = int(selected_course)
            for course in courses:
                if selected_course == course[0]:
                    print('\n' + DECOR + 'Alright, ', course[1])
                    return str(course[2])
            # BUG FIX: a number outside the menu range used to fall off the
            # loop and silently return None; now it re-prompts like any
            # other invalid input.
            raise ValueError(selected_course)
        except ValueError:
            print('\n' + DECOR + 'Invalid Input\n')
            return self.choose_course(courses)

    def choose_folder(self):
        """Return the destination folder path, always ending with os.path.sep.

        Resolution order: ``--folder`` / ``--default-folder`` flags first,
        otherwise an interactive confirm/re-prompt loop. Exits the program
        if a flag-supplied folder does not exist.
        """
        folder = FOLDER
        # TODO let the system figure out the directory.
        if self.args.folder or self.args.default_folder:
            if not self.args.default_folder:
                folder = self.args.folder

            if '~' in folder:
                folder = os.path.expanduser(folder)
            if not folder[-1] == os.path.sep:
                folder = folder + os.path.sep
            if os.path.isdir(folder):
                return folder
            else:
                print('\n' + DECOR + 'Folder Not found! ', end='')
                quit()
        else:
            answer = input(DECOR + 'Your default destination is ' + folder + '\n' + DECOR + ' Do you want to keep that (Y/n): ')
            if answer == 'n' or answer == 'no' or answer == 'N':
                valid_folder = False
                while not valid_folder:
                    selected_folder = input('\n' + DECOR + 'Enter the Folder you want to save material in.\n\n>> ')
                    # Adds a separator at the end if the user didn't
                    if not selected_folder.endswith(os.path.sep):
                        selected_folder = selected_folder + os.path.sep
                    selected_folder = os.path.expanduser(selected_folder)
                    if os.path.isdir(selected_folder):
                        folder = selected_folder
                        valid_folder = True
                    else:
                        print('\n' + DECOR + 'Folder Not found! ', end='')
            if not folder[-1] == os.path.sep:
                folder = folder + os.path.sep
            return folder

    def create_nav_links_dictionary(self, soup):
        """Map nav-tab ids (``aria-controls``) to their displayed section names."""
        navigate_dict = {}
        nav_links = soup.find_all('li', attrs={"class": "nav-item"})
        for navigate_link in nav_links:
            if navigate_link.h5:
                nav_name = navigate_link.h5.text.strip()
                nav_number = navigate_link.a.get('aria-controls')
                navigate_dict[nav_number] = nav_name
        return navigate_dict

    def make_course_folder(self, courses, index, folder):
        """Create (if needed) and return a per-course subfolder of ``folder``.

        ``index`` is the site course number; assumes it appears in ``courses``
        (a missing match would crash on the sanitize step below).
        """
        course_name = None
        for course in courses:
            if course[2] == index:
                course_name = course[1]
                break
        # Replace any invalid characters with an underscore ( Sanitize course name )
        safe_course_name = re.sub(r'[\/:*?"<>|]', '_', course_name)
        new_folder = folder + safe_course_name + os.path.sep

        # makedirs(exist_ok=True) instead of mkdir: tolerates a missing parent
        # and a pre-existing folder, matching the download path's behavior.
        os.makedirs(new_folder, exist_ok=True)
        return new_folder

    def find_files_paths_and_links(self, navigation_dict, soup):
        """Collect ``[relative_path, href, filename]`` for every material link.

        Walks each matching ``<a>`` tag's ancestors to rebuild the section
        hierarchy: ``div.mb-3`` headers give subsection names, ``div.tab-pane``
        gives the id of the nav tab the file lives under.
        """
        file_tags = []
        for extension in EXTENSIONS:
            file_tags += soup.find_all('a', string=lambda text: text and f'.{extension}' in text)
        if not file_tags:
            # Moved out of the per-tag loop (elements from find_all are never
            # falsy, so the old in-loop check was dead code) and the message
            # now lists every extension we searched for.
            print('no ' + ', '.join(EXTENSIONS) + ' files!')
            quit()
        files_list = []
        path = []
        associated_nav_link_id = ''
        for file_tag in file_tags:
            current_tag = file_tag
            while True:
                current_tag = current_tag.parent
                if current_tag.name == 'div' and 'mb-3' in current_tag.get('class', []):
                    path.append(current_tag.h6.text.strip())
                if current_tag.name == 'div' and 'tab-pane' in current_tag.get('class', []):
                    associated_nav_link_id = current_tag.get('id')
                if not current_tag.parent:
                    break
            path.append(navigation_dict[associated_nav_link_id])
            path.reverse()
            basename = file_tag.text
            file_path = "/".join(path) + os.path.sep
            path.clear()

            file_link = file_tag.get('href')
            files_list.append([file_path, file_link, basename])
        return files_list

    def _download_single_file(self, data):
        """Download one file (worker-thread body).

        ``data`` is ``(relative_path, url, filename, destination_folder)``.
        Returns True on a completed download, False when the file already
        existed or the request failed — the caller tallies successes.
        """
        path, link, name, folder = data
        safe_name = re.sub(r'[\/:*?"<>|]', '_', name)
        full_path = os.path.join(folder, path)
        file_location = os.path.join(full_path, safe_name)

        if os.path.isfile(file_location):
            return False  # Skip existing

        os.makedirs(full_path, exist_ok=True)

        try:
            with self.session.get(link, headers=HEADERS, stream=True) as r:
                total_size = int(r.headers.get("content-length", 0))
                with tqdm(total=total_size, unit="B", unit_scale=True,
                          desc=f"{safe_name:14.14}", leave=False, ascii='-#', ncols=86) as file_bar:

                    with open(file_location, 'wb') as file:
                        for chunk in r.iter_content(chunk_size=128 * 1024):
                            file.write(chunk)
                            file_bar.update(len(chunk))

            return True

        except Exception as e:
            # Using tqdm.write prevents breaking the progress bars
            tqdm.write(f"Error on {safe_name}: {e}")
            return False

    def download_from_dict(self, path_link_dict, folder):
        """Download every ``[path, link, name]`` entry into ``folder``.

        Runs up to 5 worker threads; Ctrl+C abandons outstanding work and
        re-raises KeyboardInterrupt for the caller to handle.
        """
        print(DECOR + "Starting threaded download...")
        tasks = [(path, link, name, folder) for path, link, name in path_link_dict]

        # Executor started manually (no 'with') so Ctrl+C can abandon queued
        # work instead of blocking in __exit__ waiting for it to drain.
        executor = ThreadPoolExecutor(max_workers=5)
        futures = [executor.submit(self._download_single_file, t) for t in tasks]

        try:
            with tqdm(total=len(tasks), desc="Total Progress", position=0, ncols=86, ascii='-#') as overall_bar:
                for future in as_completed(futures):
                    # BUG FIX: workers used to bump Scraper.downloaded_count
                    # themselves, racing each other; counting completed
                    # futures here keeps the counter single-threaded.
                    if future.result():
                        Scraper.downloaded_count += 1
                    overall_bar.update(1)

        except KeyboardInterrupt:
            # Force kill threads immediately on Ctrl+C
            print('\n' + DECOR + 'Stopping threads...')
            executor.shutdown(wait=False)
            raise  # Re-raise error so the main block handles the exit

        # Normal cleanup if no error
        executor.shutdown(wait=True)
12 | 270 | def main(): |
13 | 271 | # main function should use the scraper class |
14 | 272 | start = datetime.datetime.now() |
|
0 commit comments