diff --git a/pyheadspace/__main__.py b/pyheadspace/__main__.py index 627a846..f958fa4 100644 --- a/pyheadspace/__main__.py +++ b/pyheadspace/__main__.py @@ -1,6 +1,7 @@ import logging import os import re +import random from datetime import date, datetime, timedelta from typing import List, Optional, Union @@ -83,7 +84,7 @@ "--duration", help="Duration or list of duration", type=int, - default=[15], + default=list(range(1, 31)), multiple=True, ), click.option("--out", default="", help="Download directory"), @@ -104,9 +105,12 @@ def check_bearer_id(bearer_id): return False return True +def sanitize_filename(filename): + # Replace invalid characters with underscores or other valid characters + return filename.strip().replace(":", "-").replace("/", "-").replace("?", "") def get_group_ids(): - params = {"category": "PACK_GROUP", "limit": "-1"} + params = {"limit": "-1"} response = request_url(GROUP_COLLECTION, params=params) data = response["included"] pack_ids = [] @@ -125,24 +129,55 @@ def request_url( if params is None: params = {} url = url.format(id) + log_entry = f"{url}_{params}\n" + + # Check if the request has already been logged + if "activities" in url: + log_entry = f"{url}_{params}\n" + + # Check if the request has already been logged + if os.path.exists("request_log.txt"): + with open("request_log.txt", "r") as log_file: + if log_entry in log_file.read(): + console.print(f"[yellow]Skipping request for {url} with params {params} (already logged).[/yellow]") + return None # Skip if the request is already logged + + # Log the request attempt before making the request + with open("request_log.txt", "a") as log_file: + log_file.write(log_entry) if not mute: logger.info("Sending GET request to {}".format(url)) response = session.get(url, params=params) + try: response_js: dict = response.json() except Exception as e: logger.critical(f"status code {response.status_code}") logger.critical(f"error: {e}") console.print(f"status code {response.status_code}") - raise click.Abort() + return None # Skip the request if it can't parse the response + + if response.status_code == 403: # Handle permission error + console.print( + f"[yellow]Skipping file due to permission error: {response_js.get('title', 'Forbidden')}[/yellow]" + ) + logger.warning(f"Permission error: {response_js.get('detail', '')}") + return None # Skip the forbidden request without raising an error + if response.status_code == 405: # Handle permission error + console.print( + f"[yellow]Skipping file due to Not Found error: {response_js.get('title', 'NotFound')}[/yellow]" + ) + logger.warning(f"Not Fund error: {response_js.get('detail', '')}") + return None # Skip the forbidden request without raising an error + if not response.ok: if "errors" in response_js.keys(): errors = response_js["errors"] logger.error(errors) if response.status_code == 401: console.print( - "\n[red]Unautorized : Unable to login to headspace account[/red]" + "\n[red]Unauthorized: Unable to login to headspace account[/red]" ) console.print("Run [green]headspace login[/green] first.") else: @@ -150,7 +185,8 @@ def request_url( else: console.print(response_js) logger.error(response_js) - raise click.UsageError(f"HTTP error: status-code = {response.status_code}") + return None # Skip other HTTP errors without crashing + return response_js @@ -187,12 +223,17 @@ def get_pack_attributes( author: Optional[int] = None, ): response = request_url(PACK_URL, id=pack_id) + if response is None: # Skip if the request failed + #console.print(f"[yellow]Skipping pack ID {pack_id} due to an error.[/yellow]") + #logger.warning(f"Skipping pack ID {pack_id} due to a failed request.") + return attributes: dict = response["data"]["attributes"] _pack_name: str = attributes["name"] # Because it's only used for filenames, and | is mostly not allowed in filenames _pack_name = _pack_name.replace("|", "-") - if all_: + # Only check the folder for duplicates if the author is unspecified + if all_ and author is None: exists = os.path.exists(os.path.join(out, _pack_name)) if exists: console.print(f"{_pack_name} already exists [red]skipping... [/red]") @@ -220,6 +261,9 @@ def get_pack_attributes( def get_signed_url(response: dict, duration: List[int]) -> dict: + if response is None: # Skip if the response is None + console.print("[yellow]Skipping due to a missing or invalid response.[/yellow]") + return None data = response["included"] signed_links = {} av_duration = [] @@ -240,7 +284,16 @@ def get_signed_url(response: dict, duration: List[int]) -> dict: sign_id = item["id"] # Getting signed URL - direct_url = request_url(SIGN_URL, id=sign_id)["url"] + direct_url_response = request_url(SIGN_URL, id=sign_id) + if direct_url_response is None: + console.print(f"[yellow]Skipping signed URL for {name} due to an error.[/yellow]") + logger.warning(f"Failed to get signed URL for sign ID {sign_id}.") + continue # Skip if URL signing fails + + direct_url = direct_url_response.get("url") + if direct_url is None: + console.print(f"[yellow]No URL found in the signed URL response for {name}. Skipping...[/yellow]") + continue if len(duration) > 1: name += f"({duration_in_min} minutes)" @@ -272,12 +325,36 @@ def download_pack_session( ): params = dict(authorId=author) if author else dict() response = request_url(AUDIO_URL, id=id, params=params) + if response is None: # Skip if the request fails (404, 403, etc.) + #console.print(f"[yellow]Skipping session ID {id} due to a failed request.[/yellow]") + return + + author_display_name = None + # Search for the author entry in the "included" section + for item in response.get("included", []): + if item["type"] == "authors" and item["id"] == str(author): + author_display_name = item["attributes"].get("displayName", None) + break # We found the correct author, no need to continue + + # If the author was not found, skip the session + if author and not author_display_name: + console.print(f"[yellow]Skipping session {id}: No display name found for author ID {author}[/yellow]") + return + author_suffix = f"_{sanitize_filename(author_display_name)}" if author_display_name else "" + signed_url = get_signed_url(response, duration=duration) + if signed_url is None: # Skip if there are no signed URLs + console.print(f"[yellow]No signed URL available for session ID {id}. Skipping...[/yellow]") + return for name, direct_url in signed_url.items(): - if filename_suffix: - name += filename_suffix - download(direct_url, name, filename=name, pack_name=pack_name, out=out) + sanitized_name = name.replace(pack_name, "").strip() # Remove duplication + filename_with_author = f"{pack_name}_{sanitized_name}{author_suffix}.mp3" + if has_been_downloaded(pack_name, name, author_display_name): + console.print(f"[yellow]Skipping {name}, already downloaded.[/yellow]") + continue + download(direct_url, filename_with_author, filename=filename_with_author, pack_name=pack_name, out=out) + save_to_log(pack_name, name, author_display_name) def download_pack_techniques( @@ -326,6 +403,7 @@ def download( media_type = media.headers.get("content-type").split("/")[-1] filename += f".{media_type}" + filename=sanitize_filename(filename) total_length = int(media.headers.get("content-length")) chunk_size = 1024 @@ -333,6 +411,7 @@ def download( raise click.BadOptionUsage("--out", f"'{out}' path not valid") if pack_name: + pack_name=sanitize_filename(pack_name) dir_path = os.path.join(out, pack_name) pattern = r"Session \d+ of (Level \d+)" level = re.findall(pattern, filename) @@ -528,7 +607,7 @@ def pack( logger.info("Downloading all packs") group_ids = get_group_ids() - + random.shuffle(group_ids) for pack_id in group_ids: if pack_id not in excluded: get_pack_attributes( @@ -538,6 +617,7 @@ def pack( no_meditation=no_meditation, no_techniques=no_techniques, all_=True, + author=author, ) else: logger.info(f"Skipping ID: {pack_id} as it is excluded") @@ -640,11 +720,29 @@ def everyday(_from: str, to: str, duration: Union[list, tuple], out: str): response = request_url(EVERYDAY_URL, params=params) signed_url = get_signed_url(response, duration=duration) + + # Increment the date before skipping + _from += timedelta(days=1) + + if signed_url is None: # Skip if there are no signed URLs + console.print(f"[yellow]No signed URL available for {_from}. Skipping...[/yellow]") + continue for name, direct_url in signed_url.items(): download(direct_url, name, filename=name, out=out) - _from += timedelta(days=1) +def save_to_log(pack_name, name, author_display_name): + log_entry = f"{pack_name}_{name}_{author_display_name}\n" + with open("download_log.txt", "a") as log_file: + log_file.write(log_entry) + +def has_been_downloaded(pack_name, name, author_display_name): + log_entry = f"{pack_name}_{name}_{author_display_name}\n" + if os.path.exists("download_log.txt"): + with open("download_log.txt", "r") as log_file: + if log_entry in log_file.read(): + return True + return False @cli.command("login") def login():