|
| 1 | +import asyncio |
| 2 | +from typing import Any |
| 3 | + |
| 4 | +from loguru import logger |
| 5 | + |
| 6 | +from app.core.settings import UserSettings |
| 7 | +from app.models.taste_profile import TasteProfile |
| 8 | +from app.services.profile.scorer import ProfileScorer |
| 9 | +from app.services.recommendation.filtering import RecommendationFiltering |
| 10 | +from app.services.recommendation.metadata import RecommendationMetadata |
| 11 | +from app.services.recommendation.scoring import RecommendationScoring |
| 12 | +from app.services.recommendation.utils import ( |
| 13 | + content_type_to_mtype, |
| 14 | + filter_by_genres, |
| 15 | + filter_watched_by_imdb, |
| 16 | + resolve_tmdb_id, |
| 17 | +) |
| 18 | +from app.services.tmdb.service import TMDBService |
| 19 | + |
| 20 | +TOP_ITEMS_LIMIT = 10 |
| 21 | + |
| 22 | + |
| 23 | +class AllBasedService: |
| 24 | + """ |
| 25 | + Handles recommendations based on all loved or all liked items. |
| 26 | + """ |
| 27 | + |
| 28 | + def __init__(self, tmdb_service: TMDBService, user_settings: UserSettings | None = None): |
| 29 | + self.tmdb_service = tmdb_service |
| 30 | + self.user_settings = user_settings |
| 31 | + self.scorer = ProfileScorer() |
| 32 | + |
| 33 | + async def get_recommendations_from_all_items( |
| 34 | + self, |
| 35 | + library_items: dict[str, list[dict[str, Any]]], |
| 36 | + content_type: str, |
| 37 | + watched_tmdb: set[int], |
| 38 | + watched_imdb: set[str], |
| 39 | + whitelist: set[int] | None = None, |
| 40 | + limit: int = 20, |
| 41 | + item_type: str = "loved", # "loved" or "liked" |
| 42 | + profile: TasteProfile | None = None, |
| 43 | + ) -> list[dict[str, Any]]: |
| 44 | + """ |
| 45 | + Get recommendations based on all loved or liked items. |
| 46 | +
|
| 47 | + Strategy: |
| 48 | + 1. Get all loved/liked items for the content type |
| 49 | + 2. Fetch recommendations for each item (limit to top 10 items to avoid too many API calls) |
| 50 | + 3. Combine and deduplicate recommendations |
| 51 | + 4. Filter by genres and watched items |
| 52 | + 5. Return top N |
| 53 | +
|
| 54 | + Args: |
| 55 | + library_items: Library items dict |
| 56 | + content_type: Content type (movie/series) |
| 57 | + watched_tmdb: Set of watched TMDB IDs |
| 58 | + watched_imdb: Set of watched IMDB IDs |
| 59 | + whitelist: Genre whitelist |
| 60 | + limit: Number of items to return |
| 61 | + item_type: "loved" or "liked" |
| 62 | + profile: Optional profile for scoring (if None, uses popularity only) |
| 63 | +
|
| 64 | + Returns: |
| 65 | + List of recommended items |
| 66 | + """ |
| 67 | + # Get all loved or liked items for the content type |
| 68 | + items = library_items.get(item_type, []) |
| 69 | + typed_items = [it for it in items if it.get("type") == content_type] |
| 70 | + |
| 71 | + if not typed_items or len(typed_items) == 0: |
| 72 | + return [] |
| 73 | + |
| 74 | + # We'll process them in parallel |
| 75 | + top_items = typed_items[:TOP_ITEMS_LIMIT] |
| 76 | + |
| 77 | + mtype = content_type_to_mtype(content_type) |
| 78 | + |
| 79 | + # Fetch recommendations for each item in parallel |
| 80 | + all_candidates = {} |
| 81 | + tasks = [] |
| 82 | + |
| 83 | + for item in top_items: |
| 84 | + item_id = item.get("_id", "") |
| 85 | + if not item_id: |
| 86 | + continue |
| 87 | + |
| 88 | + # Resolve TMDB ID and fetch recommendations |
| 89 | + tasks.append(self._fetch_recommendations_for_item(item_id, mtype)) |
| 90 | + |
| 91 | + # Execute all in parallel |
| 92 | + results = await asyncio.gather(*tasks, return_exceptions=True) |
| 93 | + |
| 94 | + # Combine all recommendations (deduplicate by TMDB ID) |
| 95 | + for res in results: |
| 96 | + if isinstance(res, Exception): |
| 97 | + logger.debug(f"Error fetching recommendations: {res}") |
| 98 | + continue |
| 99 | + for candidate in res: |
| 100 | + candidate_id = candidate.get("id") |
| 101 | + if candidate_id: |
| 102 | + all_candidates[candidate_id] = candidate |
| 103 | + |
| 104 | + # Convert to list |
| 105 | + candidates = list(all_candidates.values()) |
| 106 | + |
| 107 | + # Filter by genres and watched items |
| 108 | + excluded_ids = RecommendationFiltering.get_excluded_genre_ids(self.user_settings, content_type) |
| 109 | + whitelist = whitelist or set() |
| 110 | + filtered = filter_by_genres(candidates, watched_tmdb, whitelist, excluded_ids) |
| 111 | + |
| 112 | + # Score with profile if available |
| 113 | + if profile: |
| 114 | + scored = [] |
| 115 | + for item in filtered: |
| 116 | + try: |
| 117 | + final_score = RecommendationScoring.calculate_final_score( |
| 118 | + item=item, |
| 119 | + profile=profile, |
| 120 | + scorer=self.scorer, |
| 121 | + mtype=mtype, |
| 122 | + is_ranked=False, |
| 123 | + is_fresh=False, |
| 124 | + ) |
| 125 | + |
| 126 | + # Apply genre multiplier (if whitelist available) |
| 127 | + genre_mult = RecommendationFiltering.get_genre_multiplier(item.get("genre_ids"), whitelist) |
| 128 | + final_score *= genre_mult |
| 129 | + |
| 130 | + scored.append((final_score, item)) |
| 131 | + except Exception as e: |
| 132 | + logger.debug(f"Failed to score item {item.get('id')}: {e}") |
| 133 | + continue |
| 134 | + |
| 135 | + # Sort by score |
| 136 | + scored.sort(key=lambda x: x[0], reverse=True) |
| 137 | + filtered = [item for _, item in scored] |
| 138 | + |
| 139 | + # Enrich metadata |
| 140 | + enriched = await RecommendationMetadata.fetch_batch( |
| 141 | + self.tmdb_service, filtered, content_type, user_settings=self.user_settings |
| 142 | + ) |
| 143 | + |
| 144 | + # Final filter (remove watched by IMDB ID) |
| 145 | + final = filter_watched_by_imdb(enriched, watched_imdb) |
| 146 | + |
| 147 | + # Return top N |
| 148 | + return final[:limit] |
| 149 | + |
| 150 | + async def _fetch_recommendations_for_item(self, item_id: str, mtype: str) -> list[dict[str, Any]]: |
| 151 | + """ |
| 152 | + Fetch recommendations for a single item. |
| 153 | +
|
| 154 | + Args: |
| 155 | + item_id: Item ID (tt... or tmdb:...) |
| 156 | + mtype: Media type (movie/tv) |
| 157 | +
|
| 158 | + Returns: |
| 159 | + List of candidate items |
| 160 | + """ |
| 161 | + # Resolve TMDB ID |
| 162 | + tmdb_id = await resolve_tmdb_id(item_id, self.tmdb_service) |
| 163 | + if not tmdb_id: |
| 164 | + return [] |
| 165 | + |
| 166 | + combined = {} |
| 167 | + |
| 168 | + # Fetch 1 page each for recommendations |
| 169 | + try: |
| 170 | + res = await self.tmdb_service.get_recommendations(tmdb_id, mtype, page=1) |
| 171 | + for item in res.get("results", []): |
| 172 | + candidate_id = item.get("id") |
| 173 | + if candidate_id: |
| 174 | + combined[candidate_id] = item |
| 175 | + except Exception as e: |
| 176 | + logger.debug(f"Error fetching recommendations for {tmdb_id}: {e}") |
| 177 | + |
| 178 | + return list(combined.values()) |
0 commit comments