diff --git a/.gitignore b/.gitignore
index 86effc14..26f4d36e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,4 @@ substack_html_pages/*
 
 # Ignore substack_md_files directory
 /substack_md_files/
+substack_images/
\ No newline at end of file
diff --git a/README.md b/README.md
index c81f0df4..f32206db 100644
--- a/README.md
+++ b/README.md
@@ -77,6 +77,25 @@ To scrape a specific number of posts:
 python substack_scraper.py --url https://example.substack.com --directory /path/to/save/posts --number 5
 ```
 
+To scrape a single post:
+
+```bash
+python substack_scraper.py --url https://example.substack.com/p/example-post --directory /path/to/save/posts
+```
+
+To scrape images and download them to a `substack_images/` folder locally:
+
+```bash
+python substack_scraper.py --url https://example.substack.com --directory /path/to/save/posts --images
+```
+
+### Testing
+Run tests using pytest:
+
+```
+python -m pytest
+```
+
 ### Online Version
 
 For a hassle-free experience without any local setup:
diff --git a/requirements.txt b/requirements.txt
index c58926a7..3fe971b4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,3 +5,4 @@ selenium==4.16.0
 tqdm==4.66.1
 webdriver_manager==4.0.1
 Markdown==3.6
+pytest==8.3.4
\ No newline at end of file
diff --git a/substack_scraper.py b/substack_scraper.py
index 7644d6db..35002868 100644
--- a/substack_scraper.py
+++ b/substack_scraper.py
@@ -1,9 +1,14 @@
 import argparse
 import json
 import os
+import hashlib
+import mimetypes
+import re
 from abc import ABC, abstractmethod
+from pathlib import Path
 from typing import List, Optional, Tuple
 from time import sleep
+from urllib.parse import urlparse, unquote
 
 from bs4 import BeautifulSoup
 import html2text
@@ -17,23 +22,134 @@ from webdriver_manager.microsoft import EdgeChromiumDriverManager
 from selenium.webdriver.edge.options import Options as EdgeOptions
 from selenium.webdriver.chrome.service import Service
-from urllib.parse import urlparse
 
 from config import EMAIL, PASSWORD
 
 USE_PREMIUM: bool = False  # Set to True if you want to login to Substack and convert paid for posts
 BASE_SUBSTACK_URL: str = "https://www.thefitzwilliam.com/"  # Substack you want to convert to markdown
 BASE_MD_DIR: str = "substack_md_files"  # Name of the directory we'll save the .md essay files
 BASE_HTML_DIR: str = "substack_html_pages"  # Name of the directory we'll save the .html essay files
+BASE_IMAGE_DIR: str = "substack_images"  # Name of the directory we'll save images to if --images is passed in
 HTML_TEMPLATE: str = "author_template.html"  # HTML template to use for the author page
 JSON_DATA_DIR: str = "data"
 NUM_POSTS_TO_SCRAPE: int = 3  # Set to 0 if you want all posts
 
 
+def clean_linked_images(md_content: str) -> str:
+    """
+    Converts markdown linked images to simple image references.
+
+    Args:
+        md_content: String containing markdown content
+
+    Returns:
+        String with cleaned markdown where linked images are converted to simple image references
+
+    Example:
+        >>> md = "[![alt](/img/test.png)](/img/test.png)"
+        >>> clean_linked_images(md)
+        '![alt](/img/test.png)'
+    """
+    # Pattern matches: [![alt text](image-url)](/path/img.ext)
+    pattern = r'\[!\[(.*?)\]\((.*?)\)\]\(.*?\)'
+
+    # Replace with: ![alt text](image-url)
+    cleaned = re.sub(pattern, r'![\1](\2)', md_content)
+
+    return cleaned
+
+def count_images_in_markdown(md_content: str) -> int:
+    """
+    Count number of image references in markdown content.
+
+    Args:
+        md_content: Markdown content to analyze
+
+    Returns:
+        Number of image references found
+    """
+    # First clean linked images
+    cleaned_content = clean_linked_images(md_content)
+
+    # Then count remaining image references
+    pattern = r'!\[.*?\]\((.*?)\)'
+    matches = re.findall(pattern, cleaned_content)
+    return len(matches)
+
+def is_post_url(url: str) -> bool:
+    return "/p/" in url
+
+def get_publication_url(url: str) -> str:
+    parsed = urlparse(url)
+    return f"{parsed.scheme}://{parsed.netloc}/"
 
 def extract_main_part(url: str) -> str:
     parts = urlparse(url).netloc.split('.')  # Parse the URL to get the netloc, and split on '.'
     return parts[1] if parts[0] == 'www' else parts[0]  # Return the main part of the domain, while ignoring 'www' if
     # present
 
+def get_post_slug(url: str) -> str:
+    match = re.search(r'/p/([^/]+)', url)
+    return match.group(1) if match else 'unknown_post'
+
+def sanitize_filename(url: str) -> str:
+    """Create a safe filename from URL or content."""
+    # Extract original filename from CDN URL
+    if "substackcdn.com" in url:
+        # Get the actual image URL after the CDN parameters
+        original_url = unquote(url.split("https://")[1])
+        filename = original_url.split("/")[-1]
+    else:
+        filename = url.split("/")[-1]
+
+    # Remove invalid characters
+    filename = re.sub(r'[<>:"/\\|?*]', '', filename)
+
+    # If filename is too long or empty, create hash-based name
+    if len(filename) > 100 or not filename:
+        hash_object = hashlib.md5(url.encode())
+        ext = mimetypes.guess_extension(requests.head(url).headers.get('content-type', '')) or '.jpg'
+        filename = f"{hash_object.hexdigest()}{ext}"
+
+    return filename
+
+def download_image(url: str, save_path: Path, pbar: Optional[tqdm] = None) -> Optional[str]:
+    """Download image from URL and save to path."""
+    try:
+        response = requests.get(url, stream=True)
+        if response.status_code == 200:
+            save_path.parent.mkdir(parents=True, exist_ok=True)
+            with open(save_path, 'wb') as f:
+                for chunk in response.iter_content(chunk_size=8192):
+                    if chunk:
+                        f.write(chunk)
+            if pbar:
+                pbar.update(1)
+            return str(save_path)
+    except Exception as e:
+        if pbar:
+            pbar.write(f"Error downloading image {url}: {str(e)}")
+        else:
+            print(f"Error downloading image {url}: {str(e)}")
+    return None
+
+def process_markdown_images(md_content: str, author: str, post_slug: str, pbar: Optional[tqdm] = None) -> str:
+    """Process markdown content to download images and update references."""
+    image_dir = Path(BASE_IMAGE_DIR) / author / post_slug
+
+    # First clean up any linked images
+    md_content = clean_linked_images(md_content)
+
+    def replace_image(match):
+        url = match.group(0).strip('()')
+        filename = sanitize_filename(url)
+        save_path = image_dir / filename
+        if not save_path.exists():
+            download_image(url, save_path, pbar)
+
+        rel_path = os.path.relpath(save_path, Path(BASE_MD_DIR) / author)
+        return f"({rel_path})"
+
+    pattern = r'\(https://substackcdn\.com/image/fetch/[^\s\)]+\)'
+    return re.sub(pattern, replace_image, md_content)
 
 
 def generate_html_file(author_name: str) -> None:
     """
@@ -67,26 +183,26 @@ def generate_html_file(author_name: str) -> None:
 
 
 class BaseSubstackScraper(ABC):
-    def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str):
-        if not base_substack_url.endswith("/"):
-            base_substack_url += "/"
-        self.base_substack_url: str = base_substack_url
-
-        self.writer_name: str = extract_main_part(base_substack_url)
-        md_save_dir: str = f"{md_save_dir}/{self.writer_name}"
-
-        self.md_save_dir: str = md_save_dir
-        self.html_save_dir: str = f"{html_save_dir}/{self.writer_name}"
-
-        if not os.path.exists(md_save_dir):
-            os.makedirs(md_save_dir)
-            print(f"Created md directory {md_save_dir}")
-        if not os.path.exists(self.html_save_dir):
-            os.makedirs(self.html_save_dir)
-            print(f"Created html directory {self.html_save_dir}")
-
-        self.keywords: List[str] = ["about", "archive", "podcast"]
-        self.post_urls: List[str] = self.get_all_post_urls()
+    def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str, download_images: bool = False):
+        self.is_single_post: bool = is_post_url(base_substack_url)
+        self.base_substack_url: str = get_publication_url(base_substack_url)
+        self.writer_name: str = extract_main_part(self.base_substack_url)
+        self.post_slug: Optional[str] = get_post_slug(base_substack_url) if self.is_single_post else None
+
+        self.md_save_dir: str = Path(md_save_dir) / self.writer_name
+        self.html_save_dir: str = Path(html_save_dir) / self.writer_name
+        self.image_dir: str = Path(BASE_IMAGE_DIR) / self.writer_name
+        self.download_images: bool = download_images
+
+        for directory in [self.md_save_dir, self.html_save_dir]:
+            directory.mkdir(parents=True, exist_ok=True)
+            print(f"Created directory {directory}")
+
+        if self.is_single_post:
+            self.post_urls = [base_substack_url]
+        else:
+            self.keywords: List[str] = ["about", "archive", "podcast"]
+            self.post_urls: List[str] = self.get_all_post_urls()
 
     def get_all_post_urls(self) -> List[str]:
         """
@@ -302,47 +418,62 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None:
         essays_data = []
         count = 0
         total = num_posts_to_scrape if num_posts_to_scrape != 0 else len(self.post_urls)
-        for url in tqdm(self.post_urls, total=total):
-            try:
-                md_filename = self.get_filename_from_url(url, filetype=".md")
-                html_filename = self.get_filename_from_url(url, filetype=".html")
-                md_filepath = os.path.join(self.md_save_dir, md_filename)
-                html_filepath = os.path.join(self.html_save_dir, html_filename)
-
-                if not os.path.exists(md_filepath):
-                    soup = self.get_url_soup(url)
-                    if soup is None:
-                        total += 1
-                        continue
-                    title, subtitle, like_count, date, md = self.extract_post_data(soup)
-                    self.save_to_file(md_filepath, md)
-
-                    # Convert markdown to HTML and save
-                    html_content = self.md_to_html(md)
-                    self.save_to_html_file(html_filepath, html_content)
-
-                    essays_data.append({
-                        "title": title,
-                        "subtitle": subtitle,
-                        "like_count": like_count,
-                        "date": date,
-                        "file_link": md_filepath,
-                        "html_link": html_filepath
-                    })
-                else:
-                    print(f"File already exists: {md_filepath}")
-            except Exception as e:
-                print(f"Error scraping post: {e}")
-            count += 1
-            if num_posts_to_scrape != 0 and count == num_posts_to_scrape:
-                break
+
+        with tqdm(total=total, desc="Scraping posts") as pbar:
+            for url in self.post_urls:
+                try:
+                    post_slug = url.split('/')[-1]
+                    md_filename = self.get_filename_from_url(url, filetype=".md")
+                    html_filename = self.get_filename_from_url(url, filetype=".html")
+                    md_filepath = os.path.join(self.md_save_dir, md_filename)
+                    html_filepath = os.path.join(self.html_save_dir, html_filename)
+
+                    if not os.path.exists(md_filepath):
+                        soup = self.get_url_soup(url)
+                        if soup is None:
+                            total += 1
+                            continue
+
+                        title, subtitle, like_count, date, md = self.extract_post_data(soup)
+
+                        if self.download_images:
+                            # Count images before downloading
+                            total_images = count_images_in_markdown(md)
+                            post_slug = url.split("/p/")[-1].split("/")[0]
+
+                            with tqdm(total=total_images, desc=f"Downloading images for {post_slug}", leave=False) as img_pbar:
+                                md = process_markdown_images(md, self.writer_name, post_slug, img_pbar)
+
+                        self.save_to_file(md_filepath, md)
+                        html_content = self.md_to_html(md)
+                        self.save_to_html_file(html_filepath, html_content)
+
+                        essays_data.append({
+                            "title": title,
+                            "subtitle": subtitle,
+                            "like_count": like_count,
+                            "date": date,
+                            "file_link": md_filepath,
+                            "html_link": html_filepath
+                        })
+                    else:
+                        pbar.write(f"File already exists: {md_filepath}")
+
+                except Exception as e:
+                    pbar.write(f"Error scraping post: {e}")
+
+                count += 1
+                pbar.update(1)
+                if num_posts_to_scrape != 0 and count == num_posts_to_scrape:
+                    break
+
         self.save_essays_data_to_json(essays_data=essays_data)
         generate_html_file(author_name=self.writer_name)
 
 
 class SubstackScraper(BaseSubstackScraper):
-    def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str):
-        super().__init__(base_substack_url, md_save_dir, html_save_dir)
+    def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str, download_images: bool = False):
+        super().__init__(base_substack_url, md_save_dir, html_save_dir, download_images)
 
     def get_url_soup(self, url: str) -> Optional[BeautifulSoup]:
         """
@@ -368,9 +499,10 @@ def __init__(
             headless: bool = False,
             edge_path: str = '',
             edge_driver_path: str = '',
-            user_agent: str = ''
+            user_agent: str = '',
+            download_images: bool = False,
     ) -> None:
-        super().__init__(base_substack_url, md_save_dir, html_save_dir)
+        super().__init__(base_substack_url, md_save_dir, html_save_dir, download_images)
 
         options = EdgeOptions()
         if headless:
@@ -438,7 +570,7 @@ def get_url_soup(self, url: str) -> BeautifulSoup:
 
 
 def parse_args() -> argparse.Namespace:
-    parser = argparse.ArgumentParser(description="Scrape a Substack site.")
+    parser = argparse.ArgumentParser(description="Scrape a Substack site or individual post.")
     parser.add_argument(
         "-u", "--url", type=str, help="The base URL of the Substack site to scrape."
     )
@@ -488,16 +620,19 @@ def parse_args() -> argparse.Namespace:
         type=str,
         help="The directory to save scraped posts as HTML files.",
     )
-
+    parser.add_argument(
+        "--images",
+        action="store_true",
+        help="Download images and update markdown to use local paths"
+    )
     return parser.parse_args()
 
-
 def main():
     args = parse_args()
-
+
     if args.directory is None:
         args.directory = BASE_MD_DIR
-
+
     if args.html_directory is None:
         args.html_directory = BASE_HTML_DIR
 
@@ -507,13 +642,15 @@ def main():
                 args.url,
                 headless=args.headless,
                 md_save_dir=args.directory,
-                html_save_dir=args.html_directory
+                html_save_dir=args.html_directory,
+                download_images=args.images
             )
         else:
            scraper = SubstackScraper(
                 args.url,
                 md_save_dir=args.directory,
-                html_save_dir=args.html_directory
+                html_save_dir=args.html_directory,
+                download_images=args.images
             )
         scraper.scrape_posts(args.number)
 
@@ -524,16 +661,18 @@ def main():
                 md_save_dir=args.directory,
                 html_save_dir=args.html_directory,
                 edge_path=args.edge_path,
-                edge_driver_path=args.edge_driver_path
+                edge_driver_path=args.edge_driver_path,
+                download_images=args.images
             )
         else:
             scraper = SubstackScraper(
                 base_substack_url=BASE_SUBSTACK_URL,
                 md_save_dir=args.directory,
-                html_save_dir=args.html_directory
+                html_save_dir=args.html_directory,
+                download_images=args.images
             )
         scraper.scrape_posts(num_posts_to_scrape=NUM_POSTS_TO_SCRAPE)
-
+
 
 if __name__ == "__main__":
     main()
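With `--images`, `process_markdown_images` rewrites each Substack CDN image link to a path relative to the author's markdown folder. A minimal sketch of that path arithmetic, using hypothetical author, slug, and filename values (no download involved):

```python
# Hypothetical values, purely to illustrate the relative path that
# replace_image() substitutes into the markdown.
import os
from pathlib import Path

save_path = Path("substack_images") / "thefitzwilliam" / "example-post" / "chart.png"
md_dir = Path("substack_md_files") / "thefitzwilliam"

print(os.path.relpath(save_path, md_dir))
# ../../substack_images/thefitzwilliam/example-post/chart.png  (POSIX form)
```

So a reference such as `![Alt](https://substackcdn.com/image/fetch/...)` ends up pointing at `../../substack_images/<author>/<post-slug>/<filename>` in the saved markdown file.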
diff --git a/tests/test_substack_scraper.py b/tests/test_substack_scraper.py
new file mode 100644
index 00000000..627918f9
--- /dev/null
+++ b/tests/test_substack_scraper.py
@@ -0,0 +1,332 @@
+import os
+import shutil
+import pytest
+from pathlib import Path
+from unittest.mock import Mock, patch
+
+from substack_scraper import (
+    BASE_IMAGE_DIR,
+    SubstackScraper,
+    clean_linked_images,
+    count_images_in_markdown,
+    sanitize_filename,
+    process_markdown_images,
+)
+
+@pytest.fixture
+def mock_html_content():
+    return """
+
+Test content with image:
+
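The pure helpers imported above can be exercised without network access or Selenium. A minimal sketch, assuming `substack_scraper` is importable from the project root; the markdown snippet and CDN URL below are invented for illustration and are not taken from the actual test file:

```python
# Illustrative only: a linked image is first collapsed to a plain image
# reference, then counted.
from substack_scraper import clean_linked_images, count_images_in_markdown

md = (
    "Some intro text.\n\n"
    "[![A chart](https://substackcdn.com/image/fetch/w_1456/chart.png)]"
    "(https://substackcdn.com/image/fetch/w_1456/chart.png)\n"
)

cleaned = clean_linked_images(md)
print(cleaned)
# Some intro text.
#
# ![A chart](https://substackcdn.com/image/fetch/w_1456/chart.png)

print(count_images_in_markdown(md))  # 1: linked images are cleaned before counting
```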