Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.0
hooks:
- id: ruff-check
name: "Static code analysis (ruff)"
args: [--fix] # auto-fix issues if possible
- id: ruff-check
name: "Check naming conventions (ruff)"
args: [--select, N]
- id: ruff-format
name: "Code formatting (ruff)"
- repo: https://github.com/RobertCraigie/pyright-python
rev: v1.1.406
hooks:
- id: pyright
12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ dependencies = [
"tqdm~=4.64",
"sqlalchemy~=2.0",
"pytest~=8.0",
"pytest-mock~=3.8"
"pytest-mock~=3.8",
"pre-commit~=3.7"
]

[tools.setuptools.packages."spotify_to_tidal"]
Expand All @@ -23,3 +24,12 @@ include = "spotify_to_tidal*"

[project.scripts]
spotify_to_tidal = "spotify_to_tidal.__main__:main"

[tool.pyright]
include = ["src"]
venvPath = "."
venv = ".venv"
pythonPlatform = "All"
useLibraryCodeForTypes = true
extraPaths = ["src"]
reportPrivateImportUsage = "none"
70 changes: 51 additions & 19 deletions src/spotify_to_tidal/__main__.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,78 @@
import yaml
import argparse
import sys
from typing import Any, Mapping

import yaml

from . import sync as _sync
from . import auth as _auth
from . import sync as _sync


def main():
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.yml', help='location of the config file')
parser.add_argument('--uri', help='synchronize a specific URI instead of the one in the config')
parser.add_argument('--sync-favorites', action=argparse.BooleanOptionalAction, help='synchronize the favorites')
parser.add_argument(
"--config", default="config.yml", help="location of the config file"
)
parser.add_argument(
"--uri", help="synchronize a specific URI instead of the one in the config"
)
parser.add_argument(
"--sync-favorites",
action=argparse.BooleanOptionalAction,
help="synchronize the favorites",
)
args = parser.parse_args()

with open(args.config, 'r') as f:
with open(args.config, "r") as f:
config = yaml.safe_load(f)
print("Opening Spotify session")
spotify_session = _auth.open_spotify_session(config['spotify'])
spotify_session = _auth.open_spotify_session(config["spotify"])
print("Opening Tidal session")
tidal_session = _auth.open_tidal_session()
if not tidal_session.check_login():
sys.exit("Could not connect to Tidal")
if args.uri:
# if a playlist ID is explicitly provided as a command line argument then use that
spotify_playlist = spotify_session.playlist(args.uri)
spotify_playlist: Mapping[str, Any] = spotify_session.playlist(args.uri) # type: ignore
tidal_playlists = _sync.get_tidal_playlists_wrapper(tidal_session)
tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(spotify_playlist, tidal_playlists)
_sync.sync_playlists_wrapper(spotify_session, tidal_session, [tidal_playlist], config)
sync_favorites = args.sync_favorites # only sync favorites if command line argument explicitly passed
tidal_playlist = _sync.pick_tidal_playlist_for_spotify_playlist(
spotify_playlist, tidal_playlists
)
_sync.sync_playlists_wrapper(
spotify_session, tidal_session, [tidal_playlist], config
)
sync_favorites = (
args.sync_favorites
) # only sync favorites if command line argument explicitly passed
elif args.sync_favorites:
sync_favorites = True # sync only the favorites
elif config.get('sync_playlists', None):
sync_favorites = True # sync only the favorites
elif config.get("sync_playlists", None):
# if the config contains a sync_playlists list of mappings then use that
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_playlists_from_config(spotify_session, tidal_session, config), config)
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
_sync.sync_playlists_wrapper(
spotify_session,
tidal_session,
_sync.get_playlists_from_config(spotify_session, tidal_session, config),
config,
)
sync_favorites = args.sync_favorites is None and config.get(
"sync_favorites_default", True
)
else:
# otherwise sync all the user playlists in the Spotify account and favorites unless explicitly disabled
_sync.sync_playlists_wrapper(spotify_session, tidal_session, _sync.get_user_playlist_mappings(spotify_session, tidal_session, config), config)
sync_favorites = args.sync_favorites is None and config.get('sync_favorites_default', True)
_sync.sync_playlists_wrapper(
spotify_session,
tidal_session,
_sync.get_user_playlist_mappings(spotify_session, tidal_session, config),
config,
)
sync_favorites = args.sync_favorites is None and config.get(
"sync_favorites_default", True
)

