Replies: 3 comments
-
I like this idea! Will have to do some research though to figure out how to implement, API limits and all that fun stuff. If anyone has some scripts or ideas to accomplish this, I'm open! I'm going to move this to a discussion. |
Beta Was this translation helpful? Give feedback.
-
I found this the other day: https://github.com/AlkalineJaunt/xmltv-tools It was fairly out of date so I had a crack at updating it. The gist is:
#!/usr/bin/env python3
"""
Original Author:
xmltv-proc-nz by Hadley Rich <[email protected]> "https://github.com/hadleyrich/xmltv-tools"
Contributions by Aaron Pelly <[email protected]> "https://github.com/apelly/xmltv-tools"
Licensed under the BSD License.
Modified by Lepresidente
ChangeLog:
* Use Redis to store found items to reduce load on api on reruns
* Dropped tvdb due to api changes
* fixed xmltv support using the latest standard
* Some code cleanup and dead code removal
* Updated to use environment variables for docker support
* Removed local image storage in favor of direct TMDB URLs
* Added --no-cache flag to disable Redis caching
"""
# TODO: Find repeats
# TODO: Regex replacements for categories
import logging
import multiprocessing
import os
import re
import sys
import threading
import time
import xml.etree.ElementTree as ET
import xml.sax.saxutils as saxutils
from datetime import timedelta, tzinfo
from optparse import OptionParser
from pathlib import Path

try:
    from xml.etree import cElementTree as ElementTree
except ImportError:
    # cElementTree was removed in Python 3.9; the plain module is equivalent.
    from xml.etree import ElementTree

import redis
import requests
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()

NAME = 'enhance'
VERSION = '0.0.2'
# Timestamp layout of XMLTV start/stop attributes (without the UTC offset).
TIME_FORMAT = '%Y%m%d%H%M%S'
# Cap on concurrently running threads, consulted by createNewDownloadThread.
threadcount = multiprocessing.cpu_count() * 2
log = logging.getLogger(NAME)
logging.basicConfig(level=logging.WARNING, format='%(message)s')
downloadlist = []
# Default for the --no-cache switch.  The script entry point overwrites it;
# defining it here keeps the processors usable when this file is imported.
no_cache = False

# Variables
REDIS_HOST = os.getenv('REDIS_HOST', "localhost")
# The environment yields a string while the fallback is an int; normalise so
# the constant has one type regardless of where it came from.
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
REDIS_PASS = os.getenv('REDIS_PASS', "")
TMDB_API = os.getenv('TMDB_API', None)

r = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDIS_PASS)

# tmdbv3api is optional: processors check `tmdbcheck` and disable themselves
# when the module is missing.
try:
    import tmdbv3api
except ImportError:
    log.warning("Failed to import tmdbv3api module.")
    tmdbcheck = False
else:
    tmdbcheck = True
class UTC(tzinfo):
    """
    tzinfo for UTC: a zero offset that never observes daylight saving.
    """

    def tzname(self, dt):
        """Return the timezone label, always "UTC"."""
        return "UTC"

    def dst(self, dt):
        """Return the daylight-saving adjustment, always zero."""
        return timedelta()

    def utcoffset(self, dt):
        """Return the offset from UTC, always zero."""
        return timedelta()
class LocalTimezone(tzinfo):
    """
    tzinfo describing the timezone configured on the local machine.
    """

    def __init__(self):
        standard = timedelta(seconds=-time.timezone)
        # Without a DST zone the "daylight" offset is the standard one.
        daylight = timedelta(seconds=-time.altzone) if time.daylight else standard
        self.STDOFFSET = standard
        self.DSTOFFSET = daylight
        self.DSTDIFF = daylight - standard
        tzinfo.__init__(self)

    def _isdst(self, dt):
        """Return True when the platform reports DST in effect at *dt*."""
        # tm_isdst=-1 asks mktime to decide whether DST applies.
        fields = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second,
                  dt.weekday(), 0, -1)
        return time.localtime(time.mktime(fields)).tm_isdst > 0

    def utcoffset(self, dt):
        """Return the local offset from UTC at *dt*."""
        return self.DSTOFFSET if self._isdst(dt) else self.STDOFFSET

    def dst(self, dt):
        """Return the DST adjustment at *dt* (zero outside DST)."""
        return self.DSTDIFF if self._isdst(dt) else timedelta(0)

    def tzname(self, dt):
        """Return the platform's name for the zone in effect at *dt*."""
        return time.tzname[self._isdst(dt)]
# Shared module-level instances of the two tzinfo classes defined above.
localtz = LocalTimezone()
utc = UTC()
class BaseProcessor(object):
    """
    Interface shared by every programme processor.

    Calling an instance handles a single programme element;
    ``post_process`` receives the whole tree afterwards.  Both raise
    NotImplementedError until a subclass overrides them.
    """

    # Subclasses flip this to False when a prerequisite is missing.
    valid = True

    def post_process(self, programmes):
        """Hook run once after all programmes were processed."""
        raise NotImplementedError()

    def __call__(self, programme):
        """Process one programme element in place."""
        raise NotImplementedError()
