122 changes: 110 additions & 12 deletions pyheadspace/__main__.py
@@ -1,6 +1,7 @@
import logging
import os
import re
import random
from datetime import date, datetime, timedelta
from typing import List, Optional, Union

@@ -83,7 +84,7 @@
"--duration",
help="Duration or list of duration",
type=int,
default=[15],
default=list(range(1, 31)),
multiple=True,
),
click.option("--out", default="", help="Download directory"),
@@ -104,9 +105,12 @@ def check_bearer_id(bearer_id):
return False
return True

def sanitize_filename(filename):
# Replace characters that are invalid in filenames with safe substitutes (or drop them)
return filename.strip().replace(":", "-").replace("/", "-").replace("?", "")
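
A quick sanity check of the new helper, with a made-up session title (illustrative only):

print(sanitize_filename("Session 3: Letting Go/Part 1?"))
# -> "Session 3- Letting Go-Part 1"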

def get_group_ids():
params = {"category": "PACK_GROUP", "limit": "-1"}
params = {"limit": "-1"}
response = request_url(GROUP_COLLECTION, params=params)
data = response["included"]
pack_ids = []
@@ -125,32 +129,64 @@ def request_url(
if params is None:
params = {}
url = url.format(id)
log_entry = f"{url}_{params}\n"

# Check if the request has already been logged
if "activities" in url:
log_entry = f"{url}_{params}\n"

# Check if the request has already been logged
if os.path.exists("request_log.txt"):
with open("request_log.txt", "r") as log_file:
if log_entry in log_file.read():
console.print(f"[yellow]Skipping request for {url} with params {params} (already logged).[/yellow]")
return None # Skip if the request is already logged

# Log the request attempt before making the request
with open("request_log.txt", "a") as log_file:
log_file.write(log_entry)
if not mute:
logger.info("Sending GET request to {}".format(url))

response = session.get(url, params=params)

try:
response_js: dict = response.json()
except Exception as e:
logger.critical(f"status code {response.status_code}")
logger.critical(f"error: {e}")
console.print(f"status code {response.status_code}")
raise click.Abort()
return None # Skip the request if it can't parse the response

if response.status_code == 403: # Handle permission error
console.print(
f"[yellow]Skipping file due to permission error: {response_js.get('title', 'Forbidden')}[/yellow]"
)
logger.warning(f"Permission error: {response_js.get('detail', '')}")
return None # Skip the forbidden request without raising an error
if response.status_code == 405: # Handle not-found error
console.print(
f"[yellow]Skipping file due to Not Found error: {response_js.get('title', 'NotFound')}[/yellow]"
)
logger.warning(f"Not Found error: {response_js.get('detail', '')}")
return None # Skip the missing resource without raising an error

if not response.ok:
if "errors" in response_js.keys():
errors = response_js["errors"]
logger.error(errors)
if response.status_code == 401:
console.print(
"\n[red]Unautorized : Unable to login to headspace account[/red]"
"\n[red]Unauthorized: Unable to login to headspace account[/red]"
)
console.print("Run [green]headspace login[/green] first.")
else:
console.print(errors)
else:
console.print(response_js)
logger.error(response_js)
raise click.UsageError(f"HTTP error: status-code = {response.status_code}")
return None # Skip other HTTP errors without crashing

return response_js
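
With this change, request_url returns None for skipped, forbidden, or unparsable requests instead of aborting, so every caller now guards before indexing into the payload. A minimal sketch of that call-site contract (fetch_pack_name is a hypothetical helper, not part of the module):

def fetch_pack_name(pack_id):
    # request_url may return None: the request was already logged,
    # the API answered 403/405, or the body failed to parse.
    response = request_url(PACK_URL, id=pack_id)
    if response is None:
        return None
    return response["data"]["attributes"]["name"]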


@@ -187,12 +223,17 @@ def get_pack_attributes(
author: Optional[int] = None,
):
response = request_url(PACK_URL, id=pack_id)
if response is None: # Skip if the request failed
#console.print(f"[yellow]Skipping pack ID {pack_id} due to an error.[/yellow]")
#logger.warning(f"Skipping pack ID {pack_id} due to a failed request.")
return
attributes: dict = response["data"]["attributes"]
_pack_name: str = attributes["name"]
# The name is only used in filenames, and "|" is not allowed on most filesystems
_pack_name = _pack_name.replace("|", "-")

if all_:
# Only check the folder for duplicates if the author is unspecified
if all_ and author is None:
exists = os.path.exists(os.path.join(out, _pack_name))
if exists:
console.print(f"{_pack_name} already exists [red]skipping... [/red]")
@@ -220,6 +261,9 @@ def get_pack_attributes(


def get_signed_url(response: dict, duration: List[int]) -> Optional[dict]:
if response is None: # Skip if the response is None
console.print("[yellow]Skipping due to a missing or invalid response.[/yellow]")
return None
data = response["included"]
signed_links = {}
av_duration = []
@@ -240,7 +284,16 @@ def get_signed_url(response: dict, duration: List[int]) -> dict:

sign_id = item["id"]
# Getting signed URL
direct_url = request_url(SIGN_URL, id=sign_id)["url"]
direct_url_response = request_url(SIGN_URL, id=sign_id)
if direct_url_response is None:
console.print(f"[yellow]Skipping signed URL for {name} due to an error.[/yellow]")
logger.warning(f"Failed to get signed URL for sign ID {sign_id}.")
continue # Skip if URL signing fails

direct_url = direct_url_response.get("url")
if direct_url is None:
console.print(f"[yellow]No URL found in the signed URL response for {name}. Skipping...[/yellow]")
continue
if len(duration) > 1:
name += f"({duration_in_min} minutes)"

@@ -272,12 +325,36 @@ def download_pack_session(
):
params = dict(authorId=author) if author else dict()
response = request_url(AUDIO_URL, id=id, params=params)
if response is None: # Skip if the request fails (404, 403, etc.)
#console.print(f"[yellow]Skipping session ID {id} due to a failed request.[/yellow]")
return

author_display_name = None
# Search for the author entry in the "included" section
for item in response.get("included", []):
if item["type"] == "authors" and item["id"] == str(author):
author_display_name = item["attributes"].get("displayName", None)
break # We found the correct author, no need to continue

# If the author was not found, skip the session
if author and not author_display_name:
console.print(f"[yellow]Skipping session {id}: No display name found for author ID {author}[/yellow]")
return

author_suffix = f"_{sanitize_filename(author_display_name)}" if author_display_name else ""

signed_url = get_signed_url(response, duration=duration)
if signed_url is None: # Skip if there are no signed URLs
console.print(f"[yellow]No signed URL available for session ID {id}. Skipping...[/yellow]")
return
for name, direct_url in signed_url.items():
if filename_suffix:
name += filename_suffix
download(direct_url, name, filename=name, pack_name=pack_name, out=out)
sanitized_name = name.replace(pack_name, "").strip() # Strip the pack name so it is not duplicated in the filename
filename_with_author = f"{pack_name}_{sanitized_name}{author_suffix}.mp3"
if has_been_downloaded(pack_name, name, author_display_name):
console.print(f"[yellow]Skipping {name}, already downloaded.[/yellow]")
continue
download(direct_url, filename_with_author, filename=filename_with_author, pack_name=pack_name, out=out)
save_to_log(pack_name, name, author_display_name)
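
For reference, how the new filename pieces combine, using made-up values rather than real API data:

# Hypothetical inputs, for illustration only
pack_name = "Managing Anxiety"
name = "Managing Anxiety Session 1(10 minutes)"
author_display_name = "Andy"

author_suffix = f"_{sanitize_filename(author_display_name)}"  # "_Andy"
sanitized_name = name.replace(pack_name, "").strip()  # "Session 1(10 minutes)"
filename_with_author = f"{pack_name}_{sanitized_name}{author_suffix}.mp3"
# -> "Managing Anxiety_Session 1(10 minutes)_Andy.mp3"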


def download_pack_techniques(
@@ -326,13 +403,15 @@ def download(

media_type = media.headers.get("content-type").split("/")[-1]
filename += f".{media_type}"
filename = sanitize_filename(filename)
total_length = int(media.headers.get("content-length"))
chunk_size = 1024

if out and not os.path.isdir(out):
raise click.BadOptionUsage("--out", f"'{out}' path not valid")

if pack_name:
pack_name = sanitize_filename(pack_name)
dir_path = os.path.join(out, pack_name)
pattern = r"Session \d+ of (Level \d+)"
level = re.findall(pattern, filename)
@@ -528,7 +607,7 @@ def pack(
logger.info("Downloading all packs")

group_ids = get_group_ids()

random.shuffle(group_ids)
for pack_id in group_ids:
if pack_id not in excluded:
get_pack_attributes(
@@ -538,6 +617,7 @@
no_meditation=no_meditation,
no_techniques=no_techniques,
all_=True,
author=author,
)
else:
logger.info(f"Skipping ID: {pack_id} as it is excluded")
@@ -640,11 +720,29 @@ def everyday(_from: str, to: str, duration: Union[list, tuple], out: str):
response = request_url(EVERYDAY_URL, params=params)

signed_url = get_signed_url(response, duration=duration)

# Increment the date before skipping, so a day without audio cannot stall the loop
current_date = _from
_from += timedelta(days=1)

if signed_url is None: # Skip if there are no signed URLs
console.print(f"[yellow]No signed URL available for {current_date}. Skipping...[/yellow]")
continue

for name, direct_url in signed_url.items():
download(direct_url, name, filename=name, out=out)
_from += timedelta(days=1)

def save_to_log(pack_name, name, author_display_name):
log_entry = f"{pack_name}_{name}_{author_display_name}\n"
with open("download_log.txt", "a") as log_file:
log_file.write(log_entry)

def has_been_downloaded(pack_name, name, author_display_name):
log_entry = f"{pack_name}_{name}_{author_display_name}\n"
if os.path.exists("download_log.txt"):
with open("download_log.txt", "r") as log_file:
if log_entry in log_file.read():
return True
return False
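
The two log helpers round-trip on an exact string match of pack, session, and author; a small sketch, assuming a fresh download_log.txt and made-up values:

save_to_log("Managing Anxiety", "Session 1(10 minutes)", "Andy")
assert has_been_downloaded("Managing Anxiety", "Session 1(10 minutes)", "Andy")
# Any difference in pack, session name, or author misses the entry:
assert not has_been_downloaded("Managing Anxiety", "Session 1(10 minutes)", "Dora")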

@cli.command("login")
def login():