if sync_favorites:
_sync.sync_favorites_wrapper(spotify_session, tidal_session, config)

if __name__ == '__main__':

if __name__ == "__main__":
main()
sys.exit(0)
71 changes: 41 additions & 30 deletions src/spotify_to_tidal/auth.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
#!/usr/bin/env python3

import sys
import webbrowser

import spotipy
import tidalapi
import webbrowser
import yaml

__all__ = [
'open_spotify_session',
'open_tidal_session'
]
__all__ = ["open_spotify_session", "open_tidal_session"]

SPOTIFY_SCOPES = "playlist-read-private, user-library-read"

SPOTIFY_SCOPES = 'playlist-read-private, user-library-read'

def open_spotify_session(config) -> spotipy.Spotify:
credentials_manager = spotipy.SpotifyOAuth(username=config['username'],
scope=SPOTIFY_SCOPES,
client_id=config['client_id'],
client_secret=config['client_secret'],
redirect_uri=config['redirect_uri'],
requests_timeout=2,
open_browser=config.get('open_browser', True))
credentials_manager = spotipy.SpotifyOAuth(
username=config["username"],
scope=SPOTIFY_SCOPES,
client_id=config["client_id"],
client_secret=config["client_secret"],
redirect_uri=config["redirect_uri"],
requests_timeout=2,
open_browser=config.get("open_browser", True),
)
try:
credentials_manager.get_access_token(as_dict=False)
except spotipy.SpotifyOauthError:
sys.exit("Error opening Spotify sesion; could not get token for username: ".format(config['username']))
sys.exit(
"Error opening Spotify sesion; could not get token for username: {}".format(
config["username"]
)
)

return spotipy.Spotify(oauth_manager=credentials_manager)

def open_tidal_session(config = None) -> tidalapi.Session:

def open_tidal_session(config=None) -> tidalapi.Session:
try:
with open('.session.yml', 'r') as session_file:
with open(".session.yml", "r") as session_file:
previous_session = yaml.safe_load(session_file)
except OSError:
previous_session = None
Expand All @@ -41,25 +47,30 @@ def open_tidal_session(config = None) -> tidalapi.Session:
session = tidalapi.Session()
if previous_session:
try:
if session.load_oauth_session(token_type= previous_session['token_type'],
access_token=previous_session['access_token'],
refresh_token=previous_session['refresh_token'] ):
if session.load_oauth_session(
token_type=previous_session["token_type"],
access_token=previous_session["access_token"],
refresh_token=previous_session["refresh_token"],
):
return session
except Exception as e:
print("Error loading previous Tidal Session: \n" + str(e) )
print("Error loading previous Tidal Session: \n" + str(e))

login, future = session.login_oauth()
print('Login with the webbrowser: ' + login.verification_uri_complete)
print("Login with the webbrowser: " + login.verification_uri_complete)
url = login.verification_uri_complete
if not url.startswith('https://'):
url = 'https://' + url
if not url.startswith("https://"):
url = "https://" + url
webbrowser.open(url)
future.result()
with open('.session.yml', 'w') as f:
yaml.dump( {'session_id': session.session_id,
'token_type': session.token_type,
'access_token': session.access_token,
'refresh_token': session.refresh_token}, f )
with open(".session.yml", "w") as f:
yaml.dump(
{
"session_id": session.session_id,
"token_type": session.token_type,
"access_token": session.access_token,
"refresh_token": session.refresh_token,
},
f,
)
return session


79 changes: 53 additions & 26 deletions src/spotify_to_tidal/cache.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,91 @@
import datetime
from typing import Dict

