|
| 1 | +import logging |
| 2 | +import uuid |
| 3 | + |
| 4 | +import requests |
| 5 | +from flask import current_app |
| 6 | +from psycopg2.extras import execute_values |
| 7 | +from psycopg2.sql import SQL, Identifier |
| 8 | +from requests.adapters import HTTPAdapter, Retry |
| 9 | +from sqlalchemy import text |
| 10 | + |
| 11 | +from brainzutils import musicbrainz_db |
| 12 | + |
| 13 | +from data.model.external_service import ExternalServiceType |
| 14 | +from listenbrainz.db import external_service_oauth |
| 15 | +from listenbrainz.domain.importer_service import ImporterService |
| 16 | +from listenbrainz.webserver import db_conn, ts_conn |
| 17 | +from listenbrainz.webserver.errors import APINotFound |
| 18 | + |
| 19 | +logger = logging.getLogger(__name__) |
| 20 | + |
| 21 | + |
| 22 | +def bulk_insert_loved_tracks(user_id: int, feedback: list[tuple[int, str]], column: str): |
| 23 | + """ Insert loved tracks imported from an audioscrobbler service into feedback table """ |
| 24 | + # delete existing feedback for given mbids and then import new in same transaction |
| 25 | + delete_query = SQL(""" |
| 26 | + WITH entries(user_id, {column}) AS (VALUES %s) |
| 27 | + DELETE FROM recording_feedback rf |
| 28 | + USING entries e |
| 29 | + WHERE e.user_id = rf.user_id |
| 30 | + AND e.{column}::uuid = rf.{column} |
| 31 | + """).format(column=Identifier(column)) |
| 32 | + insert_query = SQL(""" |
| 33 | + INSERT INTO recording_feedback (user_id, created, {column}, score) |
| 34 | + VALUES %s |
| 35 | + """).format(column=Identifier(column)) |
| 36 | + with db_conn.connection.cursor() as cursor: |
| 37 | + execute_values(cursor, delete_query, [(mbid,) for ts, mbid in feedback], template=f"({user_id}, %s)") |
| 38 | + execute_values(cursor, insert_query, feedback, template=f"({user_id}, to_timestamp(%s), %s, 1)") |
| 39 | + db_conn.connection.commit() |
| 40 | + |
| 41 | + |
| 42 | +def load_recordings_from_tracks(track_mbids: list) -> dict[str, str]: |
| 43 | + """ Fetch recording mbids corresponding to track mbids. Audioscrobbler services use track mbids |
| 44 | + in loved tracks endpoint but we use recording mbids in feedback table so need convert between the two. """ |
| 45 | + if not track_mbids: |
| 46 | + return {} |
| 47 | + query = """ |
| 48 | + SELECT track.gid::text AS track_mbid |
| 49 | + , recording.gid::text AS recording_mbid |
| 50 | + FROM track |
| 51 | + JOIN recording |
| 52 | + ON track.recording = recording.id |
| 53 | + WHERE track.gid IN :tracks |
| 54 | + """ |
| 55 | + with musicbrainz_db.engine.connect() as connection: |
| 56 | + result = connection.execute(text(query), {"tracks": tuple(track_mbids)}) |
| 57 | + return {row["track_mbid"]: row["recording_mbid"] for row in result.mappings()} |
| 58 | + |
| 59 | + |
| 60 | +def bulk_get_msids(connection, items): |
| 61 | + """ Fetch msids for all the specified items (recording, artist_credit) in batches. """ |
| 62 | + query = """ |
| 63 | + SELECT DISTINCT ON (key) |
| 64 | + lower(s.recording) || '-' || lower(s.artist_credit) AS key |
| 65 | + , s.gid::text AS recording_msid |
| 66 | + FROM messybrainz.submissions s |
| 67 | + WHERE EXISTS( |
| 68 | + SELECT 1 |
| 69 | + FROM (VALUES %s) AS t(track_name, artist_name) |
| 70 | + WHERE lower(s.recording) = lower(t.track_name) |
| 71 | + AND lower(s.artist_credit) = lower(t.artist_name) |
| 72 | + ) |
| 73 | + ORDER BY key, s.submitted, recording_msid |
| 74 | + """ |
| 75 | + curs = connection.connection.cursor() |
| 76 | + result = execute_values(curs, query, [(x["track_name"], x["artist_name"]) for x in items], fetch=True) |
| 77 | + return {r[0]: r[1] for r in result} |
| 78 | + |
| 79 | + |
| 80 | +class AudioscrobblerService(ImporterService): |
| 81 | + """ Base class for audioscrobbler-compatible services (Last.fm, Libre.fm) that support |
| 82 | + importing loved tracks as feedback into ListenBrainz. """ |
| 83 | + |
| 84 | + def __init__(self, service: ExternalServiceType, api_url: str, api_key: str): |
| 85 | + super().__init__(service) |
| 86 | + self.api_url = api_url |
| 87 | + self.api_key = api_key |
| 88 | + |
| 89 | + def add_new_user(self, user_id: int, token: dict) -> bool: |
| 90 | + external_service_oauth.save_token( |
| 91 | + db_conn, user_id=user_id, service=self.service, access_token=None, refresh_token=None, |
| 92 | + token_expires_ts=None, record_listens=True, scopes=[], external_user_id=token["external_user_id"], |
| 93 | + latest_listened_at=token["latest_listened_at"] |
| 94 | + ) |
| 95 | + return True |
| 96 | + |
| 97 | + def fetch_feedback(self, username: str): |
| 98 | + """ Retrieve the loved tracks of a user from the audioscrobbler-compatible API. """ |
| 99 | + session = requests.Session() |
| 100 | + session.mount("https://", HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1, allowed_methods=["GET"]))) |
| 101 | + |
| 102 | + params = { |
| 103 | + "method": "user.getlovedtracks", |
| 104 | + "user": username, |
| 105 | + "format": "json", |
| 106 | + "api_key": self.api_key, |
| 107 | + "limit": 100 |
| 108 | + } |
| 109 | + response = session.get(self.api_url, params=params) |
| 110 | + if response.status_code == 404: |
| 111 | + raise APINotFound(f"{self.service.value.capitalize()} user with username '{username}' not found") |
| 112 | + response.raise_for_status() |
| 113 | + |
| 114 | + response_object = response.json()["lovedtracks"] |
| 115 | + |
| 116 | + if "@attr" in response_object: |
| 117 | + total_pages = int(response_object["@attr"]["totalPages"]) |
| 118 | + total_count = int(response_object["@attr"]["total"]) |
| 119 | + else: |
| 120 | + total_pages = int(response_object["totalPages"]) |
| 121 | + total_count = int(response_object["total"]) |
| 122 | + |
| 123 | + if total_count == 0: |
| 124 | + return [], 0 |
| 125 | + |
| 126 | + items = [] |
| 127 | + responses = [response] |
| 128 | + |
| 129 | + for page in range(2, total_pages + 1): |
| 130 | + params["page"] = page |
| 131 | + resp = session.get(self.api_url, params=params) |
| 132 | + if resp.status_code != 200: |
| 133 | + current_app.logger.error("Unable to import page %d for user %s: %s", page, username, resp.text) |
| 134 | + continue |
| 135 | + responses.append(resp) |
| 136 | + |
| 137 | + for resp in responses: |
| 138 | + tracks = resp.json()["lovedtracks"]["track"] |
| 139 | + if isinstance(tracks, dict): |
| 140 | + tracks = [tracks] |
| 141 | + for track in tracks: |
| 142 | + item: dict = { |
| 143 | + "timestamp": int(track["date"]["uts"]), |
| 144 | + "track_name": track["name"], |
| 145 | + "artist_name": track["artist"]["name"] |
| 146 | + } |
| 147 | + |
| 148 | + try: |
| 149 | + uuid.UUID(track["mbid"]) |
| 150 | + item["mbid"] = track["mbid"] |
| 151 | + except (ValueError, TypeError): |
| 152 | + item["mbid"] = None |
| 153 | + |
| 154 | + items.append(item) |
| 155 | + |
| 156 | + return items, total_count |
| 157 | + |
| 158 | + def import_feedback(self, user_id: int, username: str): |
| 159 | + """ Import a user's loved tracks from an audioscrobbler-compatible service into the LB feedback table. |
| 160 | +
|
| 161 | + This method retrieves the entire list of loved tracks for a user, converts track mbids |
| 162 | + to recording mbids, looks up msids for tracks without mbids, and inserts loved feedback. |
| 163 | +
|
| 164 | + Args: |
| 165 | + user_id: the listenbrainz user id of the user |
| 166 | + username: the username on the external service |
| 167 | +
|
| 168 | + Returns a dict having various counts associated with the import. |
| 169 | + """ |
| 170 | + items, total_count = self.fetch_feedback(username) |
| 171 | + |
| 172 | + all_mbids = [x["mbid"] for x in items if x["mbid"]] |
| 173 | + recordings_from_tracks = load_recordings_from_tracks(all_mbids) |
| 174 | + |
| 175 | + items_with_mbids, items_without_mbids = [], [] |
| 176 | + for item in items: |
| 177 | + if item["mbid"]: |
| 178 | + if item["mbid"] in recordings_from_tracks: |
| 179 | + item["mbid"] = recordings_from_tracks[item["mbid"]] |
| 180 | + items_with_mbids.append(item) |
| 181 | + else: |
| 182 | + items_without_mbids.append(item) |
| 183 | + |
| 184 | + mbid_feedback = [(x["timestamp"], x["mbid"]) for x in items_with_mbids] |
| 185 | + |
| 186 | + msids_map = bulk_get_msids(ts_conn, items_without_mbids) |
| 187 | + for item in items_without_mbids: |
| 188 | + key = f"{item['track_name'].lower()}-{item['artist_name'].lower()}" |
| 189 | + item["msid"] = msids_map.get(key) |
| 190 | + msid_feedback = [(x["timestamp"], x["msid"]) for x in items_without_mbids if x["msid"]] |
| 191 | + |
| 192 | + bulk_insert_loved_tracks(user_id, mbid_feedback, "recording_mbid") |
| 193 | + bulk_insert_loved_tracks(user_id, msid_feedback, "recording_msid") |
| 194 | + |
| 195 | + return { |
| 196 | + "total": total_count, |
| 197 | + "imported": len(mbid_feedback) + len(msid_feedback), |
| 198 | + } |
0 commit comments