18
18
19
19
import collections
20
20
import time
21
+ from typing import TYPE_CHECKING , Literal , Sequence
21
22
22
23
import requests
23
24
import unidecode
24
25
25
26
from beets import ui
26
27
from beets .autotag import AlbumInfo , TrackInfo
27
28
from beets .dbcore import types
28
- from beets .plugins import BeetsPlugin , MetadataSourcePlugin
29
+ from beets .plugins import BeetsPlugin , MetadataSourcePlugin , Response
29
30
31
+ if TYPE_CHECKING :
32
+ from beets .library import Item , Library
33
+ from beetsplug ._typing import JSONDict
30
34
31
- class DeezerPlugin (MetadataSourcePlugin , BeetsPlugin ):
35
+
36
+ class DeezerPlugin (MetadataSourcePlugin [Response ], BeetsPlugin ):
32
37
data_source = "Deezer"
33
38
34
39
item_types = {
35
40
"deezer_track_rank" : types .INTEGER ,
36
41
"deezer_track_id" : types .INTEGER ,
37
42
"deezer_updated" : types .DATE ,
38
43
}
39
-
40
44
# Base URLs for the Deezer API
41
45
# Documentation: https://developers.deezer.com/api/
42
46
search_url = "https://api.deezer.com/search/"
43
47
album_url = "https://api.deezer.com/album/"
44
48
track_url = "https://api.deezer.com/track/"
45
49
46
- def __init__ (self ):
47
- super ().__init__ ()
48
-
49
50
def commands (self ):
50
51
"""Add beet UI commands to interact with Deezer."""
51
52
deezer_update_cmd = ui .Subcommand (
52
53
"deezerupdate" , help = f"Update { self .data_source } rank"
53
54
)
54
55
55
- def func (lib , opts , args ):
56
+ def func (lib : Library , opts , args ):
56
57
items = lib .items (ui .decargs (args ))
57
- self .deezerupdate (items , ui .should_write ())
58
+ self .deezerupdate (list ( items ) , ui .should_write ())
58
59
59
60
deezer_update_cmd .func = func
60
61
61
62
return [deezer_update_cmd ]
62
63
63
- def fetch_data (self , url ):
64
- try :
65
- response = requests .get (url , timeout = 10 )
66
- response .raise_for_status ()
67
- data = response .json ()
68
- except requests .exceptions .RequestException as e :
69
- self ._log .error ("Error fetching data from {}\n Error: {}" , url , e )
70
- return None
71
- if "error" in data :
72
- self ._log .debug ("Deezer API error: {}" , data ["error" ]["message" ])
73
- return None
74
- return data
75
-
76
64
def album_for_id (self , album_id : str ) -> AlbumInfo | None :
77
65
"""Fetch an album by its Deezer ID or URL."""
78
66
if not (deezer_id := self ._get_id (album_id )):
@@ -156,52 +144,18 @@ def album_for_id(self, album_id: str) -> AlbumInfo | None:
156
144
cover_art_url = album_data .get ("cover_xl" ),
157
145
)
158
146
159
- def _get_track (self , track_data ) :
160
- """Convert a Deezer track object dict to a TrackInfo object .
147
+ def track_for_id (self , track_id : str ) -> None | TrackInfo :
148
+ """Fetch a track by its Deezer ID or URL .
161
149
162
- :param track_data: Deezer Track object dict
163
- :type track_data: dict
164
- :return: TrackInfo object for track
165
- :rtype: beets.autotag.hooks.TrackInfo
150
+ Returns a TrackInfo object or None if the track is not found.
166
151
"""
167
- artist , artist_id = self .get_artist (
168
- track_data .get ("contributors" , [track_data ["artist" ]])
169
- )
170
- return TrackInfo (
171
- title = track_data ["title" ],
172
- track_id = track_data ["id" ],
173
- deezer_track_id = track_data ["id" ],
174
- isrc = track_data .get ("isrc" ),
175
- artist = artist ,
176
- artist_id = artist_id ,
177
- length = track_data ["duration" ],
178
- index = track_data .get ("track_position" ),
179
- medium = track_data .get ("disk_number" ),
180
- deezer_track_rank = track_data .get ("rank" ),
181
- medium_index = track_data .get ("track_position" ),
182
- data_source = self .data_source ,
183
- data_url = track_data ["link" ],
184
- deezer_updated = time .time (),
185
- )
186
-
187
- def track_for_id (self , track_id = None , track_data = None ):
188
- """Fetch a track by its Deezer ID or URL and return a
189
- TrackInfo object or None if the track is not found.
152
+ if not (deezer_id := self ._get_id (track_id )):
153
+ self ._log .debug ("Invalid Deezer track_id: {}" , track_id )
154
+ return None
190
155
191
- :param track_id: (Optional) Deezer ID or URL for the track. Either
192
- ``track_id`` or ``track_data`` must be provided.
193
- :type track_id: str
194
- :param track_data: (Optional) Simplified track object dict. May be
195
- provided instead of ``track_id`` to avoid unnecessary API calls.
196
- :type track_data: dict
197
- :return: TrackInfo object for track
198
- :rtype: beets.autotag.hooks.TrackInfo or None
199
- """
200
- if track_data is None :
201
- if not (deezer_id := self ._get_id (track_id )) or not (
202
- track_data := self .fetch_data (f"{ self .track_url } { deezer_id } " )
203
- ):
204
- return None
156
+ if not (track_data := self .fetch_data (f"{ self .track_url } { deezer_id } " )):
157
+ self ._log .debug ("Track not found: {}" , track_id )
158
+ return None
205
159
206
160
track = self ._get_track (track_data )
207
161
@@ -229,18 +183,43 @@ def track_for_id(self, track_id=None, track_data=None):
229
183
track .medium_total = medium_total
230
184
return track
231
185
186
+ def _get_track (self , track_data : JSONDict ) -> TrackInfo :
187
+ """Convert a Deezer track object dict to a TrackInfo object.
188
+
189
+ :param track_data: Deezer Track object dict
190
+ :return: TrackInfo object for track
191
+ """
192
+ artist , artist_id = self .get_artist (
193
+ track_data .get ("contributors" , [track_data ["artist" ]])
194
+ )
195
+ return TrackInfo (
196
+ title = track_data ["title" ],
197
+ track_id = track_data ["id" ],
198
+ deezer_track_id = track_data ["id" ],
199
+ isrc = track_data .get ("isrc" ),
200
+ artist = artist ,
201
+ artist_id = artist_id ,
202
+ length = track_data ["duration" ],
203
+ index = track_data .get ("track_position" ),
204
+ medium = track_data .get ("disk_number" ),
205
+ deezer_track_rank = track_data .get ("rank" ),
206
+ medium_index = track_data .get ("track_position" ),
207
+ data_source = self .data_source ,
208
+ data_url = track_data ["link" ],
209
+ deezer_updated = time .time (),
210
+ )
211
+
232
212
@staticmethod
233
- def _construct_search_query (filters = None , keywords = "" ):
213
+ def _construct_search_query (
214
+ filters : dict [str , str ], keywords : str = ""
215
+ ) -> str :
234
216
"""Construct a query string with the specified filters and keywords to
235
217
be provided to the Deezer Search API
236
218
(https://developers.deezer.com/api/search).
237
219
238
- :param filters: (Optional) Field filters to apply.
239
- :type filters: dict
220
+ :param filters: Field filters to apply.
240
221
:param keywords: (Optional) Query keywords to use.
241
- :type keywords: str
242
222
:return: Query string to be provided to the Search API.
243
- :rtype: str
244
223
"""
245
224
query_components = [
246
225
keywords ,
@@ -251,25 +230,30 @@ def _construct_search_query(filters=None, keywords=""):
251
230
query = query .decode ("utf8" )
252
231
return unidecode .unidecode (query )
253
232
254
- def _search_api (self , query_type , filters = None , keywords = "" ):
233
+ def _search_api (
234
+ self ,
235
+ query_type : Literal [
236
+ "album" ,
237
+ "track" ,
238
+ "artist" ,
239
+ "history" ,
240
+ "playlist" ,
241
+ "podcast" ,
242
+ "radio" ,
243
+ "user" ,
244
+ ],
245
+ filters : dict [str , str ],
246
+ keywords = "" ,
247
+ ) -> Sequence [Response ]:
255
248
"""Query the Deezer Search API for the specified ``keywords``, applying
256
249
the provided ``filters``.
257
250
258
- :param query_type: The Deezer Search API method to use. Valid types
259
- are: 'album', 'artist', 'history', 'playlist', 'podcast',
260
- 'radio', 'track', 'user', and 'track'.
261
- :type query_type: str
262
- :param filters: (Optional) Field filters to apply.
263
- :type filters: dict
251
+ :param query_type: The Deezer Search API method to use.
264
252
:param keywords: (Optional) Query keywords to use.
265
- :type keywords: str
266
253
:return: JSON data for the class:`Response <Response>` object or None
267
254
if no search results are returned.
268
- :rtype: dict or None
269
255
"""
270
256
query = self ._construct_search_query (keywords = keywords , filters = filters )
271
- if not query :
272
- return None
273
257
self ._log .debug (f"Searching { self .data_source } for '{ query } '" )
274
258
try :
275
259
response = requests .get (
@@ -284,7 +268,7 @@ def _search_api(self, query_type, filters=None, keywords=""):
284
268
self .data_source ,
285
269
e ,
286
270
)
287
- return None
271
+ return ()
288
272
response_data = response .json ().get ("data" , [])
289
273
self ._log .debug (
290
274
"Found {} result(s) from {} for '{}'" ,
@@ -294,7 +278,7 @@ def _search_api(self, query_type, filters=None, keywords=""):
294
278
)
295
279
return response_data
296
280
297
- def deezerupdate (self , items , write ):
281
+ def deezerupdate (self , items : Sequence [ Item ] , write : bool ):
298
282
"""Obtain rank information from Deezer."""
299
283
for index , item in enumerate (items , start = 1 ):
300
284
self ._log .info (
@@ -320,3 +304,16 @@ def deezerupdate(self, items, write):
320
304
item .deezer_updated = time .time ()
321
305
if write :
322
306
item .try_write ()
307
+
308
+ def fetch_data (self , url : str ):
309
+ try :
310
+ response = requests .get (url , timeout = 10 )
311
+ response .raise_for_status ()
312
+ data = response .json ()
313
+ except requests .exceptions .RequestException as e :
314
+ self ._log .error ("Error fetching data from {}\n Error: {}" , url , e )
315
+ return None
316
+ if "error" in data :
317
+ self ._log .debug ("Deezer API error: {}" , data ["error" ]["message" ])
318
+ return None
319
+ return data
0 commit comments