import sqlalchemy
from sqlalchemy import Table, Column, String, DateTime, MetaData, insert, select, update, delete
from typing import Dict, List, Sequence, Set, Mapping
from sqlalchemy import (
Column,
DateTime,
MetaData,
String,
Table,
delete,
insert,
select,
update,
)


class MatchFailureDatabase:
"""
"""
sqlite database of match failures which persists between runs
this can be used concurrently between multiple processes
"""

def __init__(self, filename='.cache.db'):
def __init__(self, filename: str = ".cache.db") -> None:
self.engine = sqlalchemy.create_engine(f"sqlite:///{filename}")
meta = MetaData()
self.match_failures = Table('match_failures', meta,
Column('track_id', String,
primary_key=True),
Column('insert_time', DateTime),
Column('next_retry', DateTime),
sqlite_autoincrement=False)
self.match_failures = Table(
"match_failures",
meta,
Column("track_id", String, primary_key=True),
Column("insert_time", DateTime),
Column("next_retry", DateTime),
sqlite_autoincrement=False,
)
meta.create_all(self.engine)

def _get_next_retry_time(self, insert_time: datetime.datetime | None = None) -> datetime.datetime:
def _get_next_retry_time(
self, insert_time: datetime.datetime | None = None
) -> datetime.datetime:
if insert_time:
# double interval on each retry
interval = 2 * (datetime.datetime.now() - insert_time)
else:
interval = datetime.timedelta(days=7)
return datetime.datetime.now() + interval

def cache_match_failure(self, track_id: str):
""" notifies that matching failed for the given track_id """
def cache_match_failure(self, track_id: str) -> None:
"""notifies that matching failed for the given track_id"""
fetch_statement = select(self.match_failures).where(
self.match_failures.c.track_id == track_id)
self.match_failures.c.track_id == track_id
)
with self.engine.connect() as connection:
with connection.begin():
# Either update the next_retry time if track_id already exists, otherwise create a new entry
existing_failure = connection.execute(
fetch_statement).fetchone()
existing_failure = connection.execute(fetch_statement).fetchone()
if existing_failure:
update_statement = update(self.match_failures).where(
self.match_failures.c.track_id == track_id).values(next_retry=self._get_next_retry_time())
update_statement = (
update(self.match_failures)
.where(self.match_failures.c.track_id == track_id)
.values(next_retry=self._get_next_retry_time())
)
connection.execute(update_statement)
else:
connection.execute(insert(self.match_failures), {
"track_id": track_id, "insert_time": datetime.datetime.now(), "next_retry": self._get_next_retry_time()})
connection.execute(
insert(self.match_failures),
{
"track_id": track_id,
"insert_time": datetime.datetime.now(),
"next_retry": self._get_next_retry_time(),
},
)

def has_match_failure(self, track_id: str) -> bool:
""" checks if there was a recent search for which matching failed with the given track_id """
"""checks if there was a recent search for which matching failed with the given track_id"""
statement = select(self.match_failures.c.next_retry).where(
self.match_failures.c.track_id == track_id)
self.match_failures.c.track_id == track_id
)
with self.engine.connect() as connection:
match_failure = connection.execute(statement).fetchone()
if match_failure:
return match_failure.next_retry > datetime.datetime.now()
return False

def remove_match_failure(self, track_id: str):
""" removes match failure from the database """
def remove_match_failure(self, track_id: str) -> None:
"""removes match failure from the database"""
statement = delete(self.match_failures).where(
self.match_failures.c.track_id == track_id)
self.match_failures.c.track_id == track_id
)
with self.engine.connect() as connection:
with connection.begin():
connection.execute(statement)
Expand All @@ -70,12 +96,13 @@ class TrackMatchCache:
Non-persistent mapping of spotify ids -> tidal_ids
This should NOT be accessed concurrently from multiple processes
"""

data: Dict[str, int] = {}

def get(self, track_id: str) -> int | None:
return self.data.get(track_id, None)

def insert(self, mapping: tuple[str, int]):
def insert(self, mapping: tuple[str, int]) -> None:
self.data[mapping[0]] = mapping[1]


Expand Down
Loading