From ee337902d48c2b8bd35c367141ebdd25d271b4db Mon Sep 17 00:00:00 2001
From: Howchie
Date: Fri, 10 Jan 2025 21:35:49 +0800
Subject: [PATCH] Add files via upload

I don't know how to do pull requests and stuff, but I've modified the
file to address a few key issues.

First, it now adds the author's display name to filenames when an
author is selected, and it lets you download more than one author: the
duplicate-folder check is skipped in that case, but filenames are still
checked for duplicates. I also removed a category check that limited
which packs were found.

There is now better error handling for the reported cases where things
don't work. Those items still won't download, but the script keeps
looking for other packs instead of aborting.

By default, duration is now a list of all possible durations (1-30
minutes), but it should still behave as before if you pass a fixed
--duration argument.

Finally, some packs had filenames that are invalid (at least on
Windows), so I added a sanitize_filename function to strip or replace
characters like colons, slashes, and question marks. A few illustrative
sketches of the new helpers follow.
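To make the sanitization concrete, this is roughly what the new helper
does; the pack names below are made up, and only strip() plus the three
replacements shown in the patch are performed:

    # Illustrative only; matches sanitize_filename() as added below
    sanitize_filename("Why Can't I Sleep? ")   # -> "Why Can't I Sleep"
    sanitize_filename("Level 1: Session 1")    # -> "Level 1- Session 1"
    sanitize_filename("Before/After Work")     # -> "Before-After Work"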
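And this is how the author suffix ends up in the filename. A sketch
with made-up values (download() still appends the real media-type
extension afterwards, as before):

    # Made-up values, tracing download_pack_session() below
    pack_name = "Managing Anxiety"
    name = "Managing Anxiety Session 1"      # from the signed-URL data
    author_suffix = "_Andy Puddicombe"       # from the author's displayName
    sanitized_name = name.replace(pack_name, "").strip()   # "Session 1"
    filename_with_author = f"{pack_name}_{sanitized_name}{author_suffix}"
    # -> "Managing Anxiety_Session 1_Andy Puddicombe" (".mp3" added on download)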
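The duplicate handling is just an append-only text log: each download
writes one line, and a later run that produces the same line skips the
file. A minimal sketch using the two helpers added below (names are
made up):

    if not has_been_downloaded("Managing Anxiety", "Session 1", "Andy Puddicombe"):
        # ... the actual download would happen here ...
        save_to_log("Managing Anxiety", "Session 1", "Andy Puddicombe")
    # download_log.txt then contains the line:
    #   Managing Anxiety_Session 1_Andy Puddicombe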
---
 pyheadspace/__main__.py | 122 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 110 insertions(+), 12 deletions(-)

diff --git a/pyheadspace/__main__.py b/pyheadspace/__main__.py
index 627a846..f958fa4 100644
--- a/pyheadspace/__main__.py
+++ b/pyheadspace/__main__.py
@@ -1,6 +1,7 @@
 import logging
 import os
 import re
+import random
 from datetime import date, datetime, timedelta
 from typing import List, Optional, Union
 
@@ -83,7 +84,7 @@
         "--duration",
         help="Duration or list of duration",
         type=int,
-        default=[15],
+        default=list(range(1, 31)),
         multiple=True,
     ),
     click.option("--out", default="", help="Download directory"),
@@ -104,9 +105,12 @@ def check_bearer_id(bearer_id):
         return False
     return True
 
+def sanitize_filename(filename):
+    # Replace characters that are invalid in filenames (at least on Windows)
+    return filename.strip().replace(":", "-").replace("/", "-").replace("?", "")
 
 def get_group_ids():
-    params = {"category": "PACK_GROUP", "limit": "-1"}
+    params = {"limit": "-1"}
     response = request_url(GROUP_COLLECTION, params=params)
     data = response["included"]
     pack_ids = []
@@ -125,24 +129,55 @@ def request_url(
     if params is None:
         params = {}
     url = url.format(id)
+    log_entry = f"{url}_{params}\n"
+
+    # Check if the request has already been logged
+    if "activities" in url:
+        log_entry = f"{url}_{params}\n"
+
+        # Check if the request has already been logged
+        if os.path.exists("request_log.txt"):
+            with open("request_log.txt", "r") as log_file:
+                if log_entry in log_file.read():
+                    console.print(f"[yellow]Skipping request for {url} with params {params} (already logged).[/yellow]")
+                    return None  # Skip if the request is already logged
+
+        # Log the request attempt before making the request
+        with open("request_log.txt", "a") as log_file:
+            log_file.write(log_entry)
 
     if not mute:
         logger.info("Sending GET request to {}".format(url))
     response = session.get(url, params=params)
+
     try:
         response_js: dict = response.json()
     except Exception as e:
         logger.critical(f"status code {response.status_code}")
         logger.critical(f"error: {e}")
        console.print(f"status code {response.status_code}")
-        raise click.Abort()
+        return None  # Skip the request if the response can't be parsed
+
+    if response.status_code == 403:  # Handle permission errors
+        console.print(
+            f"[yellow]Skipping file due to permission error: {response_js.get('title', 'Forbidden')}[/yellow]"
+        )
+        logger.warning(f"Permission error: {response_js.get('detail', '')}")
+        return None  # Skip the forbidden request without raising an error
+
+    if response.status_code == 405:  # Handle not-found/method-not-allowed errors
+        console.print(
+            f"[yellow]Skipping file due to Not Found error: {response_js.get('title', 'NotFound')}[/yellow]"
+        )
+        logger.warning(f"Not Found error: {response_js.get('detail', '')}")
+        return None  # Skip the failed request without raising an error
     if not response.ok:
         if "errors" in response_js.keys():
             errors = response_js["errors"]
             logger.error(errors)
             if response.status_code == 401:
                 console.print(
-                    "\n[red]Unautorized : Unable to login to headspace account[/red]"
+                    "\n[red]Unauthorized: Unable to login to headspace account[/red]"
                 )
                 console.print("Run [green]headspace login[/green] first.")
             else:
@@ -150,7 +185,8 @@ def request_url(
         else:
             console.print(response_js)
             logger.error(response_js)
-            raise click.UsageError(f"HTTP error: status-code = {response.status_code}")
+            return None  # Skip other HTTP errors without crashing
+
 
     return response_js
 
@@ -187,12 +223,17 @@ def get_pack_attributes(
     author: Optional[int] = None,
 ):
     response = request_url(PACK_URL, id=pack_id)
+    if response is None:  # Skip if the request failed
+        #console.print(f"[yellow]Skipping pack ID {pack_id} due to an error.[/yellow]")
+        #logger.warning(f"Skipping pack ID {pack_id} due to a failed request.")
+        return
     attributes: dict = response["data"]["attributes"]
     _pack_name: str = attributes["name"]
     # Because it's only used for filenames, and | is mostly not allowed in filenames
     _pack_name = _pack_name.replace("|", "-")
-    if all_:
+    # Only check the folder for duplicates if the author is unspecified
+    if all_ and author is None:
         exists = os.path.exists(os.path.join(out, _pack_name))
         if exists:
             console.print(f"{_pack_name} already exists [red]skipping... [/red]")
             return
@@ -220,6 +261,9 @@
 
 
 def get_signed_url(response: dict, duration: List[int]) -> dict:
+    if response is None:  # Skip if the response is None
+        console.print("[yellow]Skipping due to a missing or invalid response.[/yellow]")
+        return None
     data = response["included"]
     signed_links = {}
     av_duration = []
@@ -240,7 +284,16 @@ def get_signed_url(response: dict, duration: List[int]) -> dict:
             sign_id = item["id"]
 
         # Getting signed URL
-        direct_url = request_url(SIGN_URL, id=sign_id)["url"]
+        direct_url_response = request_url(SIGN_URL, id=sign_id)
+        if direct_url_response is None:
+            console.print(f"[yellow]Skipping signed URL for {name} due to an error.[/yellow]")
+            logger.warning(f"Failed to get signed URL for sign ID {sign_id}.")
+            continue  # Skip if URL signing fails
+
+        direct_url = direct_url_response.get("url")
+        if direct_url is None:
+            console.print(f"[yellow]No URL found in the signed URL response for {name}. Skipping...[/yellow]")
+            continue
 
         if len(duration) > 1:
             name += f"({duration_in_min} minutes)"
@@ -272,12 +325,36 @@ def download_pack_session(
 ):
     params = dict(authorId=author) if author else dict()
     response = request_url(AUDIO_URL, id=id, params=params)
+    if response is None:  # Skip if the request fails (404, 403, etc.)
+ #console.print(f"[yellow]Skipping session ID {id} due to a failed request.[/yellow]") + return + + author_display_name = None + # Search for the author entry in the "included" section + for item in response.get("included", []): + if item["type"] == "authors" and item["id"] == str(author): + author_display_name = item["attributes"].get("displayName", None) + break # We found the correct author, no need to continue + + # If the author was not found, skip the session + if author and not author_display_name: + console.print(f"[yellow]Skipping session {id}: No display name found for author ID {author}[/yellow]") + return + author_suffix = f"_{sanitize_filename(author_display_name)}" if author_display_name else "" + signed_url = get_signed_url(response, duration=duration) + if signed_url is None: # Skip if there are no signed URLs + console.print(f"[yellow]No signed URL available for session ID {id}. Skipping...[/yellow]") + return for name, direct_url in signed_url.items(): - if filename_suffix: - name += filename_suffix - download(direct_url, name, filename=name, pack_name=pack_name, out=out) + sanitized_name = name.replace(pack_name, "").strip() # Remove duplication + filename_with_author = f"{pack_name}_{sanitized_name}{author_suffix}.mp3" + if has_been_downloaded(pack_name, name, author_display_name): + console.print(f"[yellow]Skipping {name}, already downloaded.[/yellow]") + continue + download(direct_url, filename_with_author, filename=filename_with_author, pack_name=pack_name, out=out) + save_to_log(pack_name, name, author_display_name) def download_pack_techniques( @@ -326,6 +403,7 @@ def download( media_type = media.headers.get("content-type").split("/")[-1] filename += f".{media_type}" + filename=sanitize_filename(filename) total_length = int(media.headers.get("content-length")) chunk_size = 1024 @@ -333,6 +411,7 @@ def download( raise click.BadOptionUsage("--out", f"'{out}' path not valid") if pack_name: + pack_name=sanitize_filename(pack_name) dir_path = os.path.join(out, pack_name) pattern = r"Session \d+ of (Level \d+)" level = re.findall(pattern, filename) @@ -528,7 +607,7 @@ def pack( logger.info("Downloading all packs") group_ids = get_group_ids() - + random.shuffle(group_ids) for pack_id in group_ids: if pack_id not in excluded: get_pack_attributes( @@ -538,6 +617,7 @@ def pack( no_meditation=no_meditation, no_techniques=no_techniques, all_=True, + author=author, ) else: logger.info(f"Skipping ID: {pack_id} as it is excluded") @@ -640,11 +720,29 @@ def everyday(_from: str, to: str, duration: Union[list, tuple], out: str): response = request_url(EVERYDAY_URL, params=params) signed_url = get_signed_url(response, duration=duration) + + # Increment the date before skipping + _from += timedelta(days=1) + + if signed_url is None: # Skip if there are no signed URLs + console.print(f"[yellow]No signed URL available for {_from}. Skipping...[/yellow]") + continue for name, direct_url in signed_url.items(): download(direct_url, name, filename=name, out=out) - _from += timedelta(days=1) +def save_to_log(pack_name, name, author_display_name): + log_entry = f"{pack_name}_{name}_{author_display_name}\n" + with open("download_log.txt", "a") as log_file: + log_file.write(log_entry) + +def has_been_downloaded(pack_name, name, author_display_name): + log_entry = f"{pack_name}_{name}_{author_display_name}\n" + if os.path.exists("download_log.txt"): + with open("download_log.txt", "r") as log_file: + if log_entry in log_file.read(): + return True + return False @cli.command("login") def login():