class Movies(BaseProcessor):
    """
    Augment movies with data from themoviedb.org (TMDB).

    A programme is looked up when it carries a "movie" category or runs
    between 90 minutes and 4 hours.  On a unique exact title match the
    programme gains a poster icon, credits, genre categories, a "movie"
    category, a description and a length.  Results (including the negative
    outcomes "NotFound" and "Multiples") are cached in Redis for 90 days
    unless the global ``no_cache`` flag is set.
    """

    # Lifetime of a cached lookup, in seconds (90 days).
    CACHE_TTL = 60 * 60 * 24 * 90

    def __init__(self):
        if not tmdbcheck:
            self.valid = False
            log.warning('Movies: TMDB module not found.')
            return
        if TMDB_API is None:
            self.valid = False
            log.critical("TMDB API key missing")
            sys.exit(1)
        # Deliberately not logging the key itself: debug output tends to be
        # pasted into public bug reports.
        log.debug("Movies: TMDB API key loaded")
        # NOTE(review): presumably tmdbv3api shares these settings with the
        # Movie/Configuration objects created later -- confirm.
        tmdb = tmdbv3api.TMDb()
        tmdb.api_key = TMDB_API
        tmdb.language = 'en'

    @staticmethod
    def _cache_key(title, field):
        """Return the Redis key holding *field* for the programme *title*."""
        return 'movies.{}.{}'.format(title.replace(" ", "_"), field)

    def _cache_get(self, title, field):
        """Return the cached text for *field*, or None if absent or empty."""
        raw = r.get(self._cache_key(title, field))
        return raw.decode('utf-8') if raw else None

    def _cache_set(self, title, field, value):
        """Cache *value* with the standard TTL; no-op for None or no_cache."""
        if no_cache or value is None:
            return
        key = self._cache_key(title, field)
        r.set(key, value)
        r.expire(key, self.CACHE_TTL)

    def _fetch(self, title):
        """
        Query TMDB for *title*.

        Returns a dict with the keys title, runtime, poster, overview,
        genres and credits, or None when the search failed or did not yield
        exactly one exact title match.
        """
        try:
            api = tmdbv3api.Movie()
            results = api.search(title.replace('?', ''))
        except tmdbv3api.tmdb.TMDbException as e:
            log.warning('Movies: TMDB search failed for "%s": %s', title, str(e))
            return None

        wanted = normalise_title(title)
        matches = [
            result for result in results
            if result is not None and hasattr(result, 'title')
            and normalise_title(str(result.title)) == wanted
        ]
        log.debug('Movies: Exact title matches: %d', len(matches))
        # Log straight from the search results; a details request per
        # candidate just for debug output would waste API quota.
        for candidate in matches:
            released = getattr(candidate, 'release_date', None)
            if released is None:
                log.debug('Movies: Found match "%s"', candidate.title)
            else:
                log.debug('Movies: Found match "%s" (%s)', candidate.title, released)

        if len(matches) != 1:
            # Remember the negative outcome so reruns skip the search.
            self._cache_set(title, 'title',
                            "Multiples" if matches else "NotFound")
            return None

        try:
            log.debug('Movies: Cache miss for "%s"', title)
            movie = api.details(matches[0].id)
            credits = api.credits(matches[0].id)
            poster = None
            if movie.poster_path is not None:
                base_url = tmdbv3api.Configuration().info().images['base_url']
                poster = base_url + "w342" + movie.poster_path
        except tmdbv3api.tmdb.TMDbException as e:
            log.warning('Movies: TMDB details failed for "%s": %s', title, str(e))
            return None

        cast = None
        if credits and hasattr(credits, 'cast'):
            # Only the first five billed cast members are kept.
            cast = "|".join(str(member.name)
                            for member in list(credits.cast)[:5]
                            if hasattr(member, 'name'))
        genres = "|".join(str(genre.name) for genre in (movie.genres or [])) or None

        info = {
            'title': movie.title,
            'runtime': movie.runtime,
            'poster': poster,
            'overview': movie.overview,
            'genres': genres,
            'credits': cast,
        }
        # The title key marks a cache hit, so it is written last: a failure
        # half-way through never leaves a "hit" with missing fields.
        for field in ('credits', 'poster', 'runtime', 'overview', 'genres', 'title'):
            self._cache_set(title, field, info[field])
        return info

    def _apply(self, programme, title, info):
        """Write the looked-up movie *info* into the *programme* element."""
        if info['poster']:
            log.info('Movies: Adding poster URL for %s', title)
            poster = ElementTree.SubElement(programme, 'icon')
            poster.set('src', info['poster'])
            # 'false' keeps later lookup processors from treating the icon
            # just added as a reason to skip the programme entirely.
            programme.set('skip_processors', 'false')

        if info['credits']:
            log.info('Movies: Adding credits for %s', title)
            existing_credits = programme.find('credits')
            if existing_credits is not None:
                programme.remove(existing_credits)
            credits = ElementTree.SubElement(programme, 'credits')
            for actor in info['credits'].split('|'):
                actor_elem = ElementTree.SubElement(credits, 'actor')
                actor_elem.text = actor

        present = {cat.text for cat in programme.findall('category')}
        if info['genres']:
            for genre in info['genres'].split("|"):
                if genre in present:
                    continue
                log.info('Movies: Adding category "%s"', genre)
                category = ElementTree.SubElement(programme, 'category')
                category.set('lang', 'en')
                category.text = genre
                present.add(genre)

        log.info('Movies: Adding info from TMDB for %s', title)
        if 'movie' not in present:
            log.info('Movies: Adding category "%s"', 'Movie')
            category = ElementTree.SubElement(programme, 'category')
            category.set('lang', 'en')
            category.text = 'movie'

        if info['overview']:
            log.info('Movies: Adding overview "%s"', info['overview'])
            desc = programme.find('desc')
            if desc is None:
                desc = ElementTree.SubElement(programme, 'desc')
            # Always use the resolved overview; on a cache hit there is no
            # TMDB movie object to read it from.
            desc.text = info['overview']

        if info['runtime']:
            log.info('Movies: Adding runtime "%s"', info['runtime'])
            old_length = programme.find('length')
            if old_length is not None:
                programme.remove(old_length)
            length = ElementTree.SubElement(programme, 'length')
            length.set('units', 'minutes')
            length.text = str(info['runtime'])

    def __call__(self, programme):
        """Enrich one programme element in place if it looks like a movie."""
        if not self.valid:
            return
        try:
            start = programme.get('start')
            stop = programme.get('stop')
            title_elem = programme.find('title')
            if title_elem is None or title_elem.text is None:
                return
            title = str(title_elem.text)
            channel = programme.get('channel')
        except Exception as e:
            log.debug('Movies: Ignoring invalid programme: %s', str(e))
            return
        if start is None or stop is None:
            return
        # 'true': programme shipped with artwork; 'false': an earlier lookup
        # processor already handled it.  Either way there is nothing to do.
        if programme.get('skip_processors') in ('true', 'false'):
            return
        if programme.find('icon') is not None:
            log.debug('Movies: Skipping "%s" - already has an image', title)
            programme.set('skip_processors', 'true')  # Set flag to skip other processors
            return

        # Add date from start time
        if len(start) >= 8 and programme.find('date') is None:
            date = start[:8]  # Extract YYYYMMDD
            date_elem = ElementTree.SubElement(programme, 'date')
            date_elem.text = date
            log.info('Movies: Adding date "%s"', date)

        # Unfortunately strptime can't handle numeric timezones so we strip it.
        # It's only for getting possible movies so won't matter too much.
        start_time = time.mktime(time.strptime(start.split(' ')[0], TIME_FORMAT))
        stop_time = time.mktime(time.strptime(stop.split(' ')[0], TIME_FORMAT))
        duration = stop_time - start_time

        # Always look up things in the movie category; try to identify
        # others by duration (between 90 minutes and 4 hours).
        in_movie_category = any(
            (cat.text or '').strip().lower() == 'movie'
            for cat in programme.findall('category'))
        if not in_movie_category and (duration <= 5400 or duration > 14400):
            return
        log.debug('Movies: Possible movie "%s" (duration %dm) on channel "%s"',
                  title, duration / 60, channel)

        info = None
        if not no_cache:
            cached_title = self._cache_get(title, 'title')
            if cached_title == "NotFound":
                log.debug('Movies: Ignored "%s" due to being set to NotFound '
                          'on tmdb', title)
                return
            if cached_title == "Multiples":
                log.debug('Movies: Ignored "%s" due to multiple results on '
                          'tmdb', title)
                return
            if cached_title:
                info = {'title': cached_title}
                for field in ('runtime', 'poster', 'overview', 'genres', 'credits'):
                    info[field] = self._cache_get(title, field)
                log.debug('Movies: Redis hit for "%s"', title)
        if info is None:
            info = self._fetch(title)
            if info is None:
                return
        if info['title'] is None:
            log.debug('Movies: Ignored due to being not found before "%s"',
                      title)
            return
        self._apply(programme, title, info)
