scraper.py
import requests
from bs4 import BeautifulSoup
import os
from urllib.parse import urljoin, urlparse
import time
from pathlib import Path
import re


class ResourceScraper:
    def __init__(self, base_url, download_folder="downloaded_resources"):
        self.base_url = base_url
        self.download_folder = download_folder
        self.session = requests.Session()
        # Add headers to avoid being blocked
        self.session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            }
        )
        # Create download folder
        Path(self.download_folder).mkdir(exist_ok=True)
        # Track downloaded files to avoid duplicates
        self.downloaded_files = set()

    def clean_filename(self, text, max_length=100):
        """Clean and format text to be a valid filename"""
        if not text or not text.strip():
            return None
        # Remove HTML tags if any
        text = re.sub(r"<[^>]+>", "", text)
        # Clean up the text
        text = text.strip()
        # Convert to lowercase
        text = text.lower()
        # Replace invalid filename characters
        invalid_chars = r'[<>:"/\\|?*\x00-\x1f]'
        text = re.sub(invalid_chars, "_", text)
        # Replace multiple spaces/underscores with single underscore
        text = re.sub(r"[\s_]+", "_", text)
        # Remove leading/trailing underscores and dots
        text = text.strip("_.")
        # Truncate if too long
        if len(text) > max_length:
            text = text[:max_length].rstrip("_.")
        return text if text else None

    def get_file_extension(self, url):
        """Extract file extension from URL"""
        parsed = urlparse(url)
        path = parsed.path.lower()
        if path.endswith(".pdf"):
            return ".pdf"
        elif path.endswith(".ppt"):
            return ".ppt"
        elif path.endswith(".pptx"):
            return ".pptx"
        elif path.endswith(".docx"):
            return ".docx"
        elif path.endswith(".xls"):
            return ".xls"
        elif path.endswith(".mp4"):
            return ".mp4"
        elif path.endswith(".doc"):
            return ".doc"
        else:
            # Try to detect from URL parameters or guess
            if "pdf" in url.lower():
                return ".pdf"
            elif "ppt" in url.lower():
                return ".pptx"
            elif "docx" in url.lower():
                return ".docx"
            elif "xls" in url.lower():
                return ".xls"
            elif "mp4" in url.lower():
                return ".mp4"
            elif "doc" in url.lower():
                return ".doc"
            else:
                return ".pdf"  # Default fallback

    def generate_filename(self, link_text, original_url, fallback_name=None):
        """Generate a clean filename from link text"""
        extension = self.get_file_extension(original_url)
        # Try to use link text first
        clean_text = self.clean_filename(link_text)
        if clean_text and len(clean_text) > 2:  # Ensure meaningful name
            filename = f"{clean_text}{extension}"
        elif fallback_name:
            # Use provided fallback
            clean_fallback = self.clean_filename(fallback_name)
            filename = (
                f"{clean_fallback}{extension}"
                if clean_fallback
                else f"document{extension}"
            )
        else:
            # Use original filename from URL
            original_name = os.path.basename(urlparse(original_url).path)
            if original_name and "." in original_name:
                filename = original_name
            else:
                filename = f"document{extension}"
        return filename

    def get_page_content(self, url):
        """Fetch and parse a webpage"""
        try:
            response = self.session.get(url)
            response.raise_for_status()
            return BeautifulSoup(response.content, "html.parser")
        except requests.RequestException as e:
            print(f"Error fetching {url}: {e}")
            return None

    def find_resources(self, soup, page_url):
        """Find all PDF and PPT resources on a page with their link text"""
        resources = []
        # Look for direct links to PDFs and PPTs
        for link in soup.find_all("a", href=True):
            href = link["href"]
            full_url = urljoin(page_url, href)
            # Check if it's a PDF or PPT file; the second, looser substring check
            # also catches download links whose URL has no file extension
            if any(
                full_url.lower().endswith(ext)
                for ext in [".pdf", ".ppt", ".pptx", ".docx", ".xls", ".mp4", ".doc"]
            ) or any(
                ext in full_url.lower()
                for ext in ["pdf", "ppt", "xls", "docx", "mp4", "doc"]
            ):
                # Get link text and clean it
                link_text = link.get_text(strip=True)
                # Also check for title attribute as fallback
                title_attr = link.get("title", "")
                # Generate filename based on link text
                filename = self.generate_filename(
                    link_text, full_url, fallback_name=title_attr
                )
                resources.append(
                    {
                        "url": full_url,
                        "filename": filename,
                        "link_text": link_text,
                        "title": title_attr,
                        "original_filename": os.path.basename(urlparse(full_url).path),
                    }
                )
        # Also check embedded objects and iframes
        for embed in soup.find_all(["embed", "object", "iframe"]):
            src = embed.get("src") or embed.get("data")
            if src:
                full_url = urljoin(page_url, src)
                if any(
                    full_url.lower().endswith(ext)
                    for ext in [
                        ".pdf",
                        ".ppt",
                        ".pptx",
                        ".docx",
                        ".xls",
                        ".mp4",
                        ".doc",
                    ]
                ):
                    # For embedded content, try to find nearby text or title
                    title_text = embed.get("title", "") or embed.get("alt", "")
                    filename = self.generate_filename(
                        title_text, full_url, fallback_name="Embedded_Resource"
                    )
                    resources.append(
                        {
                            "url": full_url,
                            "filename": filename,
                            "link_text": title_text or "Embedded resource",
                            "title": title_text,
                            "original_filename": os.path.basename(
                                urlparse(full_url).path
                            ),
                        }
                    )
        return resources

    def download_file(self, url, filename):
        """Download a single file"""
        if url in self.downloaded_files:
            print(f"Already downloaded: {filename}")
            return True
        try:
            print(f"Downloading: {filename}")
            response = self.session.get(url, stream=True)
            response.raise_for_status()
            filepath = os.path.join(self.download_folder, filename)
            # Handle duplicate filenames
            counter = 1
            original_filepath = filepath
            while os.path.exists(filepath):
                name, ext = os.path.splitext(original_filepath)
                filepath = f"{name}_{counter}{ext}"
                counter += 1
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            self.downloaded_files.add(url)
            print(f"✓ Downloaded: {os.path.basename(filepath)}")
            return True
        except requests.RequestException as e:
            print(f"✗ Failed to download {filename}: {e}")
            return False

    def scrape_page(self, page_url, preview_only=False):
        """Scrape a single page for resources"""
        print(f"\n--- Scraping: {page_url} ---")
        soup = self.get_page_content(page_url)
        if not soup:
            return []
        resources = self.find_resources(soup, page_url)
        print(f"Found {len(resources)} resources on this page")
        if preview_only:
            print("\n--- PREVIEW MODE - Files that would be downloaded: ---")
            for i, resource in enumerate(resources, 1):
                print(f"{i}. Link text: '{resource['link_text']}'")
                print(f"   Generated filename: {resource['filename']}")
                print(f"   Original filename: {resource['original_filename']}")
                print(f"   URL: {resource['url']}")
                print()
            return resources
        downloaded = []
        for resource in resources:
            print(f"\nFound: '{resource['link_text']}'")
            print(f"Generated filename: {resource['filename']}")
            print(f"Original filename: {resource['original_filename']}")
            if self.download_file(resource["url"], resource["filename"]):
                downloaded.append(resource)
            # Be respectful - add small delay between downloads
            time.sleep(0.5)
        return downloaded

    def scrape_multiple_pages(self, page_urls, preview_only=False):
        """Scrape multiple pages"""
        all_downloaded = []
        for i, url in enumerate(page_urls, 1):
            print(f"\n=== Processing Page {i}/{len(page_urls)} ===")
            downloaded = self.scrape_page(url, preview_only=preview_only)
            all_downloaded.extend(downloaded)
            # Delay between pages to be respectful
            if not preview_only:
                time.sleep(1)
        return all_downloaded


# Usage Example
if __name__ == "__main__":
    # Initialize scraper
    scraper = ResourceScraper("https://www.sabbathschoolpersonalministries.org")

    # List of pages to scrape (add your actual URLs)
    pages_to_scrape = [
        "https://www.sabbathschoolpersonalministries.org/acs_iicd",
        # Add more pages as needed
    ]

    # PREVIEW MODE - See what files would be downloaded and their names
    print("=== PREVIEW MODE ===")
    scraper.scrape_multiple_pages(pages_to_scrape, preview_only=True)

    # The lines below perform the actual download; comment them out if you
    # only want the preview above.
    print("\n=== STARTING ACTUAL DOWNLOAD ===")
    downloaded_resources = scraper.scrape_multiple_pages(pages_to_scrape)

    print("\n=== SUMMARY ===")
    print(f"Total resources downloaded: {len(downloaded_resources)}")
    print(f"Files saved to: {scraper.download_folder}")

    # Print list of downloaded files with their link text
    for resource in downloaded_resources:
        print(f"- {resource['filename']} (from: '{resource['link_text']}')")