diff --git a/resources/lib/kodiUtilities.py b/resources/lib/kodiUtilities.py index 6f55806b..5ee276e9 100644 --- a/resources/lib/kodiUtilities.py +++ b/resources/lib/kodiUtilities.py @@ -7,6 +7,7 @@ import json import re import logging +from typing import Tuple, List, Dict, Union, Optional from resources.lib import utilities @@ -19,46 +20,46 @@ def notification( - header: str, message: str, time=5000, icon=__addon__.getAddonInfo("icon") -): + header: str, message: str, time: int = 5000, icon: str = __addon__.getAddonInfo("icon") +) -> None: xbmcgui.Dialog().notification(header, message, icon, time) -def showSettings(): +def showSettings() -> None: __addon__.openSettings() -def getSetting(setting): +def getSetting(setting: str) -> str: return __addon__.getSetting(setting).strip() -def setSetting(setting, value): +def setSetting(setting: str, value: Union[str, int, float, bool]) -> None: __addon__.setSetting(setting, str(value)) -def getSettingAsBool(setting): +def getSettingAsBool(setting: str) -> bool: return getSetting(setting).lower() == "true" -def getSettingAsFloat(setting): +def getSettingAsFloat(setting: str) -> float: try: return float(getSetting(setting)) except ValueError: return 0 -def getSettingAsInt(setting): +def getSettingAsInt(setting: str) -> int: try: return int(getSettingAsFloat(setting)) except ValueError: return 0 -def getString(string_id): +def getString(string_id: int) -> str: return __addon__.getLocalizedString(string_id) -def kodiJsonRequest(params): +def kodiJsonRequest(params: Dict) -> Optional[Union[Dict, List]]: data = json.dumps(params) request = xbmc.executeJSONRPC(data) @@ -131,7 +132,7 @@ def checkExclusion(fullpath: str) -> bool: return found -def kodiRpcToTraktMediaObject(type, data, mode="collected"): +def kodiRpcToTraktMediaObject(type: str, data: Dict, mode: str = "collected") -> Optional[Dict]: if type == "show": if "uniqueid" in data: data["ids"] = data.pop("uniqueid") @@ -230,7 +231,7 @@ def kodiRpcToTraktMediaObject(type, data, mode="collected"): return -def kodiRpcToTraktMediaObjects(data, mode="collected"): +def kodiRpcToTraktMediaObjects(data: Dict, mode: str = "collected") -> Optional[List]: if "tvshows" in data: shows = data["tvshows"] @@ -270,7 +271,7 @@ def kodiRpcToTraktMediaObjects(data, mode="collected"): return -def getShowDetailsFromKodi(showID, fields): +def getShowDetailsFromKodi(showID: int, fields: List) -> Optional[Dict]: result = kodiJsonRequest( { "jsonrpc": "2.0", @@ -292,7 +293,7 @@ def getShowDetailsFromKodi(showID, fields): return None -def getSeasonDetailsFromKodi(seasonID, fields): +def getSeasonDetailsFromKodi(seasonID: int, fields: List) -> Optional[Dict]: result = kodiJsonRequest( { "jsonrpc": "2.0", @@ -317,7 +318,7 @@ def getSeasonDetailsFromKodi(seasonID, fields): # get a single episode from kodi given the id -def getEpisodeDetailsFromKodi(libraryId, fields): +def getEpisodeDetailsFromKodi(libraryId: int, fields: List) -> Optional[Dict]: result = kodiJsonRequest( { "jsonrpc": "2.0", @@ -358,7 +359,7 @@ def getEpisodeDetailsFromKodi(libraryId, fields): # get a single movie from kodi given the id -def getMovieDetailsFromKodi(libraryId, fields): +def getMovieDetailsFromKodi(libraryId: int, fields: List) -> Optional[Dict]: result = kodiJsonRequest( { "jsonrpc": "2.0", @@ -380,7 +381,7 @@ def getMovieDetailsFromKodi(libraryId, fields): return None -def checkAndConfigureProxy(): +def checkAndConfigureProxy() -> Optional[str]: proxyActive = kodiJsonRequest( { "jsonrpc": "2.0", @@ -480,7 +481,7 @@ def checkAndConfigureProxy(): return None -def getMediaType(): +def getMediaType() -> Optional[str]: listType = xbmc.getInfoLabel("ListItem.DBTYPE") xbmc.log("list item type: %s" % listType, xbmc.LOGINFO) @@ -497,7 +498,7 @@ def getMediaType(): return None -def getInfoLabelDetails(result): +def getInfoLabelDetails(result: Dict) -> Tuple[str, Dict]: type = result["item"]["type"] data = {"action": "started"} # check type of item diff --git a/resources/lib/service.py b/resources/lib/service.py index 952b9e22..6be72c57 100644 --- a/resources/lib/service.py +++ b/resources/lib/service.py @@ -433,7 +433,7 @@ def __init__(self, *args, **kwargs): def onNotification(self, sender, method, data): # method looks like Other.NEXTUPWATCHEDSIGNAL - if method.split(".")[1].upper() != "NEXTUPWATCHEDSIGNAL": + if "." not in method or method.split(".")[1].upper() != "NEXTUPWATCHEDSIGNAL": return logger.debug("Callback received - Upnext skipped to the next episode") diff --git a/resources/lib/sqlitequeue.py b/resources/lib/sqlitequeue.py index 41503d60..a45da780 100644 --- a/resources/lib/sqlitequeue.py +++ b/resources/lib/sqlitequeue.py @@ -14,6 +14,7 @@ import xbmcvfs import xbmcaddon import logging +from typing import Any, Iterator, Optional logger = logging.getLogger(__name__) @@ -59,27 +60,27 @@ def __len__(self) -> int: executed = conn.execute(self._count).fetchone()[0] return executed - def __iter__(self): + def __iter__(self) -> Iterator[Any]: with self._get_conn() as conn: for _, obj_buffer in conn.execute(self._iterate): - yield loads(str(obj_buffer)) + yield loads(obj_buffer) - def _get_conn(self): + def _get_conn(self) -> sqlite3.Connection: id = get_ident() if id not in self._connection_cache: self._connection_cache[id] = sqlite3.Connection(self.path, timeout=60) return self._connection_cache[id] - def purge(self): + def purge(self) -> None: with self._get_conn() as conn: conn.execute(self._purge) - def append(self, obj): + def append(self, obj: Any) -> None: obj_buffer = dumps(obj) with self._get_conn() as conn: conn.execute(self._append, (obj_buffer,)) - def get(self, sleep_wait=True): + def get(self, sleep_wait: bool = True) -> Optional[Any]: keep_pooling = True wait = 0.1 max_wait = 2 @@ -89,10 +90,11 @@ def get(self, sleep_wait=True): while keep_pooling: conn.execute(self._write_lock) cursor = conn.execute(self._get) - try: - id, obj_buffer = cursor.fetchone() + row = cursor.fetchone() + if row: + id, obj_buffer = row keep_pooling = False - except StopIteration: + else: conn.commit() # unlock the database if not sleep_wait: keep_pooling = False @@ -102,13 +104,13 @@ def get(self, sleep_wait=True): wait = min(max_wait, tries / 10 + wait) if id: conn.execute(self._del, (id,)) - return loads(str(obj_buffer)) + return loads(obj_buffer) return None - def peek(self): + def peek(self) -> Optional[Any]: with self._get_conn() as conn: cursor = conn.execute(self._peek) - try: - return loads(str(cursor[0])) - except StopIteration: - return None + row = cursor.fetchone() + if row: + return loads(row[0]) + return None diff --git a/resources/lib/utilities.py b/resources/lib/utilities.py index f89ee570..954642da 100644 --- a/resources/lib/utilities.py +++ b/resources/lib/utilities.py @@ -6,7 +6,7 @@ import re import logging import traceback -from typing import Tuple +from typing import Tuple, List, Dict, Union, Optional import dateutil.parser from datetime import datetime from dateutil.tz import tzutc, tzlocal @@ -18,31 +18,31 @@ logger = logging.getLogger(__name__) -def isMovie(type): +def isMovie(type: str) -> bool: return type == "movie" -def isEpisode(type): +def isEpisode(type: str) -> bool: return type == "episode" -def isShow(type): +def isShow(type: str) -> bool: return type == "show" -def isSeason(type): +def isSeason(type: str) -> bool: return type == "season" -def isValidMediaType(type): +def isValidMediaType(type: str) -> bool: return type in ["movie", "show", "season", "episode"] -def chunks(list, n): - return [list[i : i + n] for i in range(0, len(list), n)] +def chunks(list_data: List, n: int) -> List: + return [list_data[i : i + n] for i in range(0, len(list_data), n)] -def getFormattedItemName(type, info): +def getFormattedItemName(type: str, info: Dict) -> str: s = "" try: if isShow(type): @@ -63,8 +63,8 @@ def getFormattedItemName(type, info): return s -def __findInList(list, case_sensitive=True, **kwargs): - for item in list: +def __findInList(list_data: List, case_sensitive: bool = True, **kwargs) -> Optional[Dict]: + for item in list_data: i = 0 for key in kwargs: # because we can need to find at the root level and inside ids this @@ -88,7 +88,7 @@ def __findInList(list, case_sensitive=True, **kwargs): return None -def findMediaObject(mediaObjectToMatch, listToSearch, matchByTitleAndYear): +def findMediaObject(mediaObjectToMatch: Dict, listToSearch: List, matchByTitleAndYear: bool) -> Optional[Dict]: result = None if ( result is None @@ -134,7 +134,7 @@ def findMediaObject(mediaObjectToMatch, listToSearch, matchByTitleAndYear): return result -def regex_tvshow(label): +def regex_tvshow(label: str) -> Tuple[str, int, int]: regexes = [ # ShowTitle.S01E09; s01e09, s01.e09, s01-e09 r"(.*?)[._ -]s([0-9]+)[._ -]*e([0-9]+)", @@ -160,7 +160,7 @@ def regex_tvshow(label): return "", -1, -1 -def regex_year(title): +def regex_year(title: str) -> Tuple[str, str]: prog = re.compile(r"^(.+) \((\d{4})\)$") result = prog.match(title) @@ -170,7 +170,7 @@ def regex_year(title): return "", "" -def findMovieMatchInList(id, listToMatch, idType): +def findMovieMatchInList(id: str, listToMatch: Dict, idType: str) -> Dict: return next( ( item.to_dict() @@ -181,7 +181,7 @@ def findMovieMatchInList(id, listToMatch, idType): ) -def findShowMatchInList(id, listToMatch, idType): +def findShowMatchInList(id: str, listToMatch: Dict, idType: str) -> Dict: return next( ( item.to_dict() @@ -192,7 +192,7 @@ def findShowMatchInList(id, listToMatch, idType): ) -def findSeasonMatchInList(id, seasonNumber, listToMatch, idType): +def findSeasonMatchInList(id: str, seasonNumber: int, listToMatch: Dict, idType: str) -> Dict: show = findShowMatchInList(id, listToMatch, idType) logger.debug("findSeasonMatchInList %s" % show) if "seasons" in show: @@ -203,7 +203,7 @@ def findSeasonMatchInList(id, seasonNumber, listToMatch, idType): return {} -def findEpisodeMatchInList(id, seasonNumber, episodeNumber, list, idType): +def findEpisodeMatchInList(id: str, seasonNumber: int, episodeNumber: int, list_data: Dict, idType: str) -> Dict: season = findSeasonMatchInList(id, seasonNumber, list, idType) if season: for episode in season["episodes"]: @@ -213,7 +213,7 @@ def findEpisodeMatchInList(id, seasonNumber, episodeNumber, list, idType): return {} -def convertDateTimeToUTC(toConvert): +def convertDateTimeToUTC(toConvert: Optional[str]) -> Optional[str]: if toConvert: dateFormat = "%Y-%m-%d %H:%M:%S" try: @@ -234,7 +234,7 @@ def convertDateTimeToUTC(toConvert): return toConvert -def convertUtcToDateTime(toConvert): +def convertUtcToDateTime(toConvert: Optional[str]) -> Optional[str]: if toConvert: dateFormat = "%Y-%m-%d %H:%M:%S" try: @@ -251,7 +251,7 @@ def convertUtcToDateTime(toConvert): return toConvert -def createError(ex): +def createError(ex: Exception) -> str: template = ( "EXCEPTION Thrown (PythonToCppException) : -->Python callback/script returned the following error<--\n" " - NOTE: IGNORING THIS CAN LEAD TO MEMORY LEAKS!\n" @@ -263,7 +263,7 @@ def createError(ex): return template.format(type(ex).__name__, ex.args, traceback.format_exc()) -def guessBestTraktId(id, type) -> Tuple[dict, str]: +def guessBestTraktId(id: str, type: str) -> Tuple[Dict, str]: data = {} id_type = "" if id.startswith("tt"): @@ -281,7 +281,7 @@ def guessBestTraktId(id, type) -> Tuple[dict, str]: return data, id_type -def best_id(ids, type) -> Tuple[str, str]: +def best_id(ids: Dict, type: str) -> Tuple[str, str]: if "trakt" in ids: return ids["trakt"], "trakt" elif "imdb" in ids and isMovie(type): @@ -296,7 +296,7 @@ def best_id(ids, type) -> Tuple[str, str]: return ids["slug"], "slug" -def checkExcludePath(excludePath, excludePathEnabled, fullpath, x): +def checkExcludePath(excludePath: str, excludePathEnabled: bool, fullpath: str, x: int) -> bool: if excludePath != "" and excludePathEnabled and fullpath.startswith(excludePath): logger.debug( "checkExclusion(): Video is from location, which is currently set as excluded path %i." @@ -307,7 +307,7 @@ def checkExcludePath(excludePath, excludePathEnabled, fullpath, x): return False -def sanitizeMovies(movies): +def sanitizeMovies(movies: List) -> None: # do not remove watched_at and collected_at may cause problems between the # 4 sync types (would probably have to deepcopy etc) for movie in movies: @@ -326,7 +326,7 @@ def sanitizeMovies(movies): # todo add tests -def sanitizeShows(shows): +def sanitizeShows(shows: Dict) -> None: # do not remove watched_at and collected_at may cause problems between the # 4 sync types (would probably have to deepcopy etc) for show in shows["shows"]: @@ -345,14 +345,14 @@ def sanitizeShows(shows): def compareMovies( - movies_col1, - movies_col2, - matchByTitleAndYear, - watched=False, - restrict=False, - playback=False, - rating=False, -): + movies_col1: List, + movies_col2: List, + matchByTitleAndYear: bool, + watched: bool = False, + restrict: bool = False, + playback: bool = False, + rating: bool = False, +) -> List: movies = [] for movie_col1 in movies_col1: if movie_col1: @@ -396,8 +396,8 @@ def compareMovies( def compareShows( - shows_col1, shows_col2, matchByTitleAndYear, rating=False, restrict=False -): + shows_col1: Dict, shows_col2: Dict, matchByTitleAndYear: bool, rating: bool = False, restrict: bool = False +) -> Dict: shows = [] # logger.debug("shows_col1 %s" % shows_col1) # logger.debug("shows_col2 %s" % shows_col2) @@ -454,15 +454,15 @@ def compareShows( # always return shows_col1 if you have enrich it, but don't return shows_col2 def compareEpisodes( - shows_col1, - shows_col2, - matchByTitleAndYear, - watched=False, - restrict=False, - collected=False, - playback=False, - rating=False, -): + shows_col1: Dict, + shows_col2: Dict, + matchByTitleAndYear: bool, + watched: bool = False, + restrict: bool = False, + collected: Union[Dict, bool] = False, + playback: bool = False, + rating: bool = False, +) -> Dict: shows = [] # logger.debug("epi shows_col1 %s" % shows_col1) # logger.debug("epi shows_col2 %s" % shows_col2) @@ -640,7 +640,7 @@ def compareEpisodes( return result -def countEpisodes(shows, collection=True): +def countEpisodes(shows: Union[Dict, List], collection: bool = True) -> int: count = 0 if "shows" in shows: shows = shows["shows"] @@ -659,7 +659,7 @@ def countEpisodes(shows, collection=True): return count -def __getEpisodes(seasons): +def __getEpisodes(seasons: List) -> Dict: data = {} for season in seasons: episodes = {} @@ -670,7 +670,7 @@ def __getEpisodes(seasons): return data -def checkIfNewVersion(old, new): +def checkIfNewVersion(old: str, new: str) -> bool: # Check if old is empty, it might be the first time we check if old == "": return True @@ -686,7 +686,7 @@ def checkIfNewVersion(old, new): return False -def _to_sec(timedelta_string, factors=(1, 60, 3600, 86400)): +def _to_sec(timedelta_string: str, factors: Tuple[int, ...] = (1, 60, 3600, 86400)) -> float: """[[[days:]hours:]minutes:]seconds -> seconds""" return sum( x * y @@ -694,7 +694,7 @@ def _to_sec(timedelta_string, factors=(1, 60, 3600, 86400)): ) -def _fuzzyMatch(string1, string2, match_percent=55.0): +def _fuzzyMatch(string1: str, string2: str, match_percent: float = 55.0) -> bool: s = difflib.SequenceMatcher(None, string1, string2) s.find_longest_match(0, len(string1), 0, len(string2)) return (