class Series(BaseProcessor):
    """
    Augment TV shows with series-level data from themoviedb.org (TMDB).

    Programmes of at most 90 minutes are searched by title; the first exact
    title match supplies a poster icon, credits, genre categories and an
    episode length.  A series without a match or without a poster is cached
    as "NotFound".  Redis caching lasts 90 days and is bypassed when the
    global ``no_cache`` flag is set.
    """

    # Lifetime of a cached lookup, in seconds (90 days).
    CACHE_TTL = 60 * 60 * 24 * 90

    def __init__(self):
        if not tmdbcheck:
            self.valid = False
            log.warning('Series: TMDB module not found.')
            return
        if TMDB_API is None:
            self.valid = False
            log.critical("TMDB API key missing")
            sys.exit(1)
        # The key itself is a secret and is intentionally kept out of the log.
        log.debug("Series: TMDB API key loaded")
        tmdb = tmdbv3api.TMDb()
        tmdb.api_key = TMDB_API
        tmdb.language = 'en'

    @staticmethod
    def _cache_key(title, field):
        """Return the Redis key holding *field* for the programme *title*."""
        return 'series.{}.{}'.format(title.replace(" ", "_"), field)

    def _cache_get(self, title, field):
        """Return the cached text for *field*, or None if absent or empty."""
        # Redis hands back bytes; decode before comparing with str markers.
        raw = r.get(self._cache_key(title, field))
        return raw.decode('utf-8') if raw else None

    def _cache_set(self, title, field, value):
        """Cache *value* with the standard TTL; no-op for None or no_cache."""
        if no_cache or value is None:
            return
        key = self._cache_key(title, field)
        r.set(key, value)
        r.expire(key, self.CACHE_TTL)

    def _fetch(self, title):
        """
        Query TMDB for the series *title*.

        Returns a dict with the keys poster, credits, genres and runtime, or
        None when the search failed, nothing matched, or the match has no
        poster.
        """
        try:
            api = tmdbv3api.TV()
            query = title.replace('?', '')
            log.debug('Series: Searching for title %s', query)
            results = api.search(query)
        except tmdbv3api.tmdb.TMDbException as e:
            log.warning('Series: TMDB search failed for "%s": %s', title, str(e))
            return None

        wanted = normalise_title(title)
        matches = [
            result for result in results
            if result is not None and hasattr(result, 'name')
            and normalise_title(str(result.name)) == wanted
        ]
        log.debug('Series: Exact title matches: %d', len(matches))
        for series in matches:
            log.debug('Series: Found match "%s"', series.name)
        if not matches:
            # Cache the miss as well, otherwise every rerun repeats the search.
            log.debug('Series: No match found "%s"', title)
            self._cache_set(title, 'title', "NotFound")
            return None

        try:
            log.debug('Series: Cache miss for "%s"', title)
            details = api.details(matches[0].id)
            credits = api.credits(matches[0].id)
            poster = None
            if details.poster_path is not None:
                base_url = tmdbv3api.Configuration().info().images['base_url']
                poster = base_url + "w342" + details.poster_path
        except tmdbv3api.tmdb.TMDbException as e:
            log.warning('Series: TMDB details failed for "%s": %s', title, str(e))
            return None

        if poster is None:
            log.debug('Series: No poster found "%s"', title)
            self._cache_set(title, 'title', "NotFound")
            return None

        cast = None
        if credits and hasattr(credits, 'cast'):
            # Only the first five billed cast members are kept.
            cast = "|".join(str(member.name)
                            for member in list(credits.cast)[:5]
                            if hasattr(member, 'name'))
        genres = "|".join(str(genre.name) for genre in (details.genres or [])) or None
        runtime = None
        if details.episode_run_time and len(details.episode_run_time) > 0:
            runtime = details.episode_run_time[0]  # Use first episode runtime

        info = {'poster': poster, 'credits': cast,
                'genres': genres, 'runtime': runtime}
        for field in ('credits', 'poster', 'genres', 'runtime'):
            self._cache_set(title, field, info[field])
        # The title key marks a cache hit, so it is written last.
        self._cache_set(title, 'title', title)
        log.info('Series: Adding info from TMDB for %s', title)
        return info

    def _apply(self, programme, title, info):
        """Write the looked-up series *info* into the *programme* element."""
        if info['poster'] is not None:
            log.info('Series: Adding poster URL for show "%s"', title)
            poster = ElementTree.SubElement(programme, 'icon')
            poster.set('src', info['poster'])
            programme.set('skip_processors', 'false')

        if info['credits']:
            log.info('Series: Adding credits for %s', title)
            existing_credits = programme.find('credits')
            if existing_credits is not None:
                programme.remove(existing_credits)
            credits = ElementTree.SubElement(programme, 'credits')
            for actor in info['credits'].split('|'):
                actor_elem = ElementTree.SubElement(credits, 'actor')
                actor_elem.text = actor

        if info['genres']:
            present = {cat.text for cat in programme.findall('category')}
            for genre in info['genres'].split("|"):
                if genre in present:
                    continue
                log.info('Series: Adding category "%s"', genre)
                category = ElementTree.SubElement(programme, 'category')
                category.set('lang', 'en')
                category.text = genre
                present.add(genre)

        if info['runtime']:
            log.info('Series: Adding runtime "%s"', info['runtime'])
            old_length = programme.find('length')
            if old_length is not None:
                programme.remove(old_length)
            length = ElementTree.SubElement(programme, 'length')
            length.set('units', 'minutes')
            length.text = str(info['runtime'])

    def __call__(self, programme):
        """Enrich one programme element in place if it looks like a series."""
        if not self.valid:
            return
        try:
            start = programme.get('start')
            stop = programme.get('stop')
            title_elem = programme.find('title')
            if title_elem is None or title_elem.text is None:
                return
            title = str(title_elem.text)
        except Exception as e:
            log.debug('Series: Ignoring invalid programme: %s', str(e))
            return
        if start is None or stop is None:
            return
        # 'true': programme shipped with artwork; 'false': an earlier lookup
        # processor already handled it.  Either way there is nothing to do.
        if programme.get('skip_processors') in ('true', 'false'):
            return
        if programme.find('icon') is not None:
            log.debug('Series: Skipping "%s" - already has an image', title)
            programme.set('skip_processors', 'true')  # Set flag to skip other processors
            return

        # Add date from start time
        if len(start) >= 8 and programme.find('date') is None:
            date = start[:8]  # Extract YYYYMMDD
            date_elem = ElementTree.SubElement(programme, 'date')
            date_elem.text = date
            log.info('Series: Adding date "%s"', date)

        # Unfortunately strptime can't handle numeric timezones so we strip it.
        # It's only for getting possible tv shows so won't matter too much.
        start_time = time.mktime(time.strptime(start.split(' ')[0], TIME_FORMAT))
        stop_time = time.mktime(time.strptime(stop.split(' ')[0], TIME_FORMAT))
        duration = stop_time - start_time
        if duration > 5400:
            log.debug('Series: Skipping "%s" since runtime over 90 minutes',
                      title)
            return

        info = None
        if not no_cache:
            cached_title = self._cache_get(title, 'title')
            if cached_title == "NotFound":
                log.debug('Series: Series ignore for "%s"', title)
                return
            if cached_title:
                log.debug('Series: Cache hit for "%s"', title)
                info = {field: self._cache_get(title, field)
                        for field in ('poster', 'credits', 'genres', 'runtime')}
                if not info['poster']:
                    log.debug('Series: Series ignored no poster for "%s"',
                              title)
                    return
        if info is None:
            info = self._fetch(title)
            if info is None:
                return
        self._apply(programme, title, info)
