From b8dddd959be52326f346a2f46fe919ee7b2af38b Mon Sep 17 00:00:00 2001 From: Cas Teeuwen Date: Wed, 12 Nov 2025 21:01:17 +0100 Subject: [PATCH 1/5] WIP: typing rework (formatting too) --- src/spotify_to_tidal/sync.py | 508 +++++++++++++++++++++++++---------- 1 file changed, 365 insertions(+), 143 deletions(-) diff --git a/src/spotify_to_tidal/sync.py b/src/spotify_to_tidal/sync.py index 4d0c482..5055913 100755 --- a/src/spotify_to_tidal/sync.py +++ b/src/spotify_to_tidal/sync.py @@ -1,150 +1,222 @@ #!/usr/bin/env python3 import asyncio -from .cache import failure_cache, track_match_cache import datetime -from difflib import SequenceMatcher -from functools import partial -from typing import Callable, List, Sequence, Set, Mapping import math -import requests import sys -import spotipy -import tidalapi -from .tidalapi_patch import add_multiple_tracks_to_playlist, clear_tidal_playlist, get_all_favorites, get_all_playlists, get_all_playlist_tracks import time -from tqdm.asyncio import tqdm as atqdm -from tqdm import tqdm import traceback import unicodedata -import math +from difflib import SequenceMatcher +from typing import Callable, List, Mapping, Sequence, Set +import requests +import spotipy +import tidalapi +import tidalapi.exceptions as tidal_exceptions +from tidalapi.user import LoggedInUser +from tqdm import tqdm +from tqdm.asyncio import tqdm as atqdm + +from .cache import failure_cache, track_match_cache +from .tidalapi_patch import ( + add_multiple_tracks_to_playlist, + clear_tidal_playlist, + get_all_favorites, + get_all_playlist_tracks, + get_all_playlists, +) from .type import spotify as t_spotify + def normalize(s) -> str: - return unicodedata.normalize('NFD', s).encode('ascii', 'ignore').decode('ascii') + return unicodedata.normalize("NFD", s).encode("ascii", "ignore").decode("ascii") + def simple(input_string: str) -> str: # only take the first part of a string before any hyphens or brackets to account for different versions - return input_string.split('-')[0].strip().split('(')[0].strip().split('[')[0].strip() + return ( + input_string.split("-")[0].strip().split("(")[0].strip().split("[")[0].strip() + ) + def isrc_match(tidal_track: tidalapi.Track, spotify_track) -> bool: if "isrc" in spotify_track["external_ids"]: return tidal_track.isrc == spotify_track["external_ids"]["isrc"] return False + def duration_match(tidal_track: tidalapi.Track, spotify_track, tolerance=2) -> bool: # the duration of the two tracks must be the same to within 2 seconds - return abs(tidal_track.duration - spotify_track['duration_ms']/1000) < tolerance + return abs(tidal_track.duration - spotify_track["duration_ms"] / 1000) < tolerance + def name_match(tidal_track, spotify_track) -> bool: - def exclusion_rule(pattern: str, tidal_track: tidalapi.Track, spotify_track: t_spotify.SpotifyTrack): - spotify_has_pattern = pattern in spotify_track['name'].lower() - tidal_has_pattern = pattern in tidal_track.name.lower() or (not tidal_track.version is None and (pattern in tidal_track.version.lower())) + def exclusion_rule( + pattern: str, tidal_track: tidalapi.Track, spotify_track: t_spotify.SpotifyTrack + ): + spotify_has_pattern = pattern in spotify_track["name"].lower() + if tidal_track.name is None: + return False + tidal_has_pattern = pattern in tidal_track.name.lower() or ( + tidal_track.version is not None and (pattern in tidal_track.version.lower()) + ) return spotify_has_pattern != tidal_has_pattern # handle some edge cases - if exclusion_rule("instrumental", tidal_track, spotify_track): return False - if exclusion_rule("acapella", tidal_track, spotify_track): return False - if exclusion_rule("remix", tidal_track, spotify_track): return False + if exclusion_rule("instrumental", tidal_track, spotify_track): + return False + if exclusion_rule("acapella", tidal_track, spotify_track): + return False + if exclusion_rule("remix", tidal_track, spotify_track): + return False # the simplified version of the Spotify track name must be a substring of the Tidal track name # Try with both un-normalized and then normalized - simple_spotify_track = simple(spotify_track['name'].lower()).split('feat.')[0].strip() - return simple_spotify_track in tidal_track.name.lower() or normalize(simple_spotify_track) in normalize(tidal_track.name.lower()) + simple_spotify_track = ( + simple(spotify_track["name"].lower()).split("feat.")[0].strip() + ) + return simple_spotify_track in tidal_track.name.lower() or normalize( + simple_spotify_track + ) in normalize(tidal_track.name.lower()) + def artist_match(tidal: tidalapi.Track | tidalapi.Album, spotify) -> bool: def split_artist_name(artist: str) -> Sequence[str]: - if '&' in artist: - return artist.split('&') - elif ',' in artist: - return artist.split(',') - else: - return [artist] - - def get_tidal_artists(tidal: tidalapi.Track | tidalapi.Album, do_normalize=False) -> Set[str]: + if "&" in artist: + return artist.split("&") + elif "," in artist: + return artist.split(",") + else: + return [artist] + + def get_tidal_artists( + tidal: tidalapi.Track | tidalapi.Album, do_normalize=False + ) -> Set[str]: result: list[str] = [] + if tidal.artists is None: + return set() + for artist in tidal.artists: if do_normalize: artist_name = normalize(artist.name) else: - artist_name = artist.name + artist_name: str = artist.name if artist.name is not None else "" result.extend(split_artist_name(artist_name)) return set([simple(x.strip().lower()) for x in result]) def get_spotify_artists(spotify, do_normalize=False) -> Set[str]: result: list[str] = [] - for artist in spotify['artists']: + for artist in spotify["artists"]: if do_normalize: - artist_name = normalize(artist['name']) + artist_name = normalize(artist["name"]) else: - artist_name = artist['name'] + artist_name = artist["name"] result.extend(split_artist_name(artist_name)) return set([simple(x.strip().lower()) for x in result]) + # There must be at least one overlapping artist between the Tidal and Spotify track # Try with both un-normalized and then normalized if get_tidal_artists(tidal).intersection(get_spotify_artists(spotify)) != set(): return True - return get_tidal_artists(tidal, True).intersection(get_spotify_artists(spotify, True)) != set() + return ( + get_tidal_artists(tidal, True).intersection(get_spotify_artists(spotify, True)) + != set() + ) + def match(tidal_track, spotify_track) -> bool: - if not spotify_track['id']: return False + if not spotify_track["id"]: + return False return isrc_match(tidal_track, spotify_track) or ( duration_match(tidal_track, spotify_track) and name_match(tidal_track, spotify_track) and artist_match(tidal_track, spotify_track) ) + def test_album_similarity(spotify_album, tidal_album, threshold=0.6): - return SequenceMatcher(None, simple(spotify_album['name']), simple(tidal_album.name)).ratio() >= threshold and artist_match(tidal_album, spotify_album) + return SequenceMatcher( + None, simple(spotify_album["name"]), simple(tidal_album.name) + ).ratio() >= threshold and artist_match(tidal_album, spotify_album) -async def tidal_search(spotify_track, rate_limiter, tidal_session: tidalapi.Session) -> tidalapi.Track | None: + +async def tidal_search( + spotify_track, rate_limiter, tidal_session: tidalapi.Session +) -> tidalapi.Track | None: def _search_for_track_in_album(): # search for album name and first album artist - if 'album' in spotify_track and 'artists' in spotify_track['album'] and len(spotify_track['album']['artists']): - query = simple(spotify_track['album']['name']) + " " + simple(spotify_track['album']['artists'][0]['name']) + if ( + "album" in spotify_track + and "artists" in spotify_track["album"] + and len(spotify_track["album"]["artists"]) + ): + query = ( + simple(spotify_track["album"]["name"]) + + " " + + simple(spotify_track["album"]["artists"][0]["name"]) + ) album_result = tidal_session.search(query, models=[tidalapi.album.Album]) - for album in album_result['albums']: - if album.num_tracks >= spotify_track['track_number'] and test_album_similarity(spotify_track['album'], album): + for album in album_result["albums"]: + if album.num_tracks >= spotify_track[ + "track_number" + ] and test_album_similarity(spotify_track["album"], album): album_tracks = album.tracks() - if len(album_tracks) < spotify_track['track_number']: - assert( not len(album_tracks) == album.num_tracks ) # incorrect metadata :( + if len(album_tracks) < spotify_track["track_number"]: + assert ( + not len(album_tracks) == album.num_tracks + ) # incorrect metadata :( continue - track = album_tracks[spotify_track['track_number'] - 1] + track = album_tracks[spotify_track["track_number"] - 1] if match(track, spotify_track): - failure_cache.remove_match_failure(spotify_track['id']) + failure_cache.remove_match_failure(spotify_track["id"]) return track def _search_for_standalone_track(): # if album search fails then search for track name and first artist - query = simple(spotify_track['name']) + ' ' + simple(spotify_track['artists'][0]['name']) - for track in tidal_session.search(query, models=[tidalapi.media.Track])['tracks']: + query = ( + simple(spotify_track["name"]) + + " " + + simple(spotify_track["artists"][0]["name"]) + ) + for track in tidal_session.search(query, models=[tidalapi.media.Track])[ + "tracks" + ]: if match(track, spotify_track): - failure_cache.remove_match_failure(spotify_track['id']) + failure_cache.remove_match_failure(spotify_track["id"]) return track + await rate_limiter.acquire() - album_search = await asyncio.to_thread( _search_for_track_in_album ) + album_search = await asyncio.to_thread(_search_for_track_in_album) if album_search: return album_search await rate_limiter.acquire() - track_search = await asyncio.to_thread( _search_for_standalone_track ) + track_search = await asyncio.to_thread(_search_for_standalone_track) if track_search: return track_search # if none of the search modes succeeded then store the track id to the failure cache - failure_cache.cache_match_failure(spotify_track['id']) + failure_cache.cache_match_failure(spotify_track["id"]) + async def repeat_on_request_error(function, *args, remaining=5, **kwargs): # utility to repeat calling the function up to 5 times if an exception is thrown try: return await function(*args, **kwargs) - except (tidalapi.exceptions.TooManyRequests, requests.exceptions.RequestException, spotipy.exceptions.SpotifyException) as e: + except ( + tidal_exceptions.TooManyRequests, + requests.exceptions.RequestException, + spotipy.exceptions.SpotifyException, + ) as e: if remaining: print(f"{str(e)} occurred, retrying {remaining} times") else: print(f"{str(e)} could not be recovered") - if isinstance(e, requests.exceptions.RequestException) and not e.response is None: + if ( + isinstance(e, requests.exceptions.RequestException) + and e.response is not None + ): print(f"Response message: {e.response.text}") print(f"Response headers: {e.response.headers}") @@ -153,57 +225,96 @@ async def repeat_on_request_error(function, *args, remaining=5, **kwargs): print(f"The following arguments were provided:\n\n {str(args)}") print(traceback.format_exc()) sys.exit(1) - sleep_schedule = {5: 1, 4:10, 3:60, 2:5*60, 1:10*60} # sleep variable length of time depending on retry number + sleep_schedule = { + 5: 1, + 4: 10, + 3: 60, + 2: 5 * 60, + 1: 10 * 60, + } # sleep variable length of time depending on retry number time.sleep(sleep_schedule.get(remaining, 1)) - return await repeat_on_request_error(function, *args, remaining=remaining-1, **kwargs) + return await repeat_on_request_error( + function, *args, remaining=remaining - 1, **kwargs + ) async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[dict]: output = [] results = fetch_function(0) - output.extend([item['track'] for item in results['items'] if item['track'] is not None]) + output.extend( + [item["track"] for item in results["items"] if item["track"] is not None] + ) # Get all the remaining tracks in parallel - if results['next']: - offsets = [results['limit'] * n for n in range(1, math.ceil(results['total'] / results['limit']))] + if results["next"]: + offsets = [ + results["limit"] * n + for n in range(1, math.ceil(results["total"] / results["limit"])) + ] extra_results = await atqdm.gather( *[asyncio.to_thread(fetch_function, offset) for offset in offsets], - desc="Fetching additional data chunks" + desc="Fetching additional data chunks", ) for extra_result in extra_results: - output.extend([item['track'] for item in extra_result['items'] if item['track'] is not None]) + output.extend( + [ + item["track"] + for item in extra_result["items"] + if item["track"] is not None + ] + ) return output -async def get_tracks_from_spotify_playlist(spotify_session: spotipy.Spotify, spotify_playlist): +async def get_tracks_from_spotify_playlist( + spotify_session: spotipy.Spotify, spotify_playlist +): def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str): fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc))),type" - return spotify_session.playlist_tracks(playlist_id=playlist_id, fields=fields, offset=offset) + return spotify_session.playlist_tracks( + playlist_id=playlist_id, fields=fields, offset=offset + ) print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'") - items = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, lambda offset: _get_tracks_from_spotify_playlist(offset=offset, playlist_id=spotify_playlist["id"])) - track_filter = lambda item: item.get('type', 'track') == 'track' # type may be 'episode' also - sanity_filter = lambda item: ('album' in item - and 'name' in item['album'] - and 'artists' in item['album'] - and len(item['album']['artists']) > 0 - and item['album']['artists'][0]['name'] is not None) + items = await repeat_on_request_error( + _fetch_all_from_spotify_in_chunks, + lambda offset: _get_tracks_from_spotify_playlist( + offset=offset, playlist_id=spotify_playlist["id"] + ), + ) + def track_filter(item): + return item.get("type", "track") == "track" # type may be 'episode' also + def sanity_filter(item): + return ( + "album" in item + and "name" in item["album"] + and "artists" in item["album"] + and len(item["album"]["artists"]) > 0 + and item["album"]["artists"][0]["name"] is not None + ) return list(filter(sanity_filter, filter(track_filter, items))) -def populate_track_match_cache(spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track]): - """ Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist """ + +def populate_track_match_cache( + spotify_tracks_: Sequence[t_spotify.SpotifyTrack], + tidal_tracks_: Sequence[tidalapi.Track], +): + """Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist""" + def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack): for idx, tidal_track in list(enumerate(tidal_tracks)): if tidal_track.available and match(tidal_track, spotify_track): - track_match_cache.insert((spotify_track['id'], tidal_track.id)) + assert tidal_track.id is not None + track_match_cache.insert((spotify_track["id"], tidal_track.id)) tidal_tracks.pop(idx) return def _populate_one_track_from_tidal(tidal_track: tidalapi.Track): for idx, spotify_track in list(enumerate(spotify_tracks)): if tidal_track.available and match(tidal_track, spotify_track): - track_match_cache.insert((spotify_track['id'], tidal_track.id)) + assert tidal_track.id is not None + track_match_cache.insert((spotify_track["id"], tidal_track.id)) spotify_tracks.pop(idx) return @@ -218,46 +329,71 @@ def _populate_one_track_from_tidal(tidal_track: tidalapi.Track): for track in spotify_tracks: _populate_one_track_from_spotify(track) -def get_new_spotify_tracks(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> List[t_spotify.SpotifyTrack]: - ''' Extracts only the tracks that have not already been seen in our Tidal caches ''' + +def get_new_spotify_tracks( + spotify_tracks: Sequence[t_spotify.SpotifyTrack], +) -> List[t_spotify.SpotifyTrack]: + """Extracts only the tracks that have not already been seen in our Tidal caches""" results = [] for spotify_track in spotify_tracks: - if not spotify_track['id']: continue - if not track_match_cache.get(spotify_track['id']) and not failure_cache.has_match_failure(spotify_track['id']): + if not spotify_track["id"]: + continue + if not track_match_cache.get( + spotify_track["id"] + ) and not failure_cache.has_match_failure(spotify_track["id"]): results.append(spotify_track) return results -def get_tracks_for_new_tidal_playlist(spotify_tracks: Sequence[t_spotify.SpotifyTrack]) -> Sequence[int]: - ''' gets list of corresponding tidal track ids for each spotify track, ignoring duplicates ''' + +def get_tracks_for_new_tidal_playlist( + spotify_tracks: Sequence[t_spotify.SpotifyTrack], +) -> Sequence[int]: + """gets list of corresponding tidal track ids for each spotify track, ignoring duplicates""" output = [] seen_tracks = set() for spotify_track in spotify_tracks: - if not spotify_track['id']: continue - tidal_id = track_match_cache.get(spotify_track['id']) + if not spotify_track["id"]: + continue + tidal_id = track_match_cache.get(spotify_track["id"]) if tidal_id: if tidal_id in seen_tracks: - track_name = spotify_track['name'] - artist_names = ', '.join([artist['name'] for artist in spotify_track['artists']]) - print(f'Duplicate found: Track "{track_name}" by {artist_names} will be ignored') + track_name = spotify_track["name"] + artist_names = ", ".join( + [artist["name"] for artist in spotify_track["artists"]] + ) + print( + f'Duplicate found: Track "{track_name}" by {artist_names} will be ignored' + ) else: output.append(tidal_id) seen_tracks.add(tidal_id) return output -async def search_new_tracks_on_tidal(tidal_session: tidalapi.Session, spotify_tracks: Sequence[t_spotify.SpotifyTrack], playlist_name: str, config: dict): - """ Generic function for searching for each item in a list of Spotify tracks which have not already been seen and adding them to the cache """ + +async def search_new_tracks_on_tidal( + tidal_session: tidalapi.Session, + spotify_tracks: Sequence[t_spotify.SpotifyTrack], + playlist_name: str, + config: dict, +): + """Generic function for searching for each item in a list of Spotify tracks which have not already been seen and adding them to the cache""" + async def _run_rate_limiter(semaphore): - ''' Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit''' - _sleep_time = config.get('max_concurrency', 10)/config.get('rate_limit', 10)/4 # aim to sleep approx time to drain 1/4 of 'bucket' + """Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit""" + _sleep_time = ( + config.get("max_concurrency", 10) / config.get("rate_limit", 10) / 4 + ) # aim to sleep approx time to drain 1/4 of 'bucket' t0 = datetime.datetime.now() while True: await asyncio.sleep(_sleep_time) t = datetime.datetime.now() dt = (t - t0).total_seconds() - new_items = round(config.get('rate_limit', 10)*dt) + new_items = round(config.get("rate_limit", 10) * dt) t0 = t - [semaphore.release() for i in range(new_items)] # leak new_items from the 'bucket' + [ + semaphore.release() for i in range(new_items) + ] # leak new_items from the 'bucket' # Extract the new tracks that do not already exist in the old tidal tracklist tracks_to_search = get_new_spotify_tracks(spotify_tracks) @@ -265,20 +401,32 @@ async def _run_rate_limiter(semaphore): return # Search for each of the tracks on Tidal concurrently - task_description = "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format(len(tracks_to_search), len(spotify_tracks), playlist_name) - semaphore = asyncio.Semaphore(config.get('max_concurrency', 10)) + task_description = ( + "Searching Tidal for {}/{} tracks in Spotify playlist '{}'".format( + len(tracks_to_search), len(spotify_tracks), playlist_name + ) + ) + semaphore = asyncio.Semaphore(config.get("max_concurrency", 10)) rate_limiter_task = asyncio.create_task(_run_rate_limiter(semaphore)) - search_results = await atqdm.gather( *[ repeat_on_request_error(tidal_search, t, semaphore, tidal_session) for t in tracks_to_search ], desc=task_description ) + search_results = await atqdm.gather( + *[ + repeat_on_request_error(tidal_search, t, semaphore, tidal_session) + for t in tracks_to_search + ], + desc=task_description, + ) rate_limiter_task.cancel() # Add the search results to the cache song404 = [] for idx, spotify_track in enumerate(tracks_to_search): if search_results[idx]: - track_match_cache.insert( (spotify_track['id'], search_results[idx].id) ) + track_match_cache.insert((spotify_track["id"], search_results[idx].id)) else: - song404.append(f"{spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}") - color = ('\033[91m', '\033[0m') + song404.append( + f"{spotify_track['id']}: {','.join([a['name'] for a in spotify_track['artists']])} - {spotify_track['name']}" + ) + color = ("\033[91m", "\033[0m") print(color[0] + "Could not find the track " + song404[-1] + color[1]) file_name = "songs not found.txt" header = f"==========================\nPlaylist: {playlist_name}\n==========================\n" @@ -287,42 +435,68 @@ async def _run_rate_limiter(semaphore): for song in song404: file.write(f"{song}\n") - -async def sync_playlist(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict): - """ sync given playlist to tidal """ + +async def sync_playlist( + spotify_session: spotipy.Spotify, + tidal_session: tidalapi.Session, + spotify_playlist, + tidal_playlist: tidalapi.Playlist | None, + config: dict, +): + """sync given playlist to tidal""" # Get the tracks from both Spotify and Tidal, creating a new Tidal playlist if necessary - spotify_tracks = await get_tracks_from_spotify_playlist(spotify_session, spotify_playlist) + spotify_tracks = await get_tracks_from_spotify_playlist( + spotify_session, spotify_playlist + ) if len(spotify_tracks) == 0: - return # nothing to do + return # nothing to do if tidal_playlist: old_tidal_tracks = await get_all_playlist_tracks(tidal_playlist) else: - print(f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist") - tidal_playlist = tidal_session.user.create_playlist(spotify_playlist['name'], spotify_playlist['description']) + print( + f"No playlist found on Tidal corresponding to Spotify playlist: '{spotify_playlist['name']}', creating new playlist" + ) + assert isinstance(tidal_session.user, LoggedInUser) + tidal_playlist = tidal_session.user.create_playlist( + spotify_playlist["name"], spotify_playlist["description"] + ) old_tidal_tracks = [] # Extract the new tracks from the playlist that we haven't already seen before populate_track_match_cache(spotify_tracks, old_tidal_tracks) - await search_new_tracks_on_tidal(tidal_session, spotify_tracks, spotify_playlist['name'], config) + await search_new_tracks_on_tidal( + tidal_session, spotify_tracks, spotify_playlist["name"], config + ) new_tidal_track_ids = get_tracks_for_new_tidal_playlist(spotify_tracks) # Update the Tidal playlist if there are changes old_tidal_track_ids = [t.id for t in old_tidal_tracks] if new_tidal_track_ids == old_tidal_track_ids: print("No changes to write to Tidal playlist") - elif new_tidal_track_ids[:len(old_tidal_track_ids)] == old_tidal_track_ids: + elif new_tidal_track_ids[: len(old_tidal_track_ids)] == old_tidal_track_ids: # Append new tracks to the existing playlist if possible - add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids):]) + assert isinstance(tidal_playlist, tidalapi.UserPlaylist) + add_multiple_tracks_to_playlist( + tidal_playlist, new_tidal_track_ids[len(old_tidal_track_ids) :] + ) else: # Erase old playlist and add new tracks from scratch if any reordering occured clear_tidal_playlist(tidal_playlist) add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids) -async def sync_favorites(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict): - """ sync user favorites to tidal """ + +async def sync_favorites( + spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict +): + """sync user favorites to tidal""" + async def get_tracks_from_spotify_favorites() -> List[dict]: - _get_favorite_tracks = lambda offset: spotify_session.current_user_saved_tracks(offset=offset) - tracks = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, _get_favorite_tracks) + _get_favorite_tracks = lambda offset: spotify_session.current_user_saved_tracks( + offset=offset + ) + tracks = await repeat_on_request_error( + _fetch_all_from_spotify_in_chunks, _get_favorite_tracks + ) tracks.reverse() return tracks @@ -330,77 +504,126 @@ def get_new_tidal_favorites() -> List[int]: existing_favorite_ids = set([track.id for track in old_tidal_tracks]) new_ids = [] for spotify_track in spotify_tracks: - match_id = track_match_cache.get(spotify_track['id']) - if match_id and not match_id in existing_favorite_ids: + match_id = track_match_cache.get(spotify_track["id"]) + if match_id and match_id not in existing_favorite_ids: new_ids.append(match_id) return new_ids print("Loading favorite tracks from Spotify") spotify_tracks = await get_tracks_from_spotify_favorites() print("Loading existing favorite tracks from Tidal") - old_tidal_tracks = await get_all_favorites(tidal_session.user.favorites, order='DATE') + old_tidal_tracks = await get_all_favorites( + tidal_session.user.favorites, order="DATE" + ) populate_track_match_cache(spotify_tracks, old_tidal_tracks) await search_new_tracks_on_tidal(tidal_session, spotify_tracks, "Favorites", config) new_tidal_favorite_ids = get_new_tidal_favorites() if new_tidal_favorite_ids: - for tidal_id in tqdm(new_tidal_favorite_ids, desc="Adding new tracks to Tidal favorites"): + for tidal_id in tqdm( + new_tidal_favorite_ids, desc="Adding new tracks to Tidal favorites" + ): tidal_session.user.favorites.add_track(tidal_id) else: print("No new tracks to add to Tidal favorites") -def sync_playlists_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, playlists, config: dict): - for spotify_playlist, tidal_playlist in playlists: - # sync the spotify playlist to tidal - asyncio.run(sync_playlist(spotify_session, tidal_session, spotify_playlist, tidal_playlist, config) ) -def sync_favorites_wrapper(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): - asyncio.run(main=sync_favorites(spotify_session=spotify_session, tidal_session=tidal_session, config=config)) +def sync_playlists_wrapper( + spotify_session: spotipy.Spotify, + tidal_session: tidalapi.Session, + playlists, + config: dict, +): + for spotify_playlist, tidal_playlist in playlists: + # sync the spotify playlist to tidal + asyncio.run( + sync_playlist( + spotify_session, tidal_session, spotify_playlist, tidal_playlist, config + ) + ) + + +def sync_favorites_wrapper( + spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config +): + asyncio.run( + main=sync_favorites( + spotify_session=spotify_session, tidal_session=tidal_session, config=config + ) + ) + -def get_tidal_playlists_wrapper(tidal_session: tidalapi.Session) -> Mapping[str, tidalapi.Playlist]: +def get_tidal_playlists_wrapper( + tidal_session: tidalapi.Session, +) -> Mapping[str, tidalapi.Playlist]: tidal_playlists = asyncio.run(get_all_playlists(tidal_session.user)) return {playlist.name: playlist for playlist in tidal_playlists} -def pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist]): - if spotify_playlist['name'] in tidal_playlists: - # if there's an existing tidal playlist with the name of the current playlist then use that - tidal_playlist = tidal_playlists[spotify_playlist['name']] - return (spotify_playlist, tidal_playlist) + +def pick_tidal_playlist_for_spotify_playlist( + spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist] +): + if spotify_playlist["name"] in tidal_playlists: + # if there's an existing tidal playlist with the name of the current playlist then use that + tidal_playlist = tidal_playlists[spotify_playlist["name"]] + return (spotify_playlist, tidal_playlist) else: - return (spotify_playlist, None) + return (spotify_playlist, None) + -def get_user_playlist_mappings(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): +def get_user_playlist_mappings( + spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config +): results = [] spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config)) tidal_playlists = get_tidal_playlists_wrapper(tidal_session) for spotify_playlist in spotify_playlists: - results.append( pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) ) + results.append( + pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) + ) return results + async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config): # get all the playlists from the Spotify account playlists = [] print("Loading Spotify playlists") first_results = spotify_session.current_user_playlists() - exclude_list = set([x.split(':')[-1] for x in config.get('excluded_playlists', [])]) - playlists.extend([p for p in first_results['items']]) - user_id = spotify_session.current_user()['id'] + exclude_list = set([x.split(":")[-1] for x in config.get("excluded_playlists", [])]) + playlists.extend([p for p in first_results["items"]]) + user_id = spotify_session.current_user()["id"] # get all the remaining playlists in parallel - if first_results['next']: - offsets = [ first_results['limit'] * n for n in range(1, math.ceil(first_results['total']/first_results['limit'])) ] - extra_results = await atqdm.gather( *[asyncio.to_thread(spotify_session.current_user_playlists, offset=offset) for offset in offsets ] ) + if first_results["next"]: + offsets = [ + first_results["limit"] * n + for n in range( + 1, math.ceil(first_results["total"] / first_results["limit"]) + ) + ] + extra_results = await atqdm.gather( + *[ + asyncio.to_thread(spotify_session.current_user_playlists, offset=offset) + for offset in offsets + ] + ) for extra_result in extra_results: - playlists.extend([p for p in extra_result['items']]) + playlists.extend([p for p in extra_result["items"]]) # filter out playlists that don't belong to us or are on the exclude list - my_playlist_filter = lambda p: p and p['owner']['id'] == user_id - exclude_filter = lambda p: not p['id'] in exclude_list - return list(filter( exclude_filter, filter( my_playlist_filter, playlists ))) + my_playlist_filter = lambda p: p and p["owner"]["id"] == user_id + exclude_filter = lambda p: p["id"] not in exclude_list + return list(filter(exclude_filter, filter(my_playlist_filter, playlists))) + -def get_playlists_from_config(spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config): +def get_playlists_from_config( + spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config +): # get the list of playlist sync mappings from the configuration file def get_playlist_ids(config): - return [(item['spotify_id'], item['tidal_id']) for item in config['sync_playlists']] + return [ + (item["spotify_id"], item["tidal_id"]) for item in config["sync_playlists"] + ] + output = [] for spotify_id, tidal_id in get_playlist_ids(config=config): try: @@ -415,4 +638,3 @@ def get_playlist_ids(config): raise e output.append((spotify_playlist, tidal_playlist)) return output - From e1f5f74bb59fa450ae19c14b0a745d2e67186e7f Mon Sep 17 00:00:00 2001 From: Cas Teeuwen Date: Wed, 12 Nov 2025 21:46:43 +0100 Subject: [PATCH 2/5] fixing all type errors --- src/spotify_to_tidal/__main__.py | 65 +++++++++---- src/spotify_to_tidal/auth.py | 71 ++++++++------ src/spotify_to_tidal/cache.py | 73 +++++++++----- src/spotify_to_tidal/sync.py | 33 +++++-- src/spotify_to_tidal/tidalapi_patch.py | 127 ++++++++++++++++++------- src/spotify_to_tidal/type/__init__.py | 7 +- src/spotify_to_tidal/type/config.py | 2 +- src/spotify_to_tidal/type/spotify.py | 3 +- 8 files changed, 264 insertions(+), 117 deletions(-) diff --git a/src/spotify_to_tidal/__main__.py b/src/spotify_to_tidal/__main__.py index 8a95fc6..7e39f5b 100644 --- a/src/spotify_to_tidal/__main__.py +++ b/src/spotify_to_tidal/__main__.py @@ -1,21 +1,31 @@ -import yaml import argparse import sys -from . import sync as _sync +import yaml + from . import auth as _auth +from . import sync as _sync + def main(): parser = argparse.ArgumentParser() - parser.add_argument('--config', default='config.yml', help='location of the config file') - parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config') - parser.add_argument('--sync-favorites', action=argparse.BooleanOptionalAction, help='synchronize the favorites') + parser.add_argument( + "--config", default="config.yml", help="location of the config file" + ) + parser.add_argument( + "--uri", help="synchronize a specific URI instead of the one in the config" + ) + parser.add_argument( + "--sync-favorites", + action=argparse.BooleanOptionalAction, + help="synchronize the favorites", + ) args = parser.parse_args() - with open(args.config, 'r') as f: + with open(args.config, "r") as f: config = yaml.safe_load(f) print("Opening Spotify session") - spotify_session = _auth.open_spotify_session(config['spotify']) + spotify_session = _auth.open_spotify_session(config["spotify"]) print("Opening Tidal session") tidal_session = _auth.open_tidal_session() if not tidal_session.check_login(): @@ -24,23 +34,44 @@ def main(): # if a playlist ID is explicitly provided as a command line argument then use that spotify_playlist = spotify_session.playlist(args.uri) tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session) - tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists) - _sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config) - sync_favorites = args.sync_favorites # only sync favorites if command line argument explicitly passed + tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist( + spotify_playlist, tidal_playlists + ) + _sync.sync_playlists_wrapper( + spotify_session, tidal_session, [tidal_playlist], config + ) + sync_favorites = ( + args.sync_favorites + ) # only sync favorites if command line argument explicitly passed elif args.sync_favorites: - sync_favorites = True # sync only the favorites - elif config.get('sync_playlists', None): + sync_favorites = True # sync only the favorites + elif config.get("sync_playlists", None): # if the config contains a sync_playlists list of mappings then use that - _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config) - sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True) + _sync.sync_playlists_wrapper( + spotify_session, + tidal_session, + _sync.get_playlists_from_config(spotify_session, tidal_session, config), + config, + ) + sync_favorites = args.sync_favorites is None and config.get( + "sync_favorites_default", True + ) else: # otherwise sync all the user playlists in the Spotify account and favorites unless explicitly disabled - _sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config) - sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True) + _sync.sync_playlists_wrapper( + spotify_session, + tidal_session, + _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), + config, + ) + sync_favorites = args.sync_favorites is None and config.get( + "sync_favorites_default", True + ) if sync_favorites: _sync.sync_favorites_wrapper(spotify_session, tidal_session, config) -if __name__ == '__main__': + +if __name__ == "__main__": main() sys.exit(0) diff --git a/src/spotify_to_tidal/auth.py b/src/spotify_to_tidal/auth.py index cb1762b..9324151 100644 --- a/src/spotify_to_tidal/auth.py +++ b/src/spotify_to_tidal/auth.py @@ -1,36 +1,42 @@ #!/usr/bin/env python3 import sys +import webbrowser + import spotipy import tidalapi -import webbrowser import yaml -__all__ = [ - 'open_spotify_session', - 'open_tidal_session' -] +__all__ = ["open_spotify_session", "open_tidal_session"] + +SPOTIFY_SCOPES = "playlist-read-private, user-library-read" -SPOTIFY_SCOPES = 'playlist-read-private, user-library-read' def open_spotify_session(config) -> spotipy.Spotify: - credentials_manager = spotipy.SpotifyOAuth(username=config['username'], - scope=SPOTIFY_SCOPES, - client_id=config['client_id'], - client_secret=config['client_secret'], - redirect_uri=config['redirect_uri'], - requests_timeout=2, - open_browser=config.get('open_browser', True)) + credentials_manager = spotipy.SpotifyOAuth( + username=config["username"], + scope=SPOTIFY_SCOPES, + client_id=config["client_id"], + client_secret=config["client_secret"], + redirect_uri=config["redirect_uri"], + requests_timeout=2, + open_browser=config.get("open_browser", True), + ) try: credentials_manager.get_access_token(as_dict=False) except spotipy.SpotifyOauthError: - sys.exit("Error opening Spotify sesion; could not get token for username: ".format(config['username'])) + sys.exit( + "Error opening Spotify sesion; could not get token for username: ".format( + config["username"] + ) + ) return spotipy.Spotify(oauth_manager=credentials_manager) -def open_tidal_session(config = None) -> tidalapi.Session: + +def open_tidal_session(config=None) -> tidalapi.Session: try: - with open('.session.yml', 'r') as session_file: + with open(".session.yml", "r") as session_file: previous_session = yaml.safe_load(session_file) except OSError: previous_session = None @@ -41,25 +47,30 @@ def open_tidal_session(config = None) -> tidalapi.Session: session = tidalapi.Session() if previous_session: try: - if session.load_oauth_session(token_type= previous_session['token_type'], - access_token=previous_session['access_token'], - refresh_token=previous_session['refresh_token'] ): + if session.load_oauth_session( + token_type=previous_session["token_type"], + access_token=previous_session["access_token"], + refresh_token=previous_session["refresh_token"], + ): return session except Exception as e: - print("Error loading previous Tidal Session: \n" + str(e) ) + print("Error loading previous Tidal Session: \n" + str(e)) login, future = session.login_oauth() - print('Login with the webbrowser: ' + login.verification_uri_complete) + print("Login with the webbrowser: " + login.verification_uri_complete) url = login.verification_uri_complete - if not url.startswith('https://'): - url = 'https://' + url + if not url.startswith("https://"): + url = "https://" + url webbrowser.open(url) future.result() - with open('.session.yml', 'w') as f: - yaml.dump( {'session_id': session.session_id, - 'token_type': session.token_type, - 'access_token': session.access_token, - 'refresh_token': session.refresh_token}, f ) + with open(".session.yml", "w") as f: + yaml.dump( + { + "session_id": session.session_id, + "token_type": session.token_type, + "access_token": session.access_token, + "refresh_token": session.refresh_token, + }, + f, + ) return session - - diff --git a/src/spotify_to_tidal/cache.py b/src/spotify_to_tidal/cache.py index 1ccb300..c0b50fc 100644 --- a/src/spotify_to_tidal/cache.py +++ b/src/spotify_to_tidal/cache.py @@ -1,27 +1,42 @@ import datetime +from typing import Dict + import sqlalchemy -from sqlalchemy import Table, Column, String, DateTime, MetaData, insert, select, update, delete -from typing import Dict, List, Sequence, Set, Mapping +from sqlalchemy import ( + Column, + DateTime, + MetaData, + String, + Table, + delete, + insert, + select, + update, +) class MatchFailureDatabase: - """ + """ sqlite database of match failures which persists between runs this can be used concurrently between multiple processes """ - def __init__(self, filename='.cache.db'): + def __init__(self, filename=".cache.db"): self.engine = sqlalchemy.create_engine(f"sqlite:///{filename}") meta = MetaData() - self.match_failures = Table('match_failures', meta, - Column('track_id', String, - primary_key=True), - Column('insert_time', DateTime), - Column('next_retry', DateTime), - sqlite_autoincrement=False) + self.match_failures = Table( + "match_failures", + meta, + Column("track_id", String, primary_key=True), + Column("insert_time", DateTime), + Column("next_retry", DateTime), + sqlite_autoincrement=False, + ) meta.create_all(self.engine) - def _get_next_retry_time(self, insert_time: datetime.datetime | None = None) -> datetime.datetime: + def _get_next_retry_time( + self, insert_time: datetime.datetime | None = None + ) -> datetime.datetime: if insert_time: # double interval on each retry interval = 2 * (datetime.datetime.now() - insert_time) @@ -30,26 +45,36 @@ def _get_next_retry_time(self, insert_time: datetime.datetime | None = None) -> return datetime.datetime.now() + interval def cache_match_failure(self, track_id: str): - """ notifies that matching failed for the given track_id """ + """notifies that matching failed for the given track_id""" fetch_statement = select(self.match_failures).where( - self.match_failures.c.track_id == track_id) + self.match_failures.c.track_id == track_id + ) with self.engine.connect() as connection: with connection.begin(): # Either update the next_retry time if track_id already exists, otherwise create a new entry - existing_failure = connection.execute( - fetch_statement).fetchone() + existing_failure = connection.execute(fetch_statement).fetchone() if existing_failure: - update_statement = update(self.match_failures).where( - self.match_failures.c.track_id == track_id).values(next_retry=self._get_next_retry_time()) + update_statement = ( + update(self.match_failures) + .where(self.match_failures.c.track_id == track_id) + .values(next_retry=self._get_next_retry_time()) + ) connection.execute(update_statement) else: - connection.execute(insert(self.match_failures), { - "track_id": track_id, "insert_time": datetime.datetime.now(), "next_retry": self._get_next_retry_time()}) + connection.execute( + insert(self.match_failures), + { + "track_id": track_id, + "insert_time": datetime.datetime.now(), + "next_retry": self._get_next_retry_time(), + }, + ) def has_match_failure(self, track_id: str) -> bool: - """ checks if there was a recent search for which matching failed with the given track_id """ + """checks if there was a recent search for which matching failed with the given track_id""" statement = select(self.match_failures.c.next_retry).where( - self.match_failures.c.track_id == track_id) + self.match_failures.c.track_id == track_id + ) with self.engine.connect() as connection: match_failure = connection.execute(statement).fetchone() if match_failure: @@ -57,9 +82,10 @@ def has_match_failure(self, track_id: str) -> bool: return False def remove_match_failure(self, track_id: str): - """ removes match failure from the database """ + """removes match failure from the database""" statement = delete(self.match_failures).where( - self.match_failures.c.track_id == track_id) + self.match_failures.c.track_id == track_id + ) with self.engine.connect() as connection: with connection.begin(): connection.execute(statement) @@ -70,6 +96,7 @@ class TrackMatchCache: Non-persistent mapping of spotify ids -> tidal_ids This should NOT be accessed concurrently from multiple processes """ + data: Dict[str, int] = {} def get(self, track_id: str) -> int | None: diff --git a/src/spotify_to_tidal/sync.py b/src/spotify_to_tidal/sync.py index 5055913..6f5d95b 100755 --- a/src/spotify_to_tidal/sync.py +++ b/src/spotify_to_tidal/sync.py @@ -283,8 +283,10 @@ def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str): offset=offset, playlist_id=spotify_playlist["id"] ), ) + def track_filter(item): return item.get("type", "track") == "track" # type may be 'episode' also + def sanity_filter(item): return ( "album" in item @@ -293,6 +295,7 @@ def sanity_filter(item): and len(item["album"]["artists"]) > 0 and item["album"]["artists"][0]["name"] is not None ) + return list(filter(sanity_filter, filter(track_filter, items))) @@ -481,6 +484,7 @@ async def sync_playlist( ) else: # Erase old playlist and add new tracks from scratch if any reordering occured + assert isinstance(tidal_playlist, tidalapi.UserPlaylist) clear_tidal_playlist(tidal_playlist) add_multiple_tracks_to_playlist(tidal_playlist, new_tidal_track_ids) @@ -490,10 +494,10 @@ async def sync_favorites( ): """sync user favorites to tidal""" - async def get_tracks_from_spotify_favorites() -> List[dict]: - _get_favorite_tracks = lambda offset: spotify_session.current_user_saved_tracks( - offset=offset - ) + async def get_tracks_from_spotify_favorites() -> List[t_spotify.SpotifyTrack]: + def _get_favorite_tracks(offset): + return spotify_session.current_user_saved_tracks(offset=offset) + tracks = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, _get_favorite_tracks ) @@ -512,6 +516,7 @@ def get_new_tidal_favorites() -> List[int]: print("Loading favorite tracks from Spotify") spotify_tracks = await get_tracks_from_spotify_favorites() print("Loading existing favorite tracks from Tidal") + assert isinstance(tidal_session.user, LoggedInUser) old_tidal_tracks = await get_all_favorites( tidal_session.user.favorites, order="DATE" ) @@ -522,7 +527,7 @@ def get_new_tidal_favorites() -> List[int]: for tidal_id in tqdm( new_tidal_favorite_ids, desc="Adding new tracks to Tidal favorites" ): - tidal_session.user.favorites.add_track(tidal_id) + tidal_session.user.favorites.add_track(str(tidal_id)) else: print("No new tracks to add to Tidal favorites") @@ -555,8 +560,13 @@ def sync_favorites_wrapper( def get_tidal_playlists_wrapper( tidal_session: tidalapi.Session, ) -> Mapping[str, tidalapi.Playlist]: + assert isinstance(tidal_session.user, LoggedInUser) tidal_playlists = asyncio.run(get_all_playlists(tidal_session.user)) - return {playlist.name: playlist for playlist in tidal_playlists} + return { + playlist.name: playlist + for playlist in tidal_playlists + if playlist.name is not None + } def pick_tidal_playlist_for_spotify_playlist( @@ -589,8 +599,9 @@ async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config): print("Loading Spotify playlists") first_results = spotify_session.current_user_playlists() exclude_list = set([x.split(":")[-1] for x in config.get("excluded_playlists", [])]) + assert isinstance(first_results, dict) playlists.extend([p for p in first_results["items"]]) - user_id = spotify_session.current_user()["id"] + user_id = spotify_session.current_user()["id"] # type: ignore # get all the remaining playlists in parallel if first_results["next"]: @@ -610,8 +621,12 @@ async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config): playlists.extend([p for p in extra_result["items"]]) # filter out playlists that don't belong to us or are on the exclude list - my_playlist_filter = lambda p: p and p["owner"]["id"] == user_id - exclude_filter = lambda p: p["id"] not in exclude_list + def my_playlist_filter(p): + return p and p["owner"]["id"] == user_id + + def exclude_filter(p): + return p["id"] not in exclude_list + return list(filter(exclude_filter, filter(my_playlist_filter, playlists))) diff --git a/src/spotify_to_tidal/tidalapi_patch.py b/src/spotify_to_tidal/tidalapi_patch.py index 9da23cc..4c6ab92 100644 --- a/src/spotify_to_tidal/tidalapi_patch.py +++ b/src/spotify_to_tidal/tidalapi_patch.py @@ -1,79 +1,142 @@ import asyncio import math -from typing import List +from typing import Iterable, List, Sequence + import tidalapi from tqdm import tqdm from tqdm.asyncio import tqdm as atqdm -def _remove_indices_from_playlist(playlist: tidalapi.UserPlaylist, indices: List[int]): - headers = {'If-None-Match': playlist._etag} + +def _remove_indices_from_playlist( + playlist: tidalapi.UserPlaylist, indices: Iterable[int] +): + headers = {"If-None-Match": playlist._etag} index_string = ",".join(map(str, indices)) - playlist.request.request('DELETE', (playlist._base_url + '/items/%s') % (playlist.id, index_string), headers=headers) + playlist.request.request( + "DELETE", + (playlist._base_url + "/items/%s") % (playlist.id, index_string), + headers=headers, #type: ignore + ) playlist._reparse() -def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int=20): - with tqdm(desc="Erasing existing tracks from Tidal playlist", total=playlist.num_tracks) as progress: + +def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int = 20): + with tqdm( + desc="Erasing existing tracks from Tidal playlist", total=playlist.num_tracks + ) as progress: while playlist.num_tracks: indices = range(min(playlist.num_tracks, chunk_size)) _remove_indices_from_playlist(playlist, indices) progress.update(len(indices)) - -def add_multiple_tracks_to_playlist(playlist: tidalapi.UserPlaylist, track_ids: List[int], chunk_size: int=20): + + +def add_multiple_tracks_to_playlist( + playlist: tidalapi.UserPlaylist, track_ids: Sequence[int], chunk_size: int = 20 +): offset = 0 - with tqdm(desc="Adding new tracks to Tidal playlist", total=len(track_ids)) as progress: + with tqdm( + desc="Adding new tracks to Tidal playlist", total=len(track_ids) + ) as progress: while offset < len(track_ids): count = min(chunk_size, len(track_ids) - offset) - playlist.add(track_ids[offset:offset+chunk_size]) + playlist.add(track_ids[offset : offset + chunk_size]) offset += count progress.update(count) -async def _get_all_chunks(url, session, parser, params={}) -> List[tidalapi.Track]: - """ - Helper function to get all items from a Tidal endpoint in parallel - The main library doesn't provide the total number of items or expose the raw json, so use this wrapper instead + +async def _get_all_chunks(url, session, parser, params={}) -> List[tidalapi.Track | tidalapi.Playlist]: + """ + Helper function to get all items from a Tidal endpoint in parallel + The main library doesn't provide the total number of items or expose the raw json, so use this wrapper instead """ - def _make_request(offset: int=0): + + def _make_request(offset: int = 0): new_params = params - new_params['offset'] = offset + new_params["offset"] = offset return session.request.map_request(url, params=new_params) first_chunk_raw = _make_request() - limit = first_chunk_raw['limit'] - total = first_chunk_raw['totalNumberOfItems'] + limit = first_chunk_raw["limit"] + total = first_chunk_raw["totalNumberOfItems"] items = session.request.map_json(first_chunk_raw, parse=parser) if len(items) < total: - offsets = [limit * n for n in range(1, math.ceil(total/limit))] + offsets = [limit * n for n in range(1, math.ceil(total / limit))] extra_results = await atqdm.gather( - *[asyncio.to_thread(lambda offset: session.request.map_json(_make_request(offset), parse=parser), offset) for offset in offsets], - desc="Fetching additional data chunks" + *[ + asyncio.to_thread( + lambda offset: session.request.map_json( + _make_request(offset), parse=parser + ), + offset, + ) + for offset in offsets + ], + desc="Fetching additional data chunks", ) for extra_result in extra_results: items.extend(extra_result) return items -async def get_all_favorites(favorites: tidalapi.Favorites, order: str = "NAME", order_direction: str = "ASC", chunk_size: int=100) -> List[tidalapi.Track]: - """ Get all favorites from Tidal playlist in chunks """ +async def _get_all_track_chunks(*args, **kwargs) -> List[tidalapi.Track]: + result = asyncio.run(_get_all_chunks(*args, **kwargs)) + if not all(isinstance(item, tidalapi.Track) for item in result): + raise TypeError("Expected all items to be of type tidalapi.Track") + return result #type: ignore + +async def _get_all_playlist_chunks(*args, **kwargs) -> List[tidalapi.Playlist]: + result = asyncio.run(_get_all_chunks(*args, **kwargs)) + if not all(isinstance(item, tidalapi.Playlist) for item in result): + raise TypeError("Expected all items to be of type tidalapi.Playlist") + return result #type: ignore + +async def get_all_favorites( + favorites: tidalapi.Favorites, + order: str = "NAME", + order_direction: str = "ASC", + chunk_size: int = 100, +) -> List[tidalapi.Track]: + """Get all favorites from Tidal playlist in chunks""" params = { "limit": chunk_size, "order": order, "orderDirection": order_direction, } - return await _get_all_chunks(f"{favorites.base_url}/tracks", session=favorites.session, parser=favorites.session.parse_track, params=params) + return await _get_all_track_chunks( + f"{favorites.base_url}/tracks", + session=favorites.session, + parser=favorites.session.parse_track, + params=params, + ) -async def get_all_playlists(user: tidalapi.User, chunk_size: int=10) -> List[tidalapi.Playlist]: - """ Get all user playlists from Tidal in chunks """ - print(f"Loading playlists from Tidal user") + +async def get_all_playlists( + user: tidalapi.User, chunk_size: int = 10 +) -> List[tidalapi.Playlist]: + """Get all user playlists from Tidal in chunks""" + print("Loading playlists from Tidal user") params = { "limit": chunk_size, } - return await _get_all_chunks(f"users/{user.id}/playlists", session=user.session, parser=user.playlist.parse_factory, params=params) + return await _get_all_playlist_chunks( + f"users/{user.id}/playlists", + session=user.session, + parser=user.playlist.parse_factory, + params=params, + ) + -async def get_all_playlist_tracks(playlist: tidalapi.Playlist, chunk_size: int=20) -> List[tidalapi.Track]: - """ Get all tracks from Tidal playlist in chunks """ +async def get_all_playlist_tracks( + playlist: tidalapi.Playlist, chunk_size: int = 20 +) -> List[tidalapi.Track]: + """Get all tracks from Tidal playlist in chunks""" params = { "limit": chunk_size, } print(f"Loading tracks from Tidal playlist '{playlist.name}'") - return await _get_all_chunks(f"{playlist._base_url%playlist.id}/tracks", session=playlist.session, parser=playlist.session.parse_track, params=params) - + return await _get_all_track_chunks( + f"{playlist._base_url % playlist.id}/tracks", + session=playlist.session, + parser=playlist.session.parse_track, + params=params, + ) diff --git a/src/spotify_to_tidal/type/__init__.py b/src/spotify_to_tidal/type/__init__.py index 13e01b6..8991d75 100644 --- a/src/spotify_to_tidal/type/__init__.py +++ b/src/spotify_to_tidal/type/__init__.py @@ -1,9 +1,9 @@ -from .config import SpotifyConfig, TidalConfig, PlaylistConfig, SyncConfig -from .spotify import SpotifyTrack - from spotipy import Spotify from tidalapi import Session, Track +from .config import PlaylistConfig, SpotifyConfig, SyncConfig, TidalConfig +from .spotify import SpotifyTrack + TidalID = str SpotifyID = str TidalSession = Session @@ -15,7 +15,6 @@ "TidalConfig", "PlaylistConfig", "SyncConfig", - "TidalPlaylist", "TidalID", "SpotifyID", "SpotifySession", diff --git a/src/spotify_to_tidal/type/config.py b/src/spotify_to_tidal/type/config.py index 01fd1ad..5599fcb 100644 --- a/src/spotify_to_tidal/type/config.py +++ b/src/spotify_to_tidal/type/config.py @@ -1,4 +1,4 @@ -from typing import TypedDict, Literal, List, Optional +from typing import List, Literal, Optional, TypedDict class SpotifyConfig(TypedDict): diff --git a/src/spotify_to_tidal/type/spotify.py b/src/spotify_to_tidal/type/spotify.py index 3970ad5..8e96936 100644 --- a/src/spotify_to_tidal/type/spotify.py +++ b/src/spotify_to_tidal/type/spotify.py @@ -1,5 +1,6 @@ +from typing import Dict, List, Literal, Mapping, Optional, TypedDict + from spotipy import Spotify -from typing import TypedDict, List, Dict, Mapping, Literal, Optional class SpotifyImage(TypedDict): From 503dd067fb5be73c5b97790de2532adc944274b8 Mon Sep 17 00:00:00 2001 From: Cas Teeuwen Date: Wed, 12 Nov 2025 22:44:17 +0100 Subject: [PATCH 3/5] fix: correct string formatting in Spotify session error message --- src/spotify_to_tidal/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spotify_to_tidal/auth.py b/src/spotify_to_tidal/auth.py index 9324151..e90ac40 100644 --- a/src/spotify_to_tidal/auth.py +++ b/src/spotify_to_tidal/auth.py @@ -26,7 +26,7 @@ def open_spotify_session(config) -> spotipy.Spotify: credentials_manager.get_access_token(as_dict=False) except spotipy.SpotifyOauthError: sys.exit( - "Error opening Spotify sesion; could not get token for username: ".format( + "Error opening Spotify sesion; could not get token for username: {}".format( config["username"] ) ) From e49e8c60a144fe8c0972693ac76a995c4bcdd9ee Mon Sep 17 00:00:00 2001 From: Cas Teeuwen Date: Thu, 13 Nov 2025 18:46:31 +0100 Subject: [PATCH 4/5] add return types, fix pyright issues --- src/spotify_to_tidal/__main__.py | 5 +- src/spotify_to_tidal/cache.py | 8 +- src/spotify_to_tidal/sync.py | 126 ++++++++++++++++--------- src/spotify_to_tidal/tidalapi_patch.py | 19 ++-- 4 files changed, 102 insertions(+), 56 deletions(-) diff --git a/src/spotify_to_tidal/__main__.py b/src/spotify_to_tidal/__main__.py index 7e39f5b..eaa6877 100644 --- a/src/spotify_to_tidal/__main__.py +++ b/src/spotify_to_tidal/__main__.py @@ -1,5 +1,6 @@ import argparse import sys +from typing import Any, Mapping import yaml @@ -7,7 +8,7 @@ from . import sync as _sync -def main(): +def main() -> None: parser = argparse.ArgumentParser() parser.add_argument( "--config", default="config.yml", help="location of the config file" @@ -32,7 +33,7 @@ def main(): sys.exit("Could not connect to Tidal") if args.uri: # if a playlist ID is explicitly provided as a command line argument then use that - spotify_playlist = spotify_session.playlist(args.uri) + spotify_playlist: Mapping[str, Any] = spotify_session.playlist(args.uri) #type: ignore tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session) tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist( spotify_playlist, tidal_playlists diff --git a/src/spotify_to_tidal/cache.py b/src/spotify_to_tidal/cache.py index c0b50fc..c2363ac 100644 --- a/src/spotify_to_tidal/cache.py +++ b/src/spotify_to_tidal/cache.py @@ -21,7 +21,7 @@ class MatchFailureDatabase: this can be used concurrently between multiple processes """ - def __init__(self, filename=".cache.db"): + def __init__(self, filename: str = ".cache.db") -> None: self.engine = sqlalchemy.create_engine(f"sqlite:///{filename}") meta = MetaData() self.match_failures = Table( @@ -44,7 +44,7 @@ def _get_next_retry_time( interval = datetime.timedelta(days=7) return datetime.datetime.now() + interval - def cache_match_failure(self, track_id: str): + def cache_match_failure(self, track_id: str) -> None: """notifies that matching failed for the given track_id""" fetch_statement = select(self.match_failures).where( self.match_failures.c.track_id == track_id @@ -81,7 +81,7 @@ def has_match_failure(self, track_id: str) -> bool: return match_failure.next_retry > datetime.datetime.now() return False - def remove_match_failure(self, track_id: str): + def remove_match_failure(self, track_id: str) -> None: """removes match failure from the database""" statement = delete(self.match_failures).where( self.match_failures.c.track_id == track_id @@ -102,7 +102,7 @@ class TrackMatchCache: def get(self, track_id: str) -> int | None: return self.data.get(track_id, None) - def insert(self, mapping: tuple[str, int]): + def insert(self, mapping: tuple[str, int]) -> None: self.data[mapping[0]] = mapping[1] diff --git a/src/spotify_to_tidal/sync.py b/src/spotify_to_tidal/sync.py index 6f5d95b..42025dc 100755 --- a/src/spotify_to_tidal/sync.py +++ b/src/spotify_to_tidal/sync.py @@ -8,7 +8,17 @@ import traceback import unicodedata from difflib import SequenceMatcher -from typing import Callable, List, Mapping, Sequence, Set +from typing import ( + Any, + Awaitable, + Callable, + List, + Mapping, + Sequence, + Set, + Tuple, + TypeVar, +) import requests import spotipy @@ -29,7 +39,7 @@ from .type import spotify as t_spotify -def normalize(s) -> str: +def normalize(s: str) -> str: return unicodedata.normalize("NFD", s).encode("ascii", "ignore").decode("ascii") @@ -40,18 +50,22 @@ def simple(input_string: str) -> str: ) -def isrc_match(tidal_track: tidalapi.Track, spotify_track) -> bool: +def isrc_match(tidal_track: tidalapi.Track, spotify_track: Mapping[str, Any]) -> bool: if "isrc" in spotify_track["external_ids"]: return tidal_track.isrc == spotify_track["external_ids"]["isrc"] return False -def duration_match(tidal_track: tidalapi.Track, spotify_track, tolerance=2) -> bool: +def duration_match( + tidal_track: tidalapi.Track, spotify_track: Mapping[str, Any], tolerance: int = 2 +) -> bool: # the duration of the two tracks must be the same to within 2 seconds return abs(tidal_track.duration - spotify_track["duration_ms"] / 1000) < tolerance -def name_match(tidal_track, spotify_track) -> bool: +def name_match( + tidal_track: tidalapi.Track, spotify_track: t_spotify.SpotifyTrack +) -> bool: def exclusion_rule( pattern: str, tidal_track: tidalapi.Track, spotify_track: t_spotify.SpotifyTrack ): @@ -73,15 +87,19 @@ def exclusion_rule( # the simplified version of the Spotify track name must be a substring of the Tidal track name # Try with both un-normalized and then normalized + assert tidal_track.name is not None simple_spotify_track = ( simple(spotify_track["name"].lower()).split("feat.")[0].strip() ) + assert tidal_track.name is not None return simple_spotify_track in tidal_track.name.lower() or normalize( simple_spotify_track ) in normalize(tidal_track.name.lower()) -def artist_match(tidal: tidalapi.Track | tidalapi.Album, spotify) -> bool: +def artist_match( + tidal: tidalapi.Track | tidalapi.Album, spotify: Mapping[str, Any] +) -> bool: def split_artist_name(artist: str) -> Sequence[str]: if "&" in artist: return artist.split("&") @@ -97,11 +115,13 @@ def get_tidal_artists( if tidal.artists is None: return set() + for artist in tidal.artists: + assert artist.name is not None if do_normalize: artist_name = normalize(artist.name) else: - artist_name: str = artist.name if artist.name is not None else "" + artist_name = artist.name result.extend(split_artist_name(artist_name)) return set([simple(x.strip().lower()) for x in result]) @@ -125,7 +145,7 @@ def get_spotify_artists(spotify, do_normalize=False) -> Set[str]: ) -def match(tidal_track, spotify_track) -> bool: +def match(tidal_track: tidalapi.Track, spotify_track: t_spotify.SpotifyTrack) -> bool: if not spotify_track["id"]: return False return isrc_match(tidal_track, spotify_track) or ( @@ -135,16 +155,23 @@ def match(tidal_track, spotify_track) -> bool: ) -def test_album_similarity(spotify_album, tidal_album, threshold=0.6): +def test_album_similarity( + spotify_album: Mapping[str, Any], + tidal_album: tidalapi.Album, + threshold: float = 0.6, +) -> bool: + assert tidal_album.name is not None return SequenceMatcher( None, simple(spotify_album["name"]), simple(tidal_album.name) ).ratio() >= threshold and artist_match(tidal_album, spotify_album) async def tidal_search( - spotify_track, rate_limiter, tidal_session: tidalapi.Session + spotify_track: t_spotify.SpotifyTrack, + rate_limiter: asyncio.Semaphore, + tidal_session: tidalapi.Session, ) -> tidalapi.Track | None: - def _search_for_track_in_album(): + def _search_for_track_in_album() -> tidalapi.Track | None: # search for album name and first album artist if ( "album" in spotify_track @@ -158,6 +185,7 @@ def _search_for_track_in_album(): ) album_result = tidal_session.search(query, models=[tidalapi.album.Album]) for album in album_result["albums"]: + assert album.num_tracks is not None if album.num_tracks >= spotify_track[ "track_number" ] and test_album_similarity(spotify_track["album"], album): @@ -172,7 +200,7 @@ def _search_for_track_in_album(): failure_cache.remove_match_failure(spotify_track["id"]) return track - def _search_for_standalone_track(): + def _search_for_standalone_track() -> tidalapi.Track | None: # if album search fails then search for track name and first artist query = ( simple(spotify_track["name"]) @@ -199,7 +227,12 @@ def _search_for_standalone_track(): failure_cache.cache_match_failure(spotify_track["id"]) -async def repeat_on_request_error(function, *args, remaining=5, **kwargs): +T = TypeVar("T") + + +async def repeat_on_request_error( + function: Callable[..., Awaitable[T]], *args: Any, remaining: int = 5, **kwargs: Any +) -> T: # utility to repeat calling the function up to 5 times if an exception is thrown try: return await function(*args, **kwargs) @@ -269,12 +302,14 @@ async def _fetch_all_from_spotify_in_chunks(fetch_function: Callable) -> List[di async def get_tracks_from_spotify_playlist( spotify_session: spotipy.Spotify, spotify_playlist -): - def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str): +) -> List[t_spotify.SpotifyTrack]: + def _get_tracks_from_spotify_playlist( + offset: int, playlist_id: str + ) -> Mapping[str, Any]: fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc))),type" return spotify_session.playlist_tracks( playlist_id=playlist_id, fields=fields, offset=offset - ) + ) #type: ignore print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'") items = await repeat_on_request_error( @@ -284,10 +319,10 @@ def _get_tracks_from_spotify_playlist(offset: int, playlist_id: str): ), ) - def track_filter(item): + def track_filter(item: Mapping[str, Any]) -> bool: return item.get("type", "track") == "track" # type may be 'episode' also - def sanity_filter(item): + def sanity_filter(item: Mapping[str, Any]) -> bool: return ( "album" in item and "name" in item["album"] @@ -295,17 +330,18 @@ def sanity_filter(item): and len(item["album"]["artists"]) > 0 and item["album"]["artists"][0]["name"] is not None ) - + + items = [t_spotify.SpotifyTrack(**item) for item in items] return list(filter(sanity_filter, filter(track_filter, items))) def populate_track_match_cache( spotify_tracks_: Sequence[t_spotify.SpotifyTrack], tidal_tracks_: Sequence[tidalapi.Track], -): +) -> None: """Populate the track match cache with all the existing tracks in Tidal playlist corresponding to Spotify playlist""" - def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack): + def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack) -> None: for idx, tidal_track in list(enumerate(tidal_tracks)): if tidal_track.available and match(tidal_track, spotify_track): assert tidal_track.id is not None @@ -313,7 +349,7 @@ def _populate_one_track_from_spotify(spotify_track: t_spotify.SpotifyTrack): tidal_tracks.pop(idx) return - def _populate_one_track_from_tidal(tidal_track: tidalapi.Track): + def _populate_one_track_from_tidal(tidal_track: tidalapi.Track) -> None: for idx, spotify_track in list(enumerate(spotify_tracks)): if tidal_track.available and match(tidal_track, spotify_track): assert tidal_track.id is not None @@ -379,10 +415,10 @@ async def search_new_tracks_on_tidal( spotify_tracks: Sequence[t_spotify.SpotifyTrack], playlist_name: str, config: dict, -): +) -> None: """Generic function for searching for each item in a list of Spotify tracks which have not already been seen and adding them to the cache""" - async def _run_rate_limiter(semaphore): + async def _run_rate_limiter(semaphore: asyncio.Semaphore) -> None: """Leaky bucket algorithm for rate limiting. Periodically releases items from semaphore at rate_limit""" _sleep_time = ( config.get("max_concurrency", 10) / config.get("rate_limit", 10) / 4 @@ -445,7 +481,7 @@ async def sync_playlist( spotify_playlist, tidal_playlist: tidalapi.Playlist | None, config: dict, -): +) -> None: """sync given playlist to tidal""" # Get the tracks from both Spotify and Tidal, creating a new Tidal playlist if necessary spotify_tracks = await get_tracks_from_spotify_playlist( @@ -491,16 +527,17 @@ async def sync_playlist( async def sync_favorites( spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict -): +) -> None: """sync user favorites to tidal""" async def get_tracks_from_spotify_favorites() -> List[t_spotify.SpotifyTrack]: - def _get_favorite_tracks(offset): - return spotify_session.current_user_saved_tracks(offset=offset) + def _get_favorite_tracks(offset: int) -> Mapping[str, Any]: + return spotify_session.current_user_saved_tracks(offset=offset) #type: ignore tracks = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, _get_favorite_tracks ) + tracks = [t_spotify.SpotifyTrack(**item) for item in tracks] tracks.reverse() return tracks @@ -535,9 +572,9 @@ def get_new_tidal_favorites() -> List[int]: def sync_playlists_wrapper( spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, - playlists, + playlists: Sequence[Tuple[Mapping[str, Any], tidalapi.Playlist | None]], config: dict, -): +) -> None: for spotify_playlist, tidal_playlist in playlists: # sync the spotify playlist to tidal asyncio.run( @@ -548,8 +585,8 @@ def sync_playlists_wrapper( def sync_favorites_wrapper( - spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config -): + spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict +) -> None: asyncio.run( main=sync_favorites( spotify_session=spotify_session, tidal_session=tidal_session, config=config @@ -570,8 +607,9 @@ def get_tidal_playlists_wrapper( def pick_tidal_playlist_for_spotify_playlist( - spotify_playlist, tidal_playlists: Mapping[str, tidalapi.Playlist] -): + spotify_playlist: Mapping[str, Any], + tidal_playlists: Mapping[str, tidalapi.Playlist], +) -> Tuple[Mapping[str, Any], tidalapi.Playlist | None]: if spotify_playlist["name"] in tidal_playlists: # if there's an existing tidal playlist with the name of the current playlist then use that tidal_playlist = tidal_playlists[spotify_playlist["name"]] @@ -581,8 +619,8 @@ def pick_tidal_playlist_for_spotify_playlist( def get_user_playlist_mappings( - spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config -): + spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict +) -> List[Tuple[Mapping[str, Any], tidalapi.Playlist | None]]: results = [] spotify_playlists = asyncio.run(get_playlists_from_spotify(spotify_session, config)) tidal_playlists = get_tidal_playlists_wrapper(tidal_session) @@ -593,7 +631,9 @@ def get_user_playlist_mappings( return results -async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config): +async def get_playlists_from_spotify( + spotify_session: spotipy.Spotify, config: dict +) -> List[Mapping[str, Any]]: # get all the playlists from the Spotify account playlists = [] print("Loading Spotify playlists") @@ -621,20 +661,20 @@ async def get_playlists_from_spotify(spotify_session: spotipy.Spotify, config): playlists.extend([p for p in extra_result["items"]]) # filter out playlists that don't belong to us or are on the exclude list - def my_playlist_filter(p): - return p and p["owner"]["id"] == user_id + def my_playlist_filter(p: Mapping[str, Any]) -> bool: + return bool(p and p["owner"]["id"] == user_id) - def exclude_filter(p): + def exclude_filter(p: Mapping[str, Any]) -> bool: return p["id"] not in exclude_list return list(filter(exclude_filter, filter(my_playlist_filter, playlists))) def get_playlists_from_config( - spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config -): + spotify_session: spotipy.Spotify, tidal_session: tidalapi.Session, config: dict +) -> List[Tuple[Mapping[str, Any], tidalapi.Playlist]]: # get the list of playlist sync mappings from the configuration file - def get_playlist_ids(config): + def get_playlist_ids(config: dict) -> List[Tuple[str, str]]: return [ (item["spotify_id"], item["tidal_id"]) for item in config["sync_playlists"] ] diff --git a/src/spotify_to_tidal/tidalapi_patch.py b/src/spotify_to_tidal/tidalapi_patch.py index 4c6ab92..6d04519 100644 --- a/src/spotify_to_tidal/tidalapi_patch.py +++ b/src/spotify_to_tidal/tidalapi_patch.py @@ -9,18 +9,18 @@ def _remove_indices_from_playlist( playlist: tidalapi.UserPlaylist, indices: Iterable[int] -): +) -> None: headers = {"If-None-Match": playlist._etag} index_string = ",".join(map(str, indices)) playlist.request.request( "DELETE", (playlist._base_url + "/items/%s") % (playlist.id, index_string), - headers=headers, #type: ignore + headers=headers, # type: ignore ) playlist._reparse() -def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int = 20): +def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int = 20) -> None: with tqdm( desc="Erasing existing tracks from Tidal playlist", total=playlist.num_tracks ) as progress: @@ -32,7 +32,7 @@ def clear_tidal_playlist(playlist: tidalapi.UserPlaylist, chunk_size: int = 20): def add_multiple_tracks_to_playlist( playlist: tidalapi.UserPlaylist, track_ids: Sequence[int], chunk_size: int = 20 -): +) -> None: offset = 0 with tqdm( desc="Adding new tracks to Tidal playlist", total=len(track_ids) @@ -44,7 +44,9 @@ def add_multiple_tracks_to_playlist( progress.update(count) -async def _get_all_chunks(url, session, parser, params={}) -> List[tidalapi.Track | tidalapi.Playlist]: +async def _get_all_chunks( + url, session, parser, params={} +) -> List[tidalapi.Track | tidalapi.Playlist]: """ Helper function to get all items from a Tidal endpoint in parallel The main library doesn't provide the total number of items or expose the raw json, so use this wrapper instead @@ -78,17 +80,20 @@ def _make_request(offset: int = 0): items.extend(extra_result) return items + async def _get_all_track_chunks(*args, **kwargs) -> List[tidalapi.Track]: result = asyncio.run(_get_all_chunks(*args, **kwargs)) if not all(isinstance(item, tidalapi.Track) for item in result): raise TypeError("Expected all items to be of type tidalapi.Track") - return result #type: ignore + return result # type: ignore + async def _get_all_playlist_chunks(*args, **kwargs) -> List[tidalapi.Playlist]: result = asyncio.run(_get_all_chunks(*args, **kwargs)) if not all(isinstance(item, tidalapi.Playlist) for item in result): raise TypeError("Expected all items to be of type tidalapi.Playlist") - return result #type: ignore + return result # type: ignore + async def get_all_favorites( favorites: tidalapi.Favorites, From 8b3bb244c234a1c832c47036bca0f86161b60acd Mon Sep 17 00:00:00 2001 From: Cas Teeuwen Date: Thu, 13 Nov 2025 19:39:17 +0100 Subject: [PATCH 5/5] pre-commit with pyright and ruff --- .pre-commit-config.yaml | 16 ++++++++++++++++ pyproject.toml | 12 +++++++++++- src/spotify_to_tidal/__main__.py | 2 +- src/spotify_to_tidal/sync.py | 7 +++---- tests/conftest.py | 2 +- tests/unit/test_auth.py | 7 +------ tests/unit/test_cache.py | 5 +---- 7 files changed, 34 insertions(+), 17 deletions(-) create mode 100644 .pre-commit-config.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a7e6510 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,16 @@ +repos: + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.14.0 + hooks: + - id: ruff-check + name: "Static code analysis (ruff)" + args: [--fix] # auto-fix issues if possible + - id: ruff-check + name: "Check naming conventions (ruff)" + args: [--select, N] + - id: ruff-format + name: "Code formatting (ruff)" + - repo: https://github.com/RobertCraigie/pyright-python + rev: v1.1.406 + hooks: + - id: pyright \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index a1986ef..c7cf640 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,8 @@ dependencies = [ "tqdm~=4.64", "sqlalchemy~=2.0", "pytest~=8.0", - "pytest-mock~=3.8" + "pytest-mock~=3.8", + "pre-commit~=3.7" ] [tools.setuptools.packages."spotify_to_tidal"] @@ -23,3 +24,12 @@ include = "spotify_to_tidal*" [project.scripts] spotify_to_tidal = "spotify_to_tidal.__main__:main" + +[tool.pyright] +include = ["src"] +venvPath = "." +venv = ".venv" +pythonPlatform = "All" +useLibraryCodeForTypes = true +extraPaths = ["src"] +reportPrivateImportUsage = "none" diff --git a/src/spotify_to_tidal/__main__.py b/src/spotify_to_tidal/__main__.py index eaa6877..f33b9ba 100644 --- a/src/spotify_to_tidal/__main__.py +++ b/src/spotify_to_tidal/__main__.py @@ -33,7 +33,7 @@ def main() -> None: sys.exit("Could not connect to Tidal") if args.uri: # if a playlist ID is explicitly provided as a command line argument then use that - spotify_playlist: Mapping[str, Any] = spotify_session.playlist(args.uri) #type: ignore + spotify_playlist: Mapping[str, Any] = spotify_session.playlist(args.uri) # type: ignore tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session) tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist( spotify_playlist, tidal_playlists diff --git a/src/spotify_to_tidal/sync.py b/src/spotify_to_tidal/sync.py index 42025dc..efddd94 100755 --- a/src/spotify_to_tidal/sync.py +++ b/src/spotify_to_tidal/sync.py @@ -115,7 +115,6 @@ def get_tidal_artists( if tidal.artists is None: return set() - for artist in tidal.artists: assert artist.name is not None if do_normalize: @@ -309,7 +308,7 @@ def _get_tracks_from_spotify_playlist( fields = "next,total,limit,items(track(name,album(name,artists),artists,track_number,duration_ms,id,external_ids(isrc))),type" return spotify_session.playlist_tracks( playlist_id=playlist_id, fields=fields, offset=offset - ) #type: ignore + ) # type: ignore print(f"Loading tracks from Spotify playlist '{spotify_playlist['name']}'") items = await repeat_on_request_error( @@ -330,7 +329,7 @@ def sanity_filter(item: Mapping[str, Any]) -> bool: and len(item["album"]["artists"]) > 0 and item["album"]["artists"][0]["name"] is not None ) - + items = [t_spotify.SpotifyTrack(**item) for item in items] return list(filter(sanity_filter, filter(track_filter, items))) @@ -532,7 +531,7 @@ async def sync_favorites( async def get_tracks_from_spotify_favorites() -> List[t_spotify.SpotifyTrack]: def _get_favorite_tracks(offset: int) -> Mapping[str, Any]: - return spotify_session.current_user_saved_tracks(offset=offset) #type: ignore + return spotify_session.current_user_saved_tracks(offset=offset) # type: ignore tracks = await repeat_on_request_error( _fetch_all_from_spotify_in_chunks, _get_favorite_tracks diff --git a/tests/conftest.py b/tests/conftest.py index 9464070..92fe8fc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,4 +2,4 @@ import os # Add the src directory to the Python path -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) \ No newline at end of file +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) diff --git a/tests/unit/test_auth.py b/tests/unit/test_auth.py index c857984..f017046 100644 --- a/tests/unit/test_auth.py +++ b/tests/unit/test_auth.py @@ -1,12 +1,7 @@ # tests/unit/test_auth.py -import pytest import spotipy -import tidalapi -import yaml -import sys -from unittest import mock -from spotify_to_tidal.auth import open_spotify_session, open_tidal_session, SPOTIFY_SCOPES +from spotify_to_tidal.auth import open_spotify_session, SPOTIFY_SCOPES def test_open_spotify_session(mocker): diff --git a/tests/unit/test_cache.py b/tests/unit/test_cache.py index 7aa0e15..5be0ae9 100644 --- a/tests/unit/test_cache.py +++ b/tests/unit/test_cache.py @@ -1,10 +1,7 @@ # tests/unit/test_cache.py import pytest -import datetime -import sqlalchemy from sqlalchemy import create_engine, select -from unittest import mock from spotify_to_tidal.cache import MatchFailureDatabase, TrackMatchCache @@ -77,4 +74,4 @@ def test_track_match_cache_get(): track_cache = TrackMatchCache() track_cache.insert(("spotify_id", 123)) assert track_cache.get("spotify_id") == 123 - assert track_cache.get("nonexistent_id") is None \ No newline at end of file + assert track_cache.get("nonexistent_id") is None