35 changes: 33 additions & 2 deletions main.py
@@ -26,11 +26,14 @@
normalize_url,
)


async def run_scraping(
base_url: str,
discovery_mode: bool,
force_scrape_method: str,
output_format: str,
include_urls: bool = False,
essential_metadata_only: bool = True,
) -> Tuple[Dict[str, Any], int]:
"""
Run the web scraping process.
@@ -40,6 +43,8 @@ async def run_scraping(
discovery_mode (bool): Whether to scrape the entire site or just the base URL.
force_scrape_method (str): Method to force for scraping ('req' or 'sel').
output_format (str): The desired output format ('csv' or 'json').
include_urls (bool): Whether to include discovered URLs in the output.
essential_metadata_only (bool): Whether to include only essential metadata fields.

Returns:
Tuple[Dict[str, Any], int]: A tuple containing the formatted output
@@ -59,7 +64,14 @@

results = await run_scrapers(base_url, discovery_mode, force_scrape_method)

formatted_output = format_output(results, output_format)
# Pass both include_urls and essential_metadata_only parameters
formatted_output = format_output(
results,
output_format,
include_urls=include_urls,
essential_metadata_only=essential_metadata_only
)

total_urls_scraped = len(results)

if output_format == 'json':
@@ -106,6 +118,16 @@ def main() -> None:
choices=['req', 'sel'],
help="Force scraping with either requests or selenium"
)
parser.add_argument(
"--include-urls",
action="store_true",
help="Include discovered URLs in the output (useful for debugging, but not recommended for LLM context)"
)
parser.add_argument(
"--full-metadata",
action="store_true",
help="Include all metadata fields (by default, only essential fields like url, title, and content_type are included)"
)
args = parser.parse_args()

base_url = normalize_url(args.url)
@@ -118,6 +140,8 @@
"log_level": args.log,
"output_format": args.format,
"save_directory": args.savename or get_domain(base_url),
"include_urls": args.include_urls,
"essential_metadata_only": not args.full_metadata,
}

# Set up logging
@@ -137,7 +161,14 @@
logging.info("Starting web scraping process...")

formatted_output, total_urls_scraped = asyncio.run(
run_scraping(base_url, args.discovery, args.force, args.format)
run_scraping(
base_url,
args.discovery,
args.force,
args.format,
include_urls=args.include_urls,
essential_metadata_only=not args.full_metadata
)
)

filename = set_filename(args.format, now)
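For readers skimming the main.py hunk above, here is a minimal, self-contained sketch of the new flag-to-parameter mapping: --full-metadata is inverted into essential_metadata_only, while --include-urls passes through unchanged. The parser below is a stand-alone stand-in for illustration only, not the project's full CLI.

import argparse

# Stand-alone stand-in for the two flags added in this diff.
parser = argparse.ArgumentParser()
parser.add_argument("--include-urls", action="store_true")
parser.add_argument("--full-metadata", action="store_true")

# Default invocation: neither flag set.
args = parser.parse_args([])
print(args.include_urls)        # False -> discovered URLs are dropped from the output
print(not args.full_metadata)   # True  -> essential_metadata_only

# Opting back in to the full payload.
args = parser.parse_args(["--include-urls", "--full-metadata"])
print(args.include_urls)        # True  -> discovered URLs are kept
print(not args.full_metadata)   # False -> all metadata fields are kept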
26 changes: 21 additions & 5 deletions modules/processors/url_processor.py
@@ -53,18 +53,34 @@ def is_valid_url(url: str, base_url: str) -> bool:

def normalize_url(url: str) -> str:
"""
Normalize a URL by removing trailing slashes and standardizing the scheme.
Normalize a URL by removing trailing slashes and standardizing the scheme
while preserving case in the path component.

Args:
url (str): The URL to normalize.

Returns:
str: The normalized URL.
"""
parsed = urlparse(url.lower())
scheme = parsed.scheme or 'https' # Default to https if no scheme is provided
path = parsed.path.rstrip('/') # Remove trailing slash from path
return f"{scheme}://{parsed.netloc}{path}"
parsed = urlparse(url)
# Normalize scheme (case-insensitive)
scheme = parsed.scheme.lower() or 'https' # Default to https if no scheme is provided
# Normalize netloc (domain is case-insensitive)
netloc = parsed.netloc.lower()
# Preserve case in path but remove trailing slash
path = parsed.path.rstrip('/')
# Preserve query and fragment
query = parsed.query
fragment = parsed.fragment

# Reconstruct the URL
normalized_url = f"{scheme}://{netloc}{path}"
if query:
normalized_url += f"?{query}"
if fragment:
normalized_url += f"#{fragment}"

return normalized_url

def url_matches_base(url: str, base_url: str) -> bool:
"""
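A quick behavioural sketch of the revised normalize_url follows; the import path is inferred from the file location in this diff and may need adjusting to the project's package layout. The split matters because, per RFC 3986, the scheme and host are case-insensitive while the path generally is not.

from modules.processors.url_processor import normalize_url

# Scheme and host are lowercased, path case is preserved, the trailing slash is
# stripped, and query/fragment survive the round trip.
print(normalize_url("HTTPS://Example.COM/Docs/Page/?Ref=1#Top"))
# -> https://example.com/Docs/Page?Ref=1#Top

# A bare domain with a trailing slash collapses to scheme://host.
print(normalize_url("https://example.com/"))
# -> https://example.com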
50 changes: 39 additions & 11 deletions modules/utils/utils.py
@@ -83,15 +83,16 @@ def is_image_content_type(url):
logging.error(f"Error checking content type for {url}")
return False

def format_output(results, output_format):
def format_output(results, output_format, include_urls=False, essential_metadata_only=True):
"""
Format the scraped results according to the specified output format.

Args:
results (dict): Dictionary of scraped results with URLs as keys and
dictionaries containing 'content', 'discovered_urls', and 'metadata' as values
output_format (str): Desired output format ('csv' or 'json')
sitemap_urls (set): Set of URLs from the sitemap
include_urls (bool, optional): Whether to include discovered URLs in the output. Defaults to False.
essential_metadata_only (bool, optional): Whether to include only essential metadata. Defaults to True.

Returns:
list or dict: Formatted data ready for output. For CSV, a list of lists where the first row
@@ -101,19 +102,46 @@
ValueError: If an invalid output format is specified
"""
sorted_results = dict(sorted(results.items()))

# Filter metadata if requested
if essential_metadata_only:
for url, data in sorted_results.items():
if 'metadata' in data:
# Keep only essential metadata fields
essential_fields = ['url', 'title', 'content_type']
data['metadata'] = {k: v for k, v in data['metadata'].items() if k in essential_fields}

if output_format == 'csv':
csv_data = [['URL', 'Content', 'Discovered URLs', 'Metadata']]
for url, data in sorted_results.items():
metadata_str = json.dumps(data.get('metadata', {}))
csv_data.append([
url,
data['content'],
', '.join(data['discovered_urls']),
metadata_str
])
if include_urls:
csv_data = [['URL', 'Content', 'Discovered URLs', 'Metadata']]
for url, data in sorted_results.items():
metadata_str = json.dumps(data.get('metadata', {}))
csv_data.append([
url,
data['content'],
', '.join(data.get('discovered_urls', [])),
metadata_str
])
else:
csv_data = [['URL', 'Content', 'Metadata']]
for url, data in sorted_results.items():
metadata_str = json.dumps(data.get('metadata', {}))
csv_data.append([
url,
data['content'],
metadata_str
])
return csv_data
elif output_format == 'json':
if not include_urls:
# Create a copy without the discovered_urls for each entry
clean_results = {}
for url, data in sorted_results.items():
clean_results[url] = {
'metadata': data.get('metadata', {}),
'content': data['content']
}
return clean_results
return sorted_results
else:
raise ValueError(f"Invalid output format: {output_format}")
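A small usage sketch of the revised format_output is given below; the import path is inferred from the file location in this diff, and the results dict is an illustrative toy rather than real scraper output.

from modules.utils.utils import format_output

results = {
    "https://example.com/a": {
        "content": "Page A",
        "discovered_urls": ["https://example.com/b"],
        "metadata": {
            "url": "https://example.com/a",
            "title": "A",
            "content_type": "text/html",
            "fetched_with": "requests",   # non-essential field, trimmed by default
        },
    },
}

# Full payload: discovered URLs and every metadata field are kept.
print(format_output(results, "csv", include_urls=True, essential_metadata_only=False))

# Defaults: discovered URLs are dropped and metadata is trimmed to url/title/content_type.
print(format_output(results, "json"))

Note that, as written in this diff, the essential-metadata filter edits the metadata dicts inside results in place, so a caller that still needs the untrimmed data afterwards may want to pass in a copy.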