class Episodes(BaseProcessor):
    """
    Augment TV episodes with data from themoviedb.org (TMDB).

    For every ``episode-num`` in the xmltv_ns system the matching TMDB
    episode supplies a sub-title, a description, a star rating and, when
    the episode aired within the last 7 days in the latest season, a
    ``new`` element.  Programmes longer than 90 minutes are ignored.
    """

    def __init__(self):
        # title -> TMDB series id (or None when nothing matched), so a show
        # with many episodes in the guide is searched only once per run.
        self.cache = {}
        if not tmdbcheck:
            self.valid = False
            log.warning('Episodes: TMDB module not found.')
            return
        if TMDB_API is None:
            self.valid = False
            log.critical("TMDB API key missing")
            sys.exit(1)
        # The key itself is a secret and is intentionally kept out of the log.
        log.debug("Episodes: TMDB API key loaded")
        tmdb = tmdbv3api.TMDb()
        tmdb.api_key = TMDB_API
        tmdb.language = 'en'

    @staticmethod
    def _parse_xmltv_ns(text):
        """
        Return 1-based (season, episode) from an xmltv_ns value.

        The value is 'season.episode.part' with zero-based numbers, each
        optionally written as 'n/total'.  Raises ValueError, IndexError or
        AttributeError for malformed or missing text.
        """
        parts = text.split('.')
        season = int(parts[0].split('/')[0]) + 1
        episode = int(parts[1].split('/')[0]) + 1
        return season, episode

    def _find_series_id(self, tvtmdb, title):
        """
        Return the TMDB id of the first exact title match, or None.

        Raises tmdbv3api.tmdb.TMDbException when the search itself fails;
        failures are not memoised so a later programme can retry.
        """
        if title in self.cache:
            return self.cache[title]
        results = tvtmdb.search(title.replace('?', ''))
        wanted = normalise_title(title)
        series_id = None
        for result in results:
            if result is not None and normalise_title(result.name) == wanted:
                series_id = result.id
                break
        self.cache[title] = series_id
        return series_id

    def _mark_new(self, programme, tvtmdb, series_id, details,
                  season, episode_num, title):
        """Add a ``new`` element when the episode is a recent first airing."""
        if programme.find('new') is not None:
            return
        if not (hasattr(details, 'still_path') and details.still_path):
            return
        if not (hasattr(details, 'air_date') and details.air_date):
            return
        try:
            series_details = tvtmdb.details(series_id)
            # NOTE(review): TMDB's series details document
            # `number_of_seasons`; `season_number` is kept first for
            # backward compatibility -- confirm against the live payload.
            current_season = (getattr(series_details, 'season_number', None)
                              or getattr(series_details, 'number_of_seasons', None))
            if not (current_season and season == current_season
                    and details.episode_number == episode_num):
                return
            aired = time.mktime(time.strptime(details.air_date, '%Y-%m-%d'))
            if time.time() - aired <= 7 * 24 * 60 * 60:  # 7 days in seconds
                ElementTree.SubElement(programme, 'new')
                log.info('Episodes: Adding new tag for "%s" S%sE%s (first airing)',
                         title, season, episode_num)
        except (ValueError, TypeError) as e:
            log.debug('Episodes: Error processing air date for "%s": %s',
                      title, str(e))

    def _apply(self, programme, details, title, season, episode_num):
        """Copy sub-title, description and rating from *details*."""
        if details.name:
            # Avoid stacking identical sub-titles on repeated runs.
            already = any(sub.text == details.name
                          for sub in programme.findall('sub-title'))
            if not already:
                subtitle = ElementTree.SubElement(programme, 'sub-title')
                subtitle.text = details.name
            log.info('Episodes: Subtitle for "%s" is "%s"', title, details.name)

        # Update description with TMDB episode description
        if hasattr(details, 'overview') and details.overview:
            desc = programme.find('desc')
            if desc is None:
                desc = ElementTree.SubElement(programme, 'desc')
            desc.text = details.overview
            log.info('Episodes: Updated description for "%s" S%sE%s',
                     title, season, episode_num)

        if details.vote_average:
            rating = details.vote_average
            log.info('Episodes: Adding rating "%s"', rating)
            old_rating = programme.find('star-rating')
            if old_rating is not None:
                programme.remove(old_rating)
            urating = ElementTree.SubElement(programme, 'star-rating')
            value = ElementTree.SubElement(urating, 'value')
            value.text = str('%s/10' % rating)

    def __call__(self, programme):
        """Enrich one programme element in place with episode data."""
        if not self.valid:
            return
        # Skip if programme has skip flag
        if programme.get('skip_processors') == 'true':
            return
        try:
            start = programme.get('start')
            stop = programme.get('stop')
            title_elem = programme.find('title')
            if title_elem is None or title_elem.text is None:
                return
            title = str(title_elem.text)
            episodes = programme.findall('episode-num')
        except Exception:
            log.debug('Episodes: Ignoring invalid programme')
            return
        if start is None or stop is None:
            return

        # Unfortunately strptime can't handle numeric timezones so we strip it.
        # It's only for getting possible tv shows so won't matter too much.
        start_time = time.mktime(time.strptime(start.split(' ')[0], TIME_FORMAT))
        stop_time = time.mktime(time.strptime(stop.split(' ')[0], TIME_FORMAT))
        if stop_time - start_time > 5400:  # give up if longer than 90 minutes
            return

        try:
            for episode in episodes:
                if episode.get('system') != "xmltv_ns":
                    continue
                try:
                    season, episode_num = self._parse_xmltv_ns(episode.text)
                except (ValueError, IndexError, AttributeError):
                    log.debug('Episodes: Invalid episode number format for "%s"', title)
                    continue
                log.debug('Episodes: Looking up season %s, episode %s of show "%s"',
                          season, episode_num, title)

                tvtmdb = tmdbv3api.TV()
                try:
                    series_id = self._find_series_id(tvtmdb, title)
                except tmdbv3api.tmdb.TMDbException as e:
                    log.warning('Episodes: TMDB search failed for "%s": %s', title, str(e))
                    continue
                if series_id is None:
                    log.debug('Episodes: No matches found for "%s"', title)
                    continue

                try:
                    details = tmdbv3api.Episode().details(series_id, season, episode_num)
                except tmdbv3api.tmdb.TMDbException as e:
                    log.warning('Episodes: Failed to get episode details for "%s" S%sE%s: %s',
                                title, season, episode_num, str(e))
                    continue

                self._apply(programme, details, title, season, episode_num)
                self._mark_new(programme, tvtmdb, series_id, details,
                               season, episode_num, title)
        except Exception as e:
            # Best effort: one broken programme must not stop the whole run.
            log.exception('Episodes: Error processing "%s": %s', title, str(e))
            return
class HD(BaseProcessor):
    """
    Look for a HD note in a description.

    When the description ends with "HD", "HD." or "(HD)" the programme's
    ``video`` element is created or updated so that its ``quality`` child
    reads "HDTV", and the marker is removed from the description.
    """
    regexes = (
        re.compile(r'HD\.?$'),
        re.compile(r'\(HD\)$'),
    )

    def __call__(self, programme):
        # Skip if programme has skip flag
        if programme.get('skip_processors') == 'true':
            return
        desc = programme.find('desc')
        if desc is None or not desc.text:
            return
        for regex in self.regexes:
            if not regex.search(desc.text):
                continue
            # findtext tolerates a programme without a title element.
            log.debug('HD: Found "%s"', programme.findtext('title'))
            video = programme.find('video')
            if video is None:
                video = ElementTree.SubElement(programme, 'video')
                present = ElementTree.SubElement(video, 'present')
                present.text = 'yes'
                aspect = ElementTree.SubElement(video, 'aspect')
                aspect.text = '16:9'
            # quality lives inside <video>; searching the programme itself
            # would never find it and would append a duplicate each time.
            quality = video.find('quality')
            if quality is None:
                quality = ElementTree.SubElement(video, 'quality')
            quality.text = 'HDTV'
            # Drop the marker and the whitespace that preceded it.
            desc.text = regex.sub('', desc.text).rstrip()
class Subtitle(BaseProcessor):
    """
    Look for a subtitle in a description.

    The first pattern that matches at the start of the description supplies
    the ``sub-title`` text, and the matched prefix is cut from the
    description.  Programmes that already have a ``sub-title`` are left
    untouched.
    """
    regexes = (
        re.compile(r"(Today|Tonight)?:? ?'(?P<subtitle>.*?)'\.\s?"),
        re.compile(r"'(?P<subtitle>.{2,60}?)\.'\s"),
        re.compile(r"(?P<subtitle>.{2,60}?):\s"),
    )

    def __call__(self, programme):
        # Skip if programme has skip flag
        if programme.get('skip_processors') == 'true':
            return
        desc = programme.find('desc')
        if desc is None or not desc.text:
            return
        # `'subtitle' in programme` would test element identity against a
        # string and never be true; look the child up by tag instead.
        if programme.find('sub-title') is not None:
            return
        for regex in self.regexes:
            matched = regex.match(desc.text)
            if not matched:
                continue
            subtitle = ElementTree.SubElement(programme, 'sub-title')
            subtitle.text = matched.group('subtitle')
            log.debug('Subtitle: "%s" for "%s"', subtitle.text,
                      programme.findtext('title'))
            # Remove only the matched prefix; substituting the pattern
            # globally would also delete later look-alikes in the text.
            desc.text = desc.text[matched.end():]
            break
class EpDesc(BaseProcessor):
    """
    Look for a Season/Episode info in a description.

    Season and episode numbers are taken from the first description pattern
    that matches, and additionally from any ``episode-num`` element in the
    dd_progid system.  Missing ``episode-num`` elements in the xmltv_ns
    (zero-based) and onscreen ("S1E2") systems are then added.
    """
    desc_regexes = (
        re.compile(r'[Ss](\d+)[Ee](\d+)'),  # S01E02 format
        re.compile(r'[Ss]eason\s*(\d+)\s*[Ee]pisode\s*(\d+)'),  # Season 1 Episode 2 format
        re.compile(r'[Ss](\d+)\s*[Ee](\d+)'),  # S1 E2 format
        re.compile(r'[Ss]eason\s*(\d+)\s*[Ee]p\s*(\d+)'),  # Season 1 Ep 2 format
        re.compile(r'[Ss](\d+)\s*[Ee]p\s*(\d+)'),  # S1 Ep 2 format
    )
    progid_regexes = (
        re.compile(r'\s?(\d+)Ep\s?(\d+)'),
    )

    @staticmethod
    def _add_episode_nums(programme, season, episode):
        """Add xmltv_ns / onscreen episode-num elements that are missing."""
        systems = {num.get('system') for num in programme.findall('episode-num')}
        if 'xmltv_ns' not in systems:
            episode_num = ElementTree.SubElement(programme, 'episode-num')
            episode_num.set('system', 'xmltv_ns')
            # xmltv_ns numbering is zero-based.
            episode_num.text = '%s.%s.0' % (season - 1, episode - 1)
        if 'onscreen' not in systems:
            episode_num = ElementTree.SubElement(programme, 'episode-num')
            episode_num.set('system', 'onscreen')
            episode_num.text = 'S%dE%d' % (season, episode)

    def __call__(self, programme):
        # Skip if programme has skip flag
        if programme.get('skip_processors') == 'true':
            return
        title = programme.findtext('title')

        desc = programme.find('desc')
        if desc is not None and desc.text:
            for regex in self.desc_regexes:
                matched = regex.search(desc.text)
                if not matched:
                    continue
                try:
                    season, episode = [int(x) for x in matched.groups()]
                except (ValueError, IndexError) as e:
                    log.debug('EpDesc: Error parsing season/episode: %s', str(e))
                    continue
                log.debug('EpDesc: From desc: Found season %s episode %s for "%s"',
                          season, episode, title)
                self._add_episode_nums(programme, season, episode)
                break  # Stop after first match

        # Check for dd_progid format.  findall returns a list, so elements
        # added below do not disturb this iteration.
        for progid in programme.findall('episode-num'):
            if progid.get('system') != "dd_progid" or not progid.text:
                continue
            for regex in self.progid_regexes:
                matched = regex.search(progid.text)
                if not matched:
                    continue
                try:
                    season, episode = [int(x) for x in matched.groups()]
                except (ValueError, IndexError) as e:
                    log.debug('EpDesc: Error parsing dd_progid: %s', str(e))
                    continue
                log.debug('EpDesc: From dd_progid: Found season %s episode %s for "%s"',
                          season, episode, title)
                self._add_episode_nums(programme, season, episode)
                break  # Stop after first match
def compare_programme(x):
    """
    Sort key ordering the children of an XMLTV programme element.

    Known tags sort by their position in the list below; unknown tags sort
    after all known ones.  ``episode-num`` elements are further ordered by
    their ``system`` attribute: onscreen, xmltv_ns, tms, then any other
    system by its first character.  A missing or empty ``system`` is
    handled like an unknown system.
    """
    programme_order = (
        'title', 'sub-title', 'desc', 'credits', 'date',
        'category', 'language', 'orig-language', 'length',
        'icon', 'url', 'country', 'episode-num', 'video', 'audio',
        'previously-shown', 'premiere', 'last-chance', 'new',
        'subtitles', 'rating', 'star-rating',
    )
    try:
        base = programme_order.index(x.tag)
    except ValueError:
        # If tag is not in the order list, put it at the end
        return len(programme_order)
    if x.tag != 'episode-num':
        return base

    system = x.get('system', '')
    known_systems = {'onscreen': 0, 'xmltv_ns': 0.1, 'tms': 0.2}
    if system in known_systems:
        return base + known_systems[system]
    # Clamp the character code so the key stays below the next tag's slot;
    # an empty system contributes nothing instead of raising IndexError.
    tie_break = min(ord(system[0]), 699) / 1000 if system else 0
    return base + 0.3 + tie_break
def normalise_title(title):
    """
    Normalise titles to help comparisons.

    Lower-cases the title, drops a leading "the ", removes everything except
    ASCII letters, digits and spaces, collapses runs of spaces, removes an
    inner " the " and trims the ends.  Digits are kept so that numbered
    sequels ("Toy Story 2" / "Toy Story 3") stay distinct.

    Returns '' (and logs a warning) when *title* is not a string.
    """
    if not isinstance(title, str):
        log.warning('normalise_title: Expected string but got %s', type(title))
        return ''
    normalised = title.lower()
    if normalised.startswith('the '):
        normalised = normalised[4:]
    normalised = re.sub('[^a-z0-9 ]', '', normalised)
    normalised = re.sub(' +', ' ', normalised)
    normalised = normalised.replace(' the ', ' ')
    # Removed punctuation can leave stray spaces at either end.
    return normalised.strip()
# Maps newline, carriage return and tab to their numeric XML character
# references.  The references are generated from the code points instead of
# being spelled out, so the table survives tools that decode entities in
# source text.
CONTROL_ENTITY_MAP = {
    char: '&#{};'.format(ord(char)) for char in ('\n', '\r', '\t')
}
def escape_text(text):
    """
    Return *text* made safe for use as XML character data.

    None yields ''.  XML-reserved characters are escaped first; afterwards
    every character listed in CONTROL_ENTITY_MAP is swapped for its mapped
    replacement.
    """
    if text is None:
        return ''
    xml_safe = saxutils.escape(text)
    pieces = []
    for char in xml_safe:
        pieces.append(CONTROL_ENTITY_MAP.get(char, char))
    return ''.join(pieces)
def write_element(elem, level=0):
    """
    Serialise *elem* and its descendants to an indented XML string.

    Each nesting level is indented with one tab.  Elements without children
    and without non-blank text are written self-closed (their tail is not
    written).  Text-only elements are written on a single line.  Attribute
    values are quoted with ``saxutils.quoteattr`` so embedded quotes and
    reserved characters cannot break the markup.
    """
    indent = '\t' * level
    attrs = ' '.join(f'{k}={saxutils.quoteattr(v)}' for k, v in elem.attrib.items())
    tag_open = f"{indent}<{elem.tag}"
    if attrs:
        tag_open += f" {attrs}"

    children = list(elem)
    has_text = bool(elem.text and elem.text.strip())
    if not children and not has_text:
        return tag_open + " />"

    result = [tag_open + ">"]
    if has_text:
        result[-1] += escape_text(elem.text)

    for child in children:
        child_str = write_element(child, level + 1)
        if child_str.strip():
            result.append(child_str)

    if children:
        result.append(f"{indent}</{elem.tag}>")
    else:
        # Text-only element: keep the closing tag on the same line.
        result[-1] += f"</{elem.tag}>"

    if elem.tail and elem.tail.strip():
        result[-1] += escape_text(elem.tail)
    return '\n'.join(result)
def serialize_tree(root: ET.Element) -> str:
    """
    Return the complete XMLTV document for *root* as a string.

    Side effect: stamps *root* with a ``generated`` attribute holding the
    current UTC time.  The output starts with an XML declaration and the
    xmltv.dtd DOCTYPE, followed by the tree as rendered by write_element.
    """
    # The stamp is labelled +0000, so it must be taken from UTC rather than
    # from local time.
    root.set('generated', time.strftime('%Y%m%d%H%M%S +0000', time.gmtime()))
    declaration = '<?xml version="1.0" encoding="utf-8"?>'
    doctype = '<!DOCTYPE tv SYSTEM "xmltv.dtd">'
    root_str = write_element(root)
    return f"{declaration}\n{doctype}\n{root_str}"
def download(link, filelocation, timeout=30):
    """
    Download *link* and store the body at *filelocation*.

    The target directory is created when missing.  Surrounding whitespace
    in *filelocation* is ignored.  Network and HTTP errors are logged and
    leave no file behind instead of storing an error page.

    link: URL to fetch ("http://thetvdb" is rewritten to the www host).
    filelocation: destination path of the downloaded file.
    timeout: seconds to wait for the server before giving up.
    """
    target = filelocation.strip()
    directory = os.path.dirname(target)
    # A bare filename has no directory part; makedirs('') would raise.
    if directory and not os.path.exists(directory):
        log.info('Made Directory: "%s"', directory)
        os.makedirs(directory, exist_ok=True)

    url = link.replace("http://thetvdb", "http://www.thetvdb")
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(target, 'wb') as f:
                for chunk in response.iter_content(1024):
                    if chunk:
                        f.write(chunk)
    except requests.RequestException as e:
        log.warning('Download of "%s" failed: %s', url, e)
def createNewDownloadThread(link, filelocation):
    """
    Start a background thread that downloads *link* to *filelocation*.

    After starting the thread the call blocks, polling every 5 seconds,
    for as long as the number of live threads is at or above the
    module-level ``threadcount`` cap.
    """
    worker = threading.Thread(target=download, args=(link, filelocation))
    worker.start()
    # cap the threads if over limit
    while threading.active_count() >= threadcount:
        time.sleep(5)
#############################################################################
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
#############################################################################
if __name__ == '__main__':
    # Command-line entry point: read an XMLTV document from the file given
    # as first argument (or from piped stdin), run every processor over all
    # programmes, and write the enhanced document to the output file.
    parser = OptionParser(version='%prog ' + str(VERSION))
    parser.set_defaults(debug=False)
    parser.add_option('-d', '--debug', action='store_true',
                      help='output debugging information.')
    parser.add_option('-v', '--verbose', action='store_true',
                      help='output verbose information.')
    parser.add_option('-o', '--output', action='store', metavar='FILE',
                      help='set output file for enhanced XMLTV data.')
    parser.add_option('--no-cache', action='store_true',
                      help='disable Redis caching and always fetch fresh data from TMDB.')
    (options, args) = parser.parse_args()

    if options.verbose:
        log.setLevel(logging.INFO)
    if options.debug:
        log.setLevel(logging.DEBUG)

    if options.output:
        output_file = options.output
        log.info('Using output file "%s" ', options.output)
    else:
        output_file = "enhanced-xmltv.xml"

    # Set global no_cache flag (read by the processors at call time)
    no_cache = bool(options.no_cache)
    if no_cache:
        log.info('Redis caching disabled - will always fetch fresh data from TMDB')

    # What are we working with?  An explicit file argument wins; stdin is
    # only read when it is piped.  Checking the TTY first would ignore the
    # argument under cron or docker, where stdin is never a terminal.
    if args:
        try:
            with open(args[0], 'r', encoding='utf-8') as f:
                data = f.read()
        except IOError:
            log.critical('Could not open input file "%s"', args[0])
            sys.exit(2)
    elif not sys.stdin.isatty():
        data = sys.stdin.read()
    else:
        log.critical('No input file to process.')
        sys.exit(2)

    # Parse the XML
    try:
        tree = ElementTree.XML(data)
    except ElementTree.ParseError as e:
        log.critical('Could not parse XMLTV input: %s', e)
        sys.exit(2)

    processors = [
        Subtitle(),  # extract the show sub-title from the description
        EpDesc(),    # find season/episode in the description
        HD(),        # check the description for clues the show is in HD and flag accordingly
        Movies(),    # augment the guide data with movie info from TMDB
        Series(),    # augment the guide data with series info from TMDB
        Episodes(),  # augment the guide data with episode info from TMDB
    ]

    # Processors only edit programmes in place, so one lookup serves all.
    programmes = tree.findall('.//programme')
    for processor in processors:
        for programme in programmes:
            try:
                processor(programme)
            except Exception:
                # Best effort: log and carry on with the next programme.
                log.exception("Failed processing with processor: %s", processor)
        try:
            processor.post_process(tree)
        except NotImplementedError:
            pass
        except Exception:
            log.exception("Failed post processing with processor: %s", processor)

    for programme in programmes:
        # Put the children into the order used by compare_programme.
        programme[:] = sorted(programme, key=compare_programme)
        # Remove the internal skip_processors marker before writing
        programme.attrib.pop('skip_processors', None)

    xml_string = serialize_tree(tree)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(xml_string)

    # monitor for threads winding down
    while threading.active_count() != 1:
        log.debug("Active download threads running - %s", threading.active_count())
        time.sleep(5)
Beta Was this translation helpful? Give feedback.
-
Love the idea! tvh-iptv-config does this already. One would need a TMDB API key, but it's quite easy to get: |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
I’m using an M3U playlist with an accompanying XMLTV-format EPG, but the EPG data is lacking in several areas - specifically missing cast information, categories/genres, and poster artwork.
It would be valuable if M3U Editor could enrich existing EPG data by integrating additional metadata from external sources such as TMDB and TVDB. The goal would be to augment, rather than replace, the original XMLTV data - filling in missing fields where possible.
This would greatly enhance usability and presentation for users relying on third-party or incomplete EPG feeds.
Beta Was this translation helpful? Give feedback.
All reactions