From a9f5475233a59a921a5d85980ac8a245b8fa333e Mon Sep 17 00:00:00 2001 From: Mike Chester Date: Mon, 29 Dec 2025 19:45:24 -0800 Subject: [PATCH 1/2] feat: add Plex Watchlist to Overseerr sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add automatic syncing of users' Plex Watchlists to Overseerr requests. Features: - Per-user sync settings with toggle on user dashboard - Background polling job for periodic sync (configurable interval) - Distributed lock for multi-pod safety - Cumulative tracking of items synced and requests created - Recent sync history shown in user card - Admin controls for global enable/disable and sync interval - Sync statistics in admin dashboard Technical details: - New models: WatchlistSyncSettings, WatchlistSyncHistory, WatchlistSyncLock - Stores Plex auth token during login for watchlist API access - Uses Plex discover API to fetch user watchlist - Submits requests to Overseerr via existing API integration - Isolated Node.js instrumentation to avoid Edge Runtime issues 🤖 Generated with [Claude Code](https://claude.com/claude-code) --- actions/admin/watchlist.ts | 179 ++++++++ actions/watchlist.ts | 257 ++++++++++++ app/admin/settings/page.tsx | 4 + .../settings/watchlist-sync-settings.tsx | 282 +++++++++++++ components/dashboard/user-dashboard.tsx | 9 + components/watchlist/sync-history-table.tsx | 190 +++++++++ components/watchlist/sync-settings-card.tsx | 362 ++++++++++++++++ instrumentation.ts | 115 +---- lib/auth.ts | 2 + lib/connections/overseerr.ts | 140 +++++++ lib/instrumentation/node.ts | 185 ++++++++ lib/validations/watchlist.ts | 75 ++++ lib/watchlist/lock.ts | 395 ++++++++++++++++++ lib/watchlist/plex-watchlist.ts | 190 +++++++++ lib/watchlist/sync-service.ts | 363 ++++++++++++++++ .../migration.sql | 82 ++++ .../migration.sql | 3 + prisma/schema.prisma | 123 ++++-- 18 files changed, 2818 insertions(+), 138 deletions(-) create mode 100644 actions/admin/watchlist.ts create mode 100644 actions/watchlist.ts create mode 100644 components/admin/settings/watchlist-sync-settings.tsx create mode 100644 components/watchlist/sync-history-table.tsx create mode 100644 components/watchlist/sync-settings-card.tsx create mode 100644 lib/instrumentation/node.ts create mode 100644 lib/validations/watchlist.ts create mode 100644 lib/watchlist/lock.ts create mode 100644 lib/watchlist/plex-watchlist.ts create mode 100644 lib/watchlist/sync-service.ts create mode 100644 prisma/migrations/20251230031502_add_watchlist_sync/migration.sql create mode 100644 prisma/migrations/20251230034346_add_watchlist_cumulative_counters/migration.sql diff --git a/actions/admin/watchlist.ts b/actions/admin/watchlist.ts new file mode 100644 index 00000000..fda32568 --- /dev/null +++ b/actions/admin/watchlist.ts @@ -0,0 +1,179 @@ +"use server" + +import { authOptions } from "@/lib/auth" +import { prisma } from "@/lib/prisma" +import { requireAdmin } from "@/lib/admin" +import { globalWatchlistSyncSettingsSchema } from "@/lib/validations/watchlist" +import { syncUserWatchlist } from "@/lib/watchlist/sync-service" +import { getServerSession } from "next-auth" + +/** + * Get global watchlist sync settings (admin only) + */ +export async function getGlobalWatchlistSyncSettings() { + const isAdmin = await requireAdmin() + if (!isAdmin) { + return { success: false as const, error: "Unauthorized" } + } + + try { + const config = await prisma.config.findUnique({ + where: { id: "config" }, + select: { + watchlistSyncEnabled: true, + watchlistSyncIntervalMinutes: true, + }, + }) + + return { + success: true as const, + data: { + watchlistSyncEnabled: config?.watchlistSyncEnabled ?? false, + watchlistSyncIntervalMinutes: config?.watchlistSyncIntervalMinutes ?? 60, + }, + } + } catch (error) { + console.error("Error fetching global watchlist sync settings:", error) + return { success: false as const, error: "Failed to fetch settings" } + } +} + +/** + * Update global watchlist sync settings (admin only) + */ +export async function updateGlobalWatchlistSyncSettings(data: unknown) { + const session = await getServerSession(authOptions) + const isAdmin = await requireAdmin() + if (!isAdmin) { + return { success: false as const, error: "Unauthorized" } + } + + // Validate input + const validated = globalWatchlistSyncSettingsSchema.safeParse(data) + if (!validated.success) { + return { success: false as const, error: "Invalid input" } + } + + try { + // Check if Overseerr is configured before enabling + if (validated.data.watchlistSyncEnabled) { + const overseerr = await prisma.overseerr.findFirst({ + where: { isActive: true }, + }) + + if (!overseerr) { + return { + success: false as const, + error: "Cannot enable watchlist sync without an active Overseerr server", + } + } + } + + await prisma.config.upsert({ + where: { id: "config" }, + create: { + id: "config", + watchlistSyncEnabled: validated.data.watchlistSyncEnabled, + watchlistSyncIntervalMinutes: validated.data.watchlistSyncIntervalMinutes, + updatedBy: session?.user?.id, + }, + update: { + watchlistSyncEnabled: validated.data.watchlistSyncEnabled, + watchlistSyncIntervalMinutes: validated.data.watchlistSyncIntervalMinutes, + updatedBy: session?.user?.id, + }, + }) + + return { success: true as const } + } catch (error) { + console.error("Error updating global watchlist sync settings:", error) + return { success: false as const, error: "Failed to update settings" } + } +} + +/** + * Get watchlist sync statistics (admin only) + */ +export async function getWatchlistSyncStats() { + const isAdmin = await requireAdmin() + if (!isAdmin) { + return { success: false as const, error: "Unauthorized" } + } + + try { + const [usersWithSyncEnabled, totalItemsSynced, totalItemsRequested, recentHistory] = + await Promise.all([ + prisma.watchlistSyncSettings.count({ + where: { syncEnabled: true }, + }), + prisma.watchlistSyncHistory.count(), + prisma.watchlistSyncHistory.count({ + where: { status: "REQUESTED" }, + }), + prisma.watchlistSyncHistory.findMany({ + orderBy: { syncedAt: "desc" }, + take: 10, + select: { + id: true, + title: true, + mediaType: true, + status: true, + syncedAt: true, + user: { + select: { name: true, email: true }, + }, + }, + }), + ]) + + return { + success: true as const, + data: { + usersWithSyncEnabled, + totalItemsSynced, + totalItemsRequested, + recentHistory, + }, + } + } catch (error) { + console.error("Error fetching watchlist sync stats:", error) + return { success: false as const, error: "Failed to fetch stats" } + } +} + +/** + * Force sync for a specific user (admin only) + */ +export async function forceUserWatchlistSync(userId: string) { + const isAdmin = await requireAdmin() + if (!isAdmin) { + return { success: false as const, error: "Unauthorized" } + } + + try { + // Verify user exists + const user = await prisma.user.findUnique({ + where: { id: userId }, + select: { id: true }, + }) + + if (!user) { + return { success: false as const, error: "User not found" } + } + + // Perform sync + const result = await syncUserWatchlist(userId) + + if (!result.success) { + return { success: false as const, error: result.error || "Sync failed" } + } + + return { + success: true as const, + data: result.data, + } + } catch (error) { + console.error("Error forcing user watchlist sync:", error) + return { success: false as const, error: "Failed to sync" } + } +} diff --git a/actions/watchlist.ts b/actions/watchlist.ts new file mode 100644 index 00000000..5fc7ca6f --- /dev/null +++ b/actions/watchlist.ts @@ -0,0 +1,257 @@ +"use server" + +import { WatchlistSyncStatus } from "@/lib/generated/prisma/client" +import { prisma } from "@/lib/prisma" +import { updateWatchlistSyncSettingsSchema } from "@/lib/validations/watchlist" +import { syncUserWatchlist } from "@/lib/watchlist/sync-service" +import { getServerSession } from "next-auth" +import { authOptions } from "@/lib/auth" + +/** + * Get current user's watchlist sync settings and status + */ +export async function getWatchlistSyncSettings() { + const session = await getServerSession(authOptions) + if (!session?.user?.id) { + return { success: false as const, error: "Not authenticated" } + } + + try { + // Get user's Plex auth token status and sync settings + const user = await prisma.user.findUnique({ + where: { id: session.user.id }, + select: { + plexAuthToken: true, + watchlistSyncSettings: true, + }, + }) + + // Check if Overseerr is configured + const overseerr = await prisma.overseerr.findFirst({ + where: { isActive: true }, + select: { id: true }, + }) + + // Check if global sync is enabled + const config = await prisma.config.findUnique({ + where: { id: "config" }, + select: { watchlistSyncEnabled: true }, + }) + + // Get recent sync history if sync is enabled + let recentHistory: Array<{ + id: string + title: string + year: number | null + mediaType: string + status: string + syncedAt: Date + }> = [] + + if (user?.watchlistSyncSettings?.syncEnabled) { + recentHistory = await prisma.watchlistSyncHistory.findMany({ + where: { userId: session.user.id }, + orderBy: { syncedAt: "desc" }, + take: 5, + select: { + id: true, + title: true, + year: true, + mediaType: true, + status: true, + syncedAt: true, + }, + }) + } + + return { + success: true as const, + data: { + hasPlexToken: !!user?.plexAuthToken, + hasOverseerr: !!overseerr, + globalSyncEnabled: config?.watchlistSyncEnabled ?? false, + settings: user?.watchlistSyncSettings + ? { + syncEnabled: user.watchlistSyncSettings.syncEnabled, + lastSyncAt: user.watchlistSyncSettings.lastSyncAt, + lastSyncStatus: user.watchlistSyncSettings.lastSyncStatus, + lastSyncError: user.watchlistSyncSettings.lastSyncError, + itemsSynced: user.watchlistSyncSettings.itemsSynced, + itemsRequested: user.watchlistSyncSettings.itemsRequested, + totalItemsSynced: user.watchlistSyncSettings.totalItemsSynced, + totalItemsRequested: user.watchlistSyncSettings.totalItemsRequested, + } + : null, + recentHistory, + }, + } + } catch (error) { + console.error("Error fetching watchlist sync settings:", error) + return { success: false as const, error: "Failed to fetch settings" } + } +} + +/** + * Update user's watchlist sync settings + */ +export async function updateWatchlistSyncSettings(data: unknown) { + const session = await getServerSession(authOptions) + if (!session?.user?.id) { + return { success: false as const, error: "Not authenticated" } + } + + // Validate input + const validated = updateWatchlistSyncSettingsSchema.safeParse(data) + if (!validated.success) { + return { success: false as const, error: "Invalid input" } + } + + try { + // Check prerequisites if enabling + if (validated.data.syncEnabled) { + const user = await prisma.user.findUnique({ + where: { id: session.user.id }, + select: { plexAuthToken: true }, + }) + + if (!user?.plexAuthToken) { + return { + success: false as const, + error: "Please log in with Plex to enable watchlist sync", + } + } + + const overseerr = await prisma.overseerr.findFirst({ + where: { isActive: true }, + }) + + if (!overseerr) { + return { + success: false as const, + error: "Overseerr is not configured. Please contact an administrator.", + } + } + + const config = await prisma.config.findUnique({ + where: { id: "config" }, + select: { watchlistSyncEnabled: true }, + }) + + if (!config?.watchlistSyncEnabled) { + return { + success: false as const, + error: "Watchlist sync is not enabled globally. Please contact an administrator.", + } + } + } + + // Upsert settings + await prisma.watchlistSyncSettings.upsert({ + where: { userId: session.user.id }, + create: { + userId: session.user.id, + syncEnabled: validated.data.syncEnabled, + }, + update: { + syncEnabled: validated.data.syncEnabled, + }, + }) + + return { success: true as const } + } catch (error) { + console.error("Error updating watchlist sync settings:", error) + return { success: false as const, error: "Failed to update settings" } + } +} + +/** + * Trigger a manual sync for the current user + */ +export async function triggerWatchlistSync() { + const session = await getServerSession(authOptions) + if (!session?.user?.id) { + return { success: false as const, error: "Not authenticated" } + } + + try { + // Check if sync is enabled for user + const settings = await prisma.watchlistSyncSettings.findUnique({ + where: { userId: session.user.id }, + }) + + if (!settings?.syncEnabled) { + return { success: false as const, error: "Watchlist sync is not enabled" } + } + + // Perform sync + const result = await syncUserWatchlist(session.user.id) + + if (!result.success) { + return { success: false as const, error: result.error || "Sync failed" } + } + + return { + success: true as const, + data: result.data, + } + } catch (error) { + console.error("Error triggering watchlist sync:", error) + return { success: false as const, error: "Failed to trigger sync" } + } +} + +/** + * Get user's watchlist sync history + */ +export async function getWatchlistSyncHistory(options?: { + limit?: number + offset?: number + status?: WatchlistSyncStatus +}) { + const session = await getServerSession(authOptions) + if (!session?.user?.id) { + return { success: false as const, error: "Not authenticated" } + } + + const limit = options?.limit ?? 20 + const offset = options?.offset ?? 0 + + try { + const where = { + userId: session.user.id, + ...(options?.status ? { status: options.status } : {}), + } + + const [items, total] = await Promise.all([ + prisma.watchlistSyncHistory.findMany({ + where, + orderBy: { syncedAt: "desc" }, + take: limit, + skip: offset, + select: { + id: true, + title: true, + year: true, + mediaType: true, + status: true, + syncedAt: true, + requestedAt: true, + overseerrRequestId: true, + }, + }), + prisma.watchlistSyncHistory.count({ where }), + ]) + + return { + success: true as const, + data: { + items, + total, + hasMore: offset + items.length < total, + }, + } + } catch (error) { + console.error("Error fetching watchlist sync history:", error) + return { success: false as const, error: "Failed to fetch history" } + } +} diff --git a/app/admin/settings/page.tsx b/app/admin/settings/page.tsx index a1924bc0..ec4243a8 100644 --- a/app/admin/settings/page.tsx +++ b/app/admin/settings/page.tsx @@ -1,5 +1,6 @@ import { getAdminSettings } from "@/actions/admin" import { DiscordIntegrationForm, LLMProviderForm, LLMToggle, ServerForm } from "@/components/admin/settings/settings-edit-forms" +import { WatchlistSyncSettings } from "@/components/admin/settings/watchlist-sync-settings" import { WrappedSettingsForm } from "@/components/admin/settings/wrapped-settings-form" import { getBaseUrl } from "@/lib/utils" @@ -355,6 +356,9 @@ export default async function SettingsPage() { + {/* Watchlist Sync Settings */} + + {/* System Information */}
diff --git a/components/admin/settings/watchlist-sync-settings.tsx b/components/admin/settings/watchlist-sync-settings.tsx new file mode 100644 index 00000000..84d326d5 --- /dev/null +++ b/components/admin/settings/watchlist-sync-settings.tsx @@ -0,0 +1,282 @@ +"use client" + +import { + getGlobalWatchlistSyncSettings, + getWatchlistSyncStats, + updateGlobalWatchlistSyncSettings, +} from "@/actions/admin/watchlist" +import { useToast } from "@/components/ui/toast" +import { useCallback, useEffect, useState } from "react" + +interface GlobalSettings { + watchlistSyncEnabled: boolean + watchlistSyncIntervalMinutes: number +} + +interface SyncStats { + usersWithSyncEnabled: number + totalItemsSynced: number + totalItemsRequested: number + recentHistory: Array<{ + id: string + title: string + mediaType: string + status: string + syncedAt: Date + user: { name: string | null; email: string | null } + }> +} + +export function WatchlistSyncSettings() { + const toast = useToast() + const [settings, setSettings] = useState(null) + const [stats, setStats] = useState(null) + const [isLoading, setIsLoading] = useState(true) + const [isSaving, setIsSaving] = useState(false) + const [intervalInput, setIntervalInput] = useState("") + + const loadData = useCallback(async () => { + const [settingsResult, statsResult] = await Promise.all([ + getGlobalWatchlistSyncSettings(), + getWatchlistSyncStats(), + ]) + + if (settingsResult.success && settingsResult.data) { + setSettings(settingsResult.data) + setIntervalInput(settingsResult.data.watchlistSyncIntervalMinutes.toString()) + } + + if (statsResult.success && statsResult.data) { + setStats(statsResult.data) + } + + setIsLoading(false) + }, []) + + useEffect(() => { + loadData() + }, [loadData]) + + const handleToggleEnabled = async () => { + if (!settings) return + + setIsSaving(true) + try { + const result = await updateGlobalWatchlistSyncSettings({ + watchlistSyncEnabled: !settings.watchlistSyncEnabled, + watchlistSyncIntervalMinutes: settings.watchlistSyncIntervalMinutes, + }) + + if (result.success) { + setSettings({ + ...settings, + watchlistSyncEnabled: !settings.watchlistSyncEnabled, + }) + toast.showSuccess( + settings.watchlistSyncEnabled + ? "Watchlist sync disabled" + : "Watchlist sync enabled" + ) + } else { + toast.showError(result.error || "Failed to update settings") + } + } finally { + setIsSaving(false) + } + } + + const handleIntervalChange = async () => { + if (!settings) return + + const interval = parseInt(intervalInput, 10) + if (isNaN(interval) || interval < 15 || interval > 1440) { + toast.showError("Interval must be between 15 and 1440 minutes") + return + } + + setIsSaving(true) + try { + const result = await updateGlobalWatchlistSyncSettings({ + watchlistSyncEnabled: settings.watchlistSyncEnabled, + watchlistSyncIntervalMinutes: interval, + }) + + if (result.success) { + setSettings({ + ...settings, + watchlistSyncIntervalMinutes: interval, + }) + toast.showSuccess("Sync interval updated") + } else { + toast.showError(result.error || "Failed to update settings") + } + } finally { + setIsSaving(false) + } + } + + if (isLoading) { + return ( +
+
+
+
+
+
+
+ ) + } + + if (!settings) { + return null + } + + return ( +
+ {/* Header */} +
+
+

Watchlist Sync

+

+ Automatically sync users' Plex watchlists to Overseerr requests. + Users must enable sync individually in their settings. +

+
+
+ + {/* Enable/Disable Toggle */} +
+
+ + + {settings.watchlistSyncEnabled ? "Enabled" : "Disabled"} + + {isSaving && ( + + + + + )} +
+ + {/* Interval Setting */} + {settings.watchlistSyncEnabled && ( +
+ +
+ setIntervalInput(e.target.value)} + className="w-24 rounded-lg border border-slate-600 bg-slate-700 px-3 py-2 text-sm text-white focus:border-cyan-500 focus:outline-none focus:ring-1 focus:ring-cyan-500" + data-testid="watchlist-sync-interval-input" + /> + +
+

+ How often to check for new items in users' watchlists (15-1440 minutes) +

+
+ )} + + {/* Statistics */} + {stats && settings.watchlistSyncEnabled && ( +
+

Statistics

+
+
+
{stats.usersWithSyncEnabled}
+
Users with sync enabled
+
+
+
{stats.totalItemsSynced}
+
Total items synced
+
+
+
{stats.totalItemsRequested}
+
Requests created
+
+
+ + {/* Recent Activity */} + {stats.recentHistory.length > 0 && ( +
+

Recent Activity

+
+ {stats.recentHistory.slice(0, 5).map((item) => ( +
+
+ {item.title} + + by {item.user.name || item.user.email || "Unknown"} + +
+ + {item.status.replace(/_/g, " ")} + +
+ ))} +
+
+ )} +
+ )} + + {/* Warning when disabled */} + {!settings.watchlistSyncEnabled && ( +
+

+ Watchlist sync is disabled. Users will not be able to enable sync in their settings. +

+
+ )} +
+
+ ) +} diff --git a/components/dashboard/user-dashboard.tsx b/components/dashboard/user-dashboard.tsx index 8078453d..9707a1a0 100644 --- a/components/dashboard/user-dashboard.tsx +++ b/components/dashboard/user-dashboard.tsx @@ -9,6 +9,7 @@ import { RequestsCard } from "@/components/dashboard/requests-card" import { StatusFooter } from "@/components/dashboard/status-background" import { WrappedCard } from "@/components/dashboard/wrapped-card" import type { DashboardDiscordConnection } from "@/components/discord/link-callout" +import { WatchlistSyncCard } from "@/components/watchlist/sync-settings-card" import { signOut } from "next-auth/react" import Link from "next/link" import { useRouter } from "next/navigation" @@ -117,6 +118,14 @@ export function UserDashboard({ )}
+ + {/* Features section */} +
+

Features

+
+ +
+
diff --git a/components/watchlist/sync-history-table.tsx b/components/watchlist/sync-history-table.tsx new file mode 100644 index 00000000..9b019b92 --- /dev/null +++ b/components/watchlist/sync-history-table.tsx @@ -0,0 +1,190 @@ +"use client" + +import { getWatchlistSyncHistory } from "@/actions/watchlist" +import { WatchlistSyncStatus } from "@/lib/generated/prisma/client" +import { useCallback, useEffect, useState } from "react" + +interface SyncHistoryItem { + id: string + title: string + year: number | null + mediaType: string + status: WatchlistSyncStatus + syncedAt: Date + requestedAt: Date | null + overseerrRequestId: number | null +} + +interface SyncHistoryData { + items: SyncHistoryItem[] + total: number + hasMore: boolean +} + +interface SyncHistoryTableProps { + limit?: number +} + +export function SyncHistoryTable({ limit = 10 }: SyncHistoryTableProps) { + const [data, setData] = useState(null) + const [isLoading, setIsLoading] = useState(true) + const [offset, setOffset] = useState(0) + + const loadHistory = useCallback(async () => { + setIsLoading(true) + const result = await getWatchlistSyncHistory({ limit, offset }) + if (result.success && result.data) { + setData(result.data) + } + setIsLoading(false) + }, [limit, offset]) + + useEffect(() => { + loadHistory() + }, [loadHistory]) + + const getStatusBadge = (status: WatchlistSyncStatus) => { + const styles: Record = { + SYNCED: { bg: "bg-slate-700/50", text: "text-slate-300", label: "Synced" }, + REQUESTED: { bg: "bg-green-900/50", text: "text-green-300", label: "Requested" }, + ALREADY_AVAILABLE: { bg: "bg-cyan-900/50", text: "text-cyan-300", label: "Available" }, + ALREADY_REQUESTED: { bg: "bg-amber-900/50", text: "text-amber-300", label: "Pending" }, + FAILED: { bg: "bg-red-900/50", text: "text-red-300", label: "Failed" }, + REMOVED_FROM_WATCHLIST: { bg: "bg-slate-800/50", text: "text-slate-500", label: "Removed" }, + } + const style = styles[status] + return ( + + {style.label} + + ) + } + + const getMediaTypeIcon = (mediaType: string) => { + if (mediaType === "MOVIE") { + return ( + + + + ) + } + return ( + + + + ) + } + + const formatDate = (date: Date) => { + const d = new Date(date) + return d.toLocaleDateString(undefined, { month: "short", day: "numeric", year: "numeric" }) + } + + if (isLoading) { + return ( +
+
+ {[...Array(3)].map((_, i) => ( +
+
+
+
+
+
+ ))} +
+
+ ) + } + + if (!data || data.items.length === 0) { + return ( +
+

No sync history yet

+

+ Items from your Plex watchlist will appear here once synced +

+
+ ) + } + + return ( +
+
+ + + + + + + + + + + {data.items.map((item) => ( + + + + + + + ))} + +
+ Title + + Type + + Status + + Synced +
+
+ {item.title} + {item.year && ({item.year})} +
+
+
+ {getMediaTypeIcon(item.mediaType)} + + {item.mediaType === "MOVIE" ? "Movie" : "TV Show"} + +
+
+ {getStatusBadge(item.status)} + + {formatDate(item.syncedAt)} +
+
+ + {/* Pagination */} + {(data.hasMore || offset > 0) && ( +
+ + + Showing {offset + 1}-{Math.min(offset + data.items.length, data.total)} of {data.total} + + +
+ )} +
+ ) +} diff --git a/components/watchlist/sync-settings-card.tsx b/components/watchlist/sync-settings-card.tsx new file mode 100644 index 00000000..4bb949d0 --- /dev/null +++ b/components/watchlist/sync-settings-card.tsx @@ -0,0 +1,362 @@ +"use client" + +import { getWatchlistSyncSettings, triggerWatchlistSync, updateWatchlistSyncSettings } from "@/actions/watchlist" +import { useToast } from "@/components/ui/toast" +import { motion } from "framer-motion" +import { useCallback, useEffect, useState, useTransition } from "react" + +interface RecentHistoryItem { + id: string + title: string + year: number | null + mediaType: string + status: string + syncedAt: Date +} + +interface SyncSettings { + hasPlexToken: boolean + hasOverseerr: boolean + globalSyncEnabled: boolean + settings: { + syncEnabled: boolean + lastSyncAt: Date | null + lastSyncStatus: string | null + lastSyncError: string | null + itemsSynced: number + itemsRequested: number + totalItemsSynced: number + totalItemsRequested: number + } | null + recentHistory: RecentHistoryItem[] +} + +export function WatchlistSyncCard() { + const { showSuccess, showError, showInfo } = useToast() + const [isPending, startTransition] = useTransition() + const [isSyncing, setIsSyncing] = useState(false) + const [settings, setSettings] = useState(null) + const [isLoading, setIsLoading] = useState(true) + + const loadSettings = useCallback(async () => { + const result = await getWatchlistSyncSettings() + if (result.success && result.data) { + setSettings(result.data) + } + setIsLoading(false) + }, []) + + useEffect(() => { + loadSettings() + }, [loadSettings]) + + const handleToggleSync = () => { + if (!settings) return + + const newEnabled = !settings.settings?.syncEnabled + + startTransition(async () => { + const result = await updateWatchlistSyncSettings({ syncEnabled: newEnabled }) + if (result.success) { + showSuccess(newEnabled ? "Watchlist sync enabled" : "Watchlist sync disabled") + await loadSettings() + } else { + showError(result.error || "Failed to update settings") + } + }) + } + + const handleManualSync = async () => { + setIsSyncing(true) + try { + const result = await triggerWatchlistSync() + if (result.success && result.data) { + const { itemsSynced, itemsRequested, itemsFailed } = result.data + if (itemsRequested > 0) { + showSuccess(`Synced ${itemsSynced} items, requested ${itemsRequested} new items`) + } else if (itemsSynced > 0) { + showInfo(`Synced ${itemsSynced} items (all already available or requested)`) + } else { + showInfo("Watchlist is up to date") + } + if (itemsFailed > 0) { + showError(`${itemsFailed} items failed to sync`) + } + await loadSettings() + } else { + showError(result.error || "Sync failed") + } + } finally { + setIsSyncing(false) + } + } + + // Not configured - don't show card + if (!isLoading && settings && !settings.hasOverseerr) { + return null + } + + // Loading state + if (isLoading) { + return ( + +
+
+
+
+ + ) + } + + if (!settings) { + return null + } + + const canEnable = settings.hasPlexToken && settings.hasOverseerr && settings.globalSyncEnabled + const isEnabled = settings.settings?.syncEnabled ?? false + const lastSync = settings.settings?.lastSyncAt + const lastStatus = settings.settings?.lastSyncStatus + + // Format last sync time + const formatLastSync = (date: Date | null | undefined) => { + if (!date) return "Never" + const d = new Date(date) + const now = new Date() + const diffMs = now.getTime() - d.getTime() + const diffMins = Math.floor(diffMs / 60000) + const diffHours = Math.floor(diffMs / 3600000) + const diffDays = Math.floor(diffMs / 86400000) + + if (diffMins < 1) return "Just now" + if (diffMins < 60) return `${diffMins}m ago` + if (diffHours < 24) return `${diffHours}h ago` + return `${diffDays}d ago` + } + + const getStatusColor = (status: string | null) => { + switch (status) { + case "success": + return "text-green-400" + case "partial": + return "text-amber-400" + case "failed": + return "text-red-400" + default: + return "text-slate-400" + } + } + + return ( + + {/* Ambient glow */} +
+ +
+ {/* Header */} +
+
+ {/* Watchlist Icon */} +
+ + + +
+
+

Watchlist Sync

+

+ Auto-request items from your Plex watchlist +

+
+
+ + {/* Toggle */} + +
+ + {/* Prerequisites warning */} + {!canEnable && ( +
+ {!settings.hasPlexToken && ( +

Log in with Plex to enable watchlist sync

+ )} + {settings.hasPlexToken && !settings.globalSyncEnabled && ( +

Watchlist sync is disabled by your administrator

+ )} + {settings.hasPlexToken && settings.globalSyncEnabled && !settings.hasOverseerr && ( +

Overseerr is not configured

+ )} +
+ )} + + {/* Status when enabled */} + {isEnabled && ( +
+ {/* Stats row */} +
+
+ + Last sync: {formatLastSync(lastSync)} + + {lastStatus && ( + + {lastStatus === "success" && "Success"} + {lastStatus === "partial" && "Partial"} + {lastStatus === "failed" && "Failed"} + + )} +
+
+ + {/* Cumulative stats */} + {settings.settings && (settings.settings.totalItemsSynced > 0 || settings.settings.totalItemsRequested > 0) && ( +
+ + {settings.settings.totalItemsSynced} items synced + + + {settings.settings.totalItemsRequested} requested + +
+ )} + + {/* Error message */} + {settings.settings?.lastSyncError && ( +

+ {settings.settings.lastSyncError} +

+ )} + + {/* Recent sync history */} + {settings.recentHistory && settings.recentHistory.length > 0 && ( +
+

Recent items

+
+ {settings.recentHistory.map((item) => ( +
+
+ + {item.mediaType === "MOVIE" ? "🎬" : "📺"} + + + {item.title} + {item.year && ({item.year})} + +
+ + {item.status === "REQUESTED" && "Requested"} + {item.status === "ALREADY_AVAILABLE" && "Available"} + {item.status === "ALREADY_REQUESTED" && "Pending"} + {item.status === "FAILED" && "Failed"} + {item.status === "SYNCED" && "Synced"} + +
+ ))} +
+
+ )} + + {/* Manual sync button */} + +
+ )} +
+ + ) +} diff --git a/instrumentation.ts b/instrumentation.ts index 26b1525a..1d7cdc2e 100644 --- a/instrumentation.ts +++ b/instrumentation.ts @@ -9,115 +9,8 @@ export async function register() { return } - // Check if bot should attempt to start (can be disabled via env var for manual control) - const envBotEnabled = process.env.ENABLE_DISCORD_BOT !== "false" - - if (!envBotEnabled) { - if (process.env.NODE_ENV === "development") { - console.log("Discord bot disabled via ENABLE_DISCORD_BOT=false") - } - return - } - - // Check database setting - if bot is disabled there, don't start polling - try { - const { isDiscordBotEnabled } = await import("./lib/discord/lock") - const botEnabled = await isDiscordBotEnabled() - if (!botEnabled) { - if (process.env.NODE_ENV === "development") { - console.log("Discord bot disabled in database settings") - } - return - } - } catch (error) { - // If we can't check the database, proceed anyway (database might not be ready yet) - // The polling loop will check the setting periodically - if (process.env.NODE_ENV === "development") { - console.warn("Could not check Discord bot enabled status:", error) - } - } - - // Use dynamic import with a string to prevent Next.js from analyzing the dependency tree - // This ensures Discord.js and its native dependencies aren't bundled - try { - const { startDiscordBotLockPolling, stopDiscordBotLockPolling, releaseDiscordBotLock } = await import("./lib/discord/lock") - const botModule = await import("./lib/discord/bot") - const bot = botModule.getDiscordBot() - - // Poll interval in milliseconds (default: 60 seconds) - const pollIntervalMs = parseInt(process.env.DISCORD_BOT_POLL_INTERVAL_MS || "60000", 10) - - // Store bot instance for cleanup - let botInstance: any = null - - // Start background polling - this doesn't block server startup - // The bot will initialize automatically when the lock is acquired - startDiscordBotLockPolling( - // onLockAcquired - called when we successfully acquire the lock - async () => { - try { - await bot.initialize() - botInstance = bot - console.log("Discord bot initialized successfully (holding distributed lock)") - } catch (error) { - console.error("Failed to initialize Discord bot:", error) - // Release lock if initialization fails - await releaseDiscordBotLock() - } - }, - // onLockLost - called if we lose the lock (e.g., another instance took it) - async () => { - try { - console.log("Discord bot lock lost - shutting down bot") - if (botInstance) { - await botInstance.destroy() - botInstance = null - } - } catch (error) { - console.error("Error shutting down bot after lock loss:", error) - } - }, - pollIntervalMs - ) - - console.log(`Discord bot lock polling started (checking every ${pollIntervalMs / 1000} seconds)`) - - // Graceful shutdown handlers - only register if we're in Node.js runtime - // Skip in test environments (Playwright) to avoid interference - if (process.env.NODE_ENV !== "test") { - const shutdown = async () => { - try { - await stopDiscordBotLockPolling() - // Bot instance will be destroyed by stopDiscordBotLockPolling if it exists - // But also try to destroy it here as a fallback - try { - await bot.destroy() - } catch { - // Ignore errors if bot wasn't initialized - } - } catch (error) { - console.error("Error during Discord bot shutdown:", error) - } - } - - // Use dynamic property access to avoid Next.js static analysis issues with Edge Runtime - // process.on is already verified to exist by the top-level check - // Using a helper function to make it harder for Next.js to statically analyze - const registerSignalHandler = (signal: string) => { - const onMethod = (process as any)["on"] - if (typeof onMethod === "function") { - onMethod.call(process, signal, shutdown) - } - } - registerSignalHandler("SIGINT") - registerSignalHandler("SIGTERM") - } - } catch (error) { - // Silently fail if Discord.js can't be loaded (e.g., missing native dependencies) - // This allows the app to start even if the bot can't be initialized - if (process.env.NODE_ENV === "development") { - console.warn("Discord bot module could not be loaded:", error) - } - } + // Dynamically import Node.js-specific instrumentation + // This prevents Edge Runtime from analyzing Node.js-only dependencies + const { startNodeInstrumentation } = await import("./lib/instrumentation/node") + await startNodeInstrumentation() } - diff --git a/lib/auth.ts b/lib/auth.ts index 69a23dfc..077ff9ce 100644 --- a/lib/auth.ts +++ b/lib/auth.ts @@ -158,6 +158,7 @@ export const authOptions: NextAuthOptions = { dbUser = await prisma.user.create({ data: { plexUserId: plexUser.id, + plexAuthToken: authToken, // Store for watchlist sync name: plexUser.username, email: plexUser.email, image: plexUser.thumb, @@ -181,6 +182,7 @@ export const authOptions: NextAuthOptions = { dbUser = await prisma.user.update({ where: { id: dbUser.id }, data: { + plexAuthToken: authToken, // Update token on each login for watchlist sync name: plexUser.username, email: plexUser.email, image: plexUser.thumb, diff --git a/lib/connections/overseerr.ts b/lib/connections/overseerr.ts index be11d1f1..427a301c 100644 --- a/lib/connections/overseerr.ts +++ b/lib/connections/overseerr.ts @@ -231,3 +231,143 @@ export async function batchGetOverseerrMediaStatus( return results } + +// Overseerr media status codes +export const OverseerrMediaStatus = { + UNKNOWN: 1, + PENDING: 2, + PROCESSING: 3, + PARTIALLY_AVAILABLE: 4, + AVAILABLE: 5, +} as const + +export interface SubmitOverseerrRequestPayload { + mediaType: "movie" | "tv" + mediaId: number // TMDB ID + seasons?: number[] // For TV shows, which seasons to request + is4k?: boolean +} + +export interface SubmitOverseerrRequestResult { + success: boolean + requestId?: number + error?: string + status: "created" | "already_requested" | "already_available" | "failed" +} + +/** + * Submit a media request to Overseerr + * @param config Overseerr configuration + * @param payload Request details (mediaType, mediaId, optional seasons) + */ +export async function submitOverseerrRequest( + config: OverseerrParsed, + payload: SubmitOverseerrRequestPayload +): Promise { + try { + // First check if media already exists/is requested + const existingStatus = await getOverseerrMediaByTmdbId(config, payload.mediaId, payload.mediaType) + + if (existingStatus.success && existingStatus.data) { + const status = existingStatus.data.status + + // Check if already available + if (status === OverseerrMediaStatus.AVAILABLE || status === OverseerrMediaStatus.PARTIALLY_AVAILABLE) { + logger.debug("Media already available in Overseerr", { + tmdbId: payload.mediaId, + mediaType: payload.mediaType, + status, + }) + return { + success: true, + status: "already_available", + } + } + + // Check if already requested (pending or processing) + if (status === OverseerrMediaStatus.PENDING || status === OverseerrMediaStatus.PROCESSING) { + logger.debug("Media already requested in Overseerr", { + tmdbId: payload.mediaId, + mediaType: payload.mediaType, + status, + }) + return { + success: true, + requestId: existingStatus.data.requests?.[0]?.id, + status: "already_requested", + } + } + } + + // Submit the request + const url = `${config.url}/api/v1/request` + const body: Record = { + mediaType: payload.mediaType, + mediaId: payload.mediaId, + } + + if (payload.seasons && payload.seasons.length > 0) { + body.seasons = payload.seasons + } + + if (payload.is4k !== undefined) { + body.is4k = payload.is4k + } + + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Api-Key": config.apiKey, + }, + body: JSON.stringify(body), + }) + + if (!response.ok) { + const errorData = await response.json().catch(() => ({})) + const errorMessage = errorData.message || response.statusText + + // Check for specific error cases + if (response.status === 409 || errorMessage.includes("already")) { + return { + success: true, + status: "already_requested", + } + } + + logger.error("Overseerr request submission failed", undefined, { + status: response.status, + error: errorMessage, + tmdbId: payload.mediaId, + mediaType: payload.mediaType, + }) + + return { + success: false, + error: errorMessage, + status: "failed", + } + } + + const data = await response.json() + + logger.info("Overseerr request submitted successfully", { + requestId: data.id, + tmdbId: payload.mediaId, + mediaType: payload.mediaType, + }) + + return { + success: true, + requestId: data.id, + status: "created", + } + } catch (error) { + logger.error("Error submitting Overseerr request", error) + return { + success: false, + error: error instanceof Error ? error.message : "Unknown error", + status: "failed", + } + } +} diff --git a/lib/instrumentation/node.ts b/lib/instrumentation/node.ts new file mode 100644 index 00000000..9e891c2b --- /dev/null +++ b/lib/instrumentation/node.ts @@ -0,0 +1,185 @@ +/** + * Node.js runtime instrumentation - starts background jobs + * This file should only be imported dynamically from instrumentation.ts + * to prevent Edge Runtime from analyzing Node.js-only dependencies + */ + +export async function startNodeInstrumentation() { + // Start watchlist sync polling + await startWatchlistSyncPolling() + + // Start Discord bot polling + await startDiscordBotPolling() +} + +async function startWatchlistSyncPolling() { + // Check if watchlist sync should attempt to start + const envSyncEnabled = process.env.ENABLE_WATCHLIST_SYNC !== "false" + + if (!envSyncEnabled) { + if (process.env.NODE_ENV === "development") { + console.log("Watchlist sync disabled via ENABLE_WATCHLIST_SYNC=false") + } + return + } + + try { + const { isWatchlistSyncEnabled, startWatchlistSyncPolling: startPolling, stopWatchlistSyncPolling } = await import("@/lib/watchlist/lock") + const { syncAllEnabledUsers } = await import("@/lib/watchlist/sync-service") + + // Check database setting + const syncEnabled = await isWatchlistSyncEnabled() + if (!syncEnabled) { + if (process.env.NODE_ENV === "development") { + console.log("Watchlist sync disabled in database settings") + } + return + } + + // Poll interval in milliseconds (default: 60 seconds) + const pollIntervalMs = parseInt(process.env.WATCHLIST_SYNC_POLL_INTERVAL_MS || "60000", 10) + + // Start background polling + startPolling( + // onLockAcquired - called when we successfully acquire the lock + async () => { + try { + console.log("Watchlist sync lock acquired - running sync") + const result = await syncAllEnabledUsers() + console.log(`Watchlist sync completed: ${result.usersProcessed} users processed, ${result.usersSucceeded} succeeded, ${result.usersFailed} failed`) + } catch (error) { + console.error("Failed to run watchlist sync:", error) + } + }, + // onLockLost - called if we lose the lock + async () => { + console.log("Watchlist sync lock lost") + }, + pollIntervalMs + ) + + console.log(`Watchlist sync polling started (checking every ${pollIntervalMs / 1000} seconds)`) + + // Graceful shutdown handlers + if (process.env.NODE_ENV !== "test") { + const shutdown = async () => { + try { + await stopWatchlistSyncPolling() + } catch (error) { + console.error("Error during watchlist sync shutdown:", error) + } + } + + process.on("SIGINT", shutdown) + process.on("SIGTERM", shutdown) + } + } catch (error) { + if (process.env.NODE_ENV === "development") { + console.warn("Watchlist sync module could not be loaded:", error) + } + } +} + +async function startDiscordBotPolling() { + // Check if bot should attempt to start (can be disabled via env var for manual control) + const envBotEnabled = process.env.ENABLE_DISCORD_BOT !== "false" + + if (!envBotEnabled) { + if (process.env.NODE_ENV === "development") { + console.log("Discord bot disabled via ENABLE_DISCORD_BOT=false") + } + return + } + + // Check database setting - if bot is disabled there, don't start polling + try { + const { isDiscordBotEnabled } = await import("@/lib/discord/lock") + const botEnabled = await isDiscordBotEnabled() + if (!botEnabled) { + if (process.env.NODE_ENV === "development") { + console.log("Discord bot disabled in database settings") + } + return + } + } catch (error) { + // If we can't check the database, proceed anyway (database might not be ready yet) + // The polling loop will check the setting periodically + if (process.env.NODE_ENV === "development") { + console.warn("Could not check Discord bot enabled status:", error) + } + } + + // Use dynamic import with a string to prevent Next.js from analyzing the dependency tree + // This ensures Discord.js and its native dependencies aren't bundled + try { + const { startDiscordBotLockPolling, stopDiscordBotLockPolling, releaseDiscordBotLock } = await import("@/lib/discord/lock") + const botModule = await import("@/lib/discord/bot") + const bot = botModule.getDiscordBot() + + // Poll interval in milliseconds (default: 60 seconds) + const pollIntervalMs = parseInt(process.env.DISCORD_BOT_POLL_INTERVAL_MS || "60000", 10) + + // Store bot instance for cleanup + let botInstance: ReturnType | null = null + + // Start background polling - this doesn't block server startup + // The bot will initialize automatically when the lock is acquired + startDiscordBotLockPolling( + // onLockAcquired - called when we successfully acquire the lock + async () => { + try { + await bot.initialize() + botInstance = bot + console.log("Discord bot initialized successfully (holding distributed lock)") + } catch (error) { + console.error("Failed to initialize Discord bot:", error) + // Release lock if initialization fails + await releaseDiscordBotLock() + } + }, + // onLockLost - called if we lose the lock (e.g., another instance took it) + async () => { + try { + console.log("Discord bot lock lost - shutting down bot") + if (botInstance) { + await botInstance.destroy() + botInstance = null + } + } catch (error) { + console.error("Error shutting down bot after lock loss:", error) + } + }, + pollIntervalMs + ) + + console.log(`Discord bot lock polling started (checking every ${pollIntervalMs / 1000} seconds)`) + + // Graceful shutdown handlers - only register if we're in Node.js runtime + // Skip in test environments (Playwright) to avoid interference + if (process.env.NODE_ENV !== "test") { + const shutdown = async () => { + try { + await stopDiscordBotLockPolling() + // Bot instance will be destroyed by stopDiscordBotLockPolling if it exists + // But also try to destroy it here as a fallback + try { + await bot.destroy() + } catch { + // Ignore errors if bot wasn't initialized + } + } catch (error) { + console.error("Error during Discord bot shutdown:", error) + } + } + + process.on("SIGINT", shutdown) + process.on("SIGTERM", shutdown) + } + } catch (error) { + // Silently fail if Discord.js can't be loaded (e.g., missing native dependencies) + // This allows the app to start even if the bot can't be initialized + if (process.env.NODE_ENV === "development") { + console.warn("Discord bot module could not be loaded:", error) + } + } +} diff --git a/lib/validations/watchlist.ts b/lib/validations/watchlist.ts new file mode 100644 index 00000000..a6f33152 --- /dev/null +++ b/lib/validations/watchlist.ts @@ -0,0 +1,75 @@ +import { z } from "zod" + +/** + * Schema for a watchlist item from Plex API + */ +export const watchlistItemSchema = z.object({ + ratingKey: z.string(), + guid: z.string(), + type: z.enum(["movie", "show"]), + title: z.string(), + year: z.number().optional(), + // External IDs parsed from Guid array + tmdbId: z.number().optional(), + tvdbId: z.number().optional(), + imdbId: z.string().optional(), +}) + +export type WatchlistItem = z.infer + +/** + * Schema for updating user watchlist sync settings + */ +export const updateWatchlistSyncSettingsSchema = z.object({ + syncEnabled: z.boolean(), +}) + +export type UpdateWatchlistSyncSettings = z.infer + +/** + * Schema for global watchlist sync settings (admin) + */ +export const globalWatchlistSyncSettingsSchema = z.object({ + watchlistSyncEnabled: z.boolean(), + watchlistSyncIntervalMinutes: z.number().min(15).max(1440), // 15 min to 24 hours +}) + +export type GlobalWatchlistSyncSettings = z.infer + +/** + * Schema for sync result + */ +export const syncResultSchema = z.object({ + success: z.boolean(), + itemsSynced: z.number(), + itemsRequested: z.number(), + itemsSkipped: z.number(), + itemsFailed: z.number(), + errors: z.array(z.string()).optional(), +}) + +export type SyncResult = z.infer + +/** + * Schema for Overseerr request payload + */ +export const overseerrRequestPayloadSchema = z.object({ + mediaType: z.enum(["movie", "tv"]), + mediaId: z.number(), // TMDB ID + seasons: z.array(z.number()).optional(), // For TV shows + is4k: z.boolean().optional(), +}) + +export type OverseerrRequestPayload = z.infer + +/** + * Schema for Overseerr request result + */ +export const overseerrRequestResultSchema = z.object({ + success: z.boolean(), + requestId: z.number().optional(), + error: z.string().optional(), + status: z.enum(["created", "already_requested", "already_available", "failed"]), +}) + +export type OverseerrRequestResult = z.infer diff --git a/lib/watchlist/lock.ts b/lib/watchlist/lock.ts new file mode 100644 index 00000000..f49d84e9 --- /dev/null +++ b/lib/watchlist/lock.ts @@ -0,0 +1,395 @@ +import { prisma } from "@/lib/prisma" +import { createLogger } from "@/lib/utils/logger" +import { randomBytes } from "crypto" + +const logger = createLogger("WATCHLIST_SYNC_LOCK") + +/** + * Checks if watchlist sync is enabled globally in the database + */ +export async function isWatchlistSyncEnabled(): Promise { + try { + const config = await prisma.config.findUnique({ + where: { id: "config" }, + select: { watchlistSyncEnabled: true }, + }) + return config?.watchlistSyncEnabled ?? false + } catch (error) { + logger.debug("Error checking if watchlist sync is enabled", { error }) + return false + } +} + +/** + * Gets the global sync interval in minutes + */ +export async function getWatchlistSyncInterval(): Promise { + try { + const config = await prisma.config.findUnique({ + where: { id: "config" }, + select: { watchlistSyncIntervalMinutes: true }, + }) + return config?.watchlistSyncIntervalMinutes ?? 60 + } catch (error) { + logger.debug("Error getting watchlist sync interval", { error }) + return 60 + } +} + +// Lock lease duration in milliseconds (30 seconds) +const LOCK_LEASE_DURATION_MS = 30 * 1000 + +// How often to renew the lock (every 10 seconds) +const LOCK_RENEWAL_INTERVAL_MS = 10 * 1000 + +// Generate a unique instance ID for this pod/process +const INSTANCE_ID = `${process.env.HOSTNAME || "unknown"}-${process.pid}-${randomBytes(4).toString("hex")}` + +interface LockState { + isHeld: boolean + instanceId: string | null + renewalInterval?: NodeJS.Timeout + releaseLock?: () => Promise +} + +let lockState: LockState = { + isHeld: false, + instanceId: null, +} + +/** + * Attempts to acquire a distributed lock for watchlist sync using a database lease table. + * Only one instance across all pods can hold this lock at a time. + * Uses PostgreSQL's row-level locking for atomic operations. + */ +export async function acquireWatchlistSyncLock(): Promise { + if (lockState.isHeld && lockState.instanceId === INSTANCE_ID) { + logger.debug("Lock already held by this instance") + return true + } + + try { + const now = new Date() + const expiresAt = new Date(now.getTime() + LOCK_LEASE_DURATION_MS) + + // Use a transaction with row-level locking to atomically acquire the lock + const lockRecord = await prisma.$transaction(async (tx) => { + // First, clean up expired locks + await tx.watchlistSyncLock.deleteMany({ + where: { + expiresAt: { + lt: now, + }, + }, + }) + + // Try to acquire the lock + const existing = await tx.watchlistSyncLock.findUnique({ + where: { id: "watchlist-sync" }, + }) + + if (!existing) { + // No lock exists, create one + return await tx.watchlistSyncLock.create({ + data: { + id: "watchlist-sync", + instanceId: INSTANCE_ID, + acquiredAt: now, + expiresAt, + lastRenewedAt: now, + }, + }) + } + + // Lock exists - check if it's expired or if we own it + if (existing.expiresAt < now || existing.instanceId === INSTANCE_ID) { + // Lock is expired or we already own it - update it + return await tx.watchlistSyncLock.update({ + where: { id: "watchlist-sync" }, + data: { + instanceId: INSTANCE_ID, + expiresAt, + lastRenewedAt: now, + updatedAt: now, + }, + }) + } + + // Lock is held by another instance + return null + }) + + if (lockRecord && lockRecord.instanceId === INSTANCE_ID && lockRecord.expiresAt > now) { + logger.debug("Watchlist sync lock acquired successfully", { instanceId: INSTANCE_ID }) + lockState.isHeld = true + lockState.instanceId = INSTANCE_ID + + // Set up automatic lock renewal + lockState.renewalInterval = setInterval(async () => { + try { + const renewed = await renewWatchlistSyncLock() + if (!renewed) { + logger.debug("Failed to renew lock - another instance may have taken it") + lockState.isHeld = false + lockState.instanceId = null + if (lockState.renewalInterval) { + clearInterval(lockState.renewalInterval) + lockState.renewalInterval = undefined + } + } + } catch (error) { + logger.debug("Error renewing lock", { error }) + lockState.isHeld = false + lockState.instanceId = null + if (lockState.renewalInterval) { + clearInterval(lockState.renewalInterval) + lockState.renewalInterval = undefined + } + } + }, LOCK_RENEWAL_INTERVAL_MS) + + // Set up cleanup function + lockState.releaseLock = async () => { + await releaseWatchlistSyncLock() + } + + return true + } else { + logger.debug("Watchlist sync lock not available", { + currentInstanceId: lockRecord?.instanceId, + expiresAt: lockRecord?.expiresAt, + }) + return false + } + } catch (error) { + logger.debug("Error acquiring watchlist sync lock", { error }) + return false + } +} + +/** + * Renews the watchlist sync lock lease + */ +async function renewWatchlistSyncLock(): Promise { + if (!lockState.isHeld || lockState.instanceId !== INSTANCE_ID) { + return false + } + + try { + const now = new Date() + const expiresAt = new Date(now.getTime() + LOCK_LEASE_DURATION_MS) + + const result = await prisma.watchlistSyncLock.updateMany({ + where: { + id: "watchlist-sync", + instanceId: INSTANCE_ID, + expiresAt: { + gt: now, // Only renew if not expired + }, + }, + data: { + expiresAt, + lastRenewedAt: now, + updatedAt: now, + }, + }) + + if (result.count > 0) { + logger.debug("Lock renewed successfully") + return true + } else { + logger.debug("Lock renewal failed - lock may have been taken by another instance") + return false + } + } catch (error) { + logger.debug("Error renewing watchlist sync lock", { error }) + return false + } +} + +/** + * Releases the watchlist sync lock + */ +export async function releaseWatchlistSyncLock(): Promise { + if (!lockState.isHeld || lockState.instanceId !== INSTANCE_ID) { + return + } + + try { + // Clear renewal interval + if (lockState.renewalInterval) { + clearInterval(lockState.renewalInterval) + lockState.renewalInterval = undefined + } + + // Delete the lock record if we still own it + await prisma.watchlistSyncLock.deleteMany({ + where: { + id: "watchlist-sync", + instanceId: INSTANCE_ID, + }, + }) + + lockState.isHeld = false + lockState.instanceId = null + lockState.releaseLock = undefined + + logger.debug("Watchlist sync lock released successfully") + } catch (error) { + logger.debug("Error releasing watchlist sync lock", { error }) + // Reset state even if release fails + lockState.isHeld = false + lockState.instanceId = null + lockState.releaseLock = undefined + } +} + +/** + * Checks if this instance currently holds the lock + */ +export function hasWatchlistSyncLock(): boolean { + return lockState.isHeld && lockState.instanceId === INSTANCE_ID +} + +// Polling state for background lock acquisition +interface PollingState { + isPolling: boolean + pollingInterval?: NodeJS.Timeout + syncInitialized: boolean + onLockAcquired?: () => Promise + onLockLost?: () => Promise +} + +let pollingState: PollingState = { + isPolling: false, + syncInitialized: false, +} + +/** + * Starts background polling to acquire the watchlist sync lock + * Tries every minute until the lock is acquired + * + * @param onLockAcquired - Callback when lock is successfully acquired + * @param onLockLost - Callback when lock is lost (optional) + * @param pollIntervalMs - How often to poll (default: 60 seconds) + */ +export async function startWatchlistSyncPolling( + onLockAcquired: () => Promise, + onLockLost?: () => Promise, + pollIntervalMs: number = 60 * 1000 +): Promise { + if (pollingState.isPolling) { + logger.debug("Lock polling already started") + return + } + + pollingState.isPolling = true + pollingState.onLockAcquired = onLockAcquired + pollingState.onLockLost = onLockLost + + logger.debug(`Starting watchlist sync lock polling (every ${pollIntervalMs / 1000} seconds)`) + + // Check if sync is enabled before trying to acquire lock + const syncEnabled = await isWatchlistSyncEnabled() + if (!syncEnabled) { + logger.debug("Watchlist sync disabled in database - not attempting to acquire lock") + return + } + + // Try immediately on startup + const immediateAcquired = await acquireWatchlistSyncLock() + if (immediateAcquired) { + logger.debug("Lock acquired immediately on startup") + pollingState.syncInitialized = true + await onLockAcquired() + } else { + logger.debug("Lock not available on startup, will poll periodically") + } + + // Set up periodic polling + pollingState.pollingInterval = setInterval(async () => { + if (!pollingState.isPolling) { + return + } + + try { + // Check if sync is enabled in database + const syncEnabled = await isWatchlistSyncEnabled() + + if (!syncEnabled) { + // Sync is disabled - shut down if running + if (pollingState.syncInitialized) { + logger.debug("Watchlist sync disabled in database - shutting down sync") + pollingState.syncInitialized = false + if (onLockLost) { + await onLockLost() + } + await releaseWatchlistSyncLock() + } + return + } + + // Check if we still hold the lock if sync is initialized + if (pollingState.syncInitialized) { + const stillHoldsLock = hasWatchlistSyncLock() + if (!stillHoldsLock) { + // We lost the lock + logger.debug("Lock lost during polling - shutting down sync") + pollingState.syncInitialized = false + if (onLockLost) { + await onLockLost() + } + return + } + // Lock renewal is handled by the renewal interval in acquireWatchlistSyncLock + return + } + + // Sync not initialized but enabled - try to acquire the lock + const acquired = await acquireWatchlistSyncLock() + + if (acquired) { + logger.debug("Lock acquired during polling - initializing sync") + pollingState.syncInitialized = true + await onLockAcquired() + } + } catch (error) { + logger.debug("Error during lock polling", { error }) + } + }, pollIntervalMs) +} + +/** + * Stops the background lock polling + */ +export async function stopWatchlistSyncPolling(): Promise { + if (!pollingState.isPolling) { + return + } + + logger.debug("Stopping watchlist sync lock polling") + pollingState.isPolling = false + + if (pollingState.pollingInterval) { + clearInterval(pollingState.pollingInterval) + pollingState.pollingInterval = undefined + } + + // Call onLockLost if sync was initialized (to clean up) + if (pollingState.syncInitialized && pollingState.onLockLost) { + try { + await pollingState.onLockLost() + } catch (error) { + logger.debug("Error during lock lost callback on stop", { error }) + } + } + + // Release lock if we hold it + if (pollingState.syncInitialized) { + await releaseWatchlistSyncLock() + pollingState.syncInitialized = false + } + + pollingState.onLockAcquired = undefined + pollingState.onLockLost = undefined +} diff --git a/lib/watchlist/plex-watchlist.ts b/lib/watchlist/plex-watchlist.ts new file mode 100644 index 00000000..cd91e70d --- /dev/null +++ b/lib/watchlist/plex-watchlist.ts @@ -0,0 +1,190 @@ +/** + * Plex Watchlist API client + * + * Fetches user's watchlist from the Plex discover API + * API Endpoint: https://discover.provider.plex.tv/library/sections/watchlist/all + */ + +import { getClientIdentifier } from "@/lib/connections/plex-core" +import { createLogger } from "@/lib/utils/logger" +import { WatchlistItem } from "@/lib/validations/watchlist" + +const logger = createLogger("PLEX_WATCHLIST") + +const PLEX_DISCOVER_URL = "https://discover.provider.plex.tv" + +interface PlexWatchlistResponse { + MediaContainer?: { + size?: number + Metadata?: PlexWatchlistMetadata[] + } +} + +interface PlexWatchlistMetadata { + ratingKey: string + key: string + guid: string + type: "movie" | "show" + title: string + year?: number + Guid?: PlexGuid[] +} + +interface PlexGuid { + id: string // e.g., "tmdb://12345", "tvdb://12345", "imdb://tt12345" +} + +/** + * Parse external IDs from Plex GUID array + * @param guids Array of Plex GUIDs with format like "tmdb://12345" + */ +export function parseExternalIds(guids: PlexGuid[] | undefined): { + tmdbId?: number + tvdbId?: number + imdbId?: string +} { + const result: { tmdbId?: number; tvdbId?: number; imdbId?: string } = {} + + if (!guids || !Array.isArray(guids)) { + return result + } + + for (const guid of guids) { + if (!guid.id) continue + + if (guid.id.startsWith("tmdb://")) { + const id = parseInt(guid.id.replace("tmdb://", ""), 10) + if (!isNaN(id)) { + result.tmdbId = id + } + } else if (guid.id.startsWith("tvdb://")) { + const id = parseInt(guid.id.replace("tvdb://", ""), 10) + if (!isNaN(id)) { + result.tvdbId = id + } + } else if (guid.id.startsWith("imdb://")) { + result.imdbId = guid.id.replace("imdb://", "") + } + } + + return result +} + +/** + * Parse a single GUID string to extract external ID + * @param guid GUID string like "plex://movie/5d776833880197001ec939fa" + */ +export function parseWatchlistGuid(guid: string): { + type: "plex" | "tmdb" | "tvdb" | "imdb" | "unknown" + id: string +} { + if (guid.startsWith("plex://")) { + // Format: plex://movie/5d776833880197001ec939fa + const parts = guid.split("/") + return { type: "plex", id: parts[parts.length - 1] } + } else if (guid.startsWith("tmdb://")) { + return { type: "tmdb", id: guid.replace("tmdb://", "") } + } else if (guid.startsWith("tvdb://")) { + return { type: "tvdb", id: guid.replace("tvdb://", "") } + } else if (guid.startsWith("imdb://")) { + return { type: "imdb", id: guid.replace("imdb://", "") } + } + + return { type: "unknown", id: guid } +} + +/** + * Get standard Plex headers for API requests + */ +function getPlexHeaders(userToken: string): Record { + return { + Accept: "application/json", + "X-Plex-Token": userToken, + "X-Plex-Client-Identifier": getClientIdentifier(), + "X-Plex-Product": "Plex Wrapped", + "X-Plex-Version": "1.0.0", + "X-Plex-Platform": "Web", + } +} + +export interface GetWatchlistResult { + success: boolean + data?: WatchlistItem[] + error?: string +} + +/** + * Fetch user's watchlist from Plex discover API + * @param userToken User's Plex auth token + */ +export async function getPlexWatchlist(userToken: string): Promise { + try { + logger.debug("Fetching Plex watchlist") + + // Fetch watchlist from discover API + const url = `${PLEX_DISCOVER_URL}/library/sections/watchlist/all` + const response = await fetch(url, { + method: "GET", + headers: getPlexHeaders(userToken), + }) + + if (!response.ok) { + if (response.status === 401) { + logger.warn("Plex watchlist fetch failed - unauthorized (token may be expired)") + return { success: false, error: "Plex token is invalid or expired" } + } + logger.error("Plex watchlist fetch failed", undefined, { + status: response.status, + statusText: response.statusText, + }) + return { success: false, error: `Failed to fetch watchlist: ${response.statusText}` } + } + + const data: PlexWatchlistResponse = await response.json() + + if (!data.MediaContainer?.Metadata) { + logger.debug("Watchlist is empty") + return { success: true, data: [] } + } + + const items: WatchlistItem[] = data.MediaContainer.Metadata.map((item) => { + const externalIds = parseExternalIds(item.Guid) + + return { + ratingKey: item.ratingKey, + guid: item.guid, + type: item.type, + title: item.title, + year: item.year, + tmdbId: externalIds.tmdbId, + tvdbId: externalIds.tvdbId, + imdbId: externalIds.imdbId, + } + }) + + logger.debug("Fetched watchlist items", { count: items.length }) + return { success: true, data: items } + } catch (error) { + logger.error("Error fetching Plex watchlist", error) + return { success: false, error: "Failed to fetch watchlist" } + } +} + +/** + * Validate a user's Plex token is still valid + * @param userToken User's Plex auth token + */ +export async function validatePlexToken(userToken: string): Promise { + try { + const url = "https://plex.tv/api/v2/user" + const response = await fetch(url, { + method: "GET", + headers: getPlexHeaders(userToken), + }) + + return response.ok + } catch (error) { + logger.debug("Error validating Plex token", { error }) + return false + } +} diff --git a/lib/watchlist/sync-service.ts b/lib/watchlist/sync-service.ts new file mode 100644 index 00000000..5daedce5 --- /dev/null +++ b/lib/watchlist/sync-service.ts @@ -0,0 +1,363 @@ +/** + * Watchlist Sync Service + * + * Core orchestration for syncing Plex watchlists to Overseerr requests + */ + +import { submitOverseerrRequest } from "@/lib/connections/overseerr" +import { MediaType, WatchlistSyncStatus } from "@/lib/generated/prisma/client" +import { prisma } from "@/lib/prisma" +import { createLogger } from "@/lib/utils/logger" +import { SyncResult } from "@/lib/validations/watchlist" +import { getPlexWatchlist, validatePlexToken } from "./plex-watchlist" + +const logger = createLogger("WATCHLIST_SYNC") + +export interface SyncUserWatchlistResult { + success: boolean + data?: SyncResult + error?: string +} + +/** + * Sync a single user's watchlist + * @param userId Database user ID + */ +export async function syncUserWatchlist(userId: string): Promise { + const startTime = Date.now() + + try { + // Get user with their Plex token and sync settings + const user = await prisma.user.findUnique({ + where: { id: userId }, + select: { + id: true, + plexAuthToken: true, + email: true, + watchlistSyncSettings: true, + }, + }) + + if (!user) { + return { success: false, error: "User not found" } + } + + if (!user.plexAuthToken) { + logger.warn("User has no Plex auth token", { userId }) + await updateSyncSettings(userId, { + lastSyncStatus: "failed", + lastSyncError: "No Plex auth token - please log in again", + }) + return { success: false, error: "No Plex auth token" } + } + + // Validate token is still valid + const tokenValid = await validatePlexToken(user.plexAuthToken) + if (!tokenValid) { + logger.warn("Plex token is invalid or expired", { userId }) + await updateSyncSettings(userId, { + lastSyncStatus: "failed", + lastSyncError: "Plex token expired - please log in again", + }) + return { success: false, error: "Plex token is invalid or expired" } + } + + // Get Overseerr configuration + const overseerr = await prisma.overseerr.findFirst({ + where: { isActive: true }, + }) + + if (!overseerr) { + logger.warn("No active Overseerr configured", { userId }) + await updateSyncSettings(userId, { + lastSyncStatus: "failed", + lastSyncError: "No Overseerr server configured", + }) + return { success: false, error: "No Overseerr server configured" } + } + + // Fetch watchlist from Plex + const watchlistResult = await getPlexWatchlist(user.plexAuthToken) + if (!watchlistResult.success || !watchlistResult.data) { + const error = watchlistResult.error || "Failed to fetch watchlist" + logger.error("Failed to fetch Plex watchlist", undefined, { userId, error }) + await updateSyncSettings(userId, { + lastSyncStatus: "failed", + lastSyncError: error, + }) + return { success: false, error } + } + + const watchlistItems = watchlistResult.data + logger.info("Fetched Plex watchlist", { userId, itemCount: watchlistItems.length }) + + // Get existing sync history for comparison + const existingHistory = await prisma.watchlistSyncHistory.findMany({ + where: { userId }, + select: { plexRatingKey: true, status: true }, + }) + const existingKeys = new Map(existingHistory.map((h) => [h.plexRatingKey, h.status])) + + // Process each watchlist item + let itemsSynced = 0 + let itemsRequested = 0 + let itemsSkipped = 0 + let itemsFailed = 0 + const errors: string[] = [] + + for (const item of watchlistItems) { + try { + // Skip if already processed and in a terminal state + const existingStatus = existingKeys.get(item.ratingKey) + if ( + existingStatus && + (existingStatus === WatchlistSyncStatus.REQUESTED || + existingStatus === WatchlistSyncStatus.ALREADY_AVAILABLE || + existingStatus === WatchlistSyncStatus.ALREADY_REQUESTED) + ) { + itemsSkipped++ + continue + } + + // Need TMDB ID to request in Overseerr + if (!item.tmdbId) { + logger.debug("Skipping item without TMDB ID", { + title: item.title, + ratingKey: item.ratingKey, + }) + itemsSkipped++ + continue + } + + // Submit request to Overseerr + const mediaType = item.type === "movie" ? "movie" : "tv" + const requestResult = await submitOverseerrRequest( + { name: overseerr.name, url: overseerr.url, apiKey: overseerr.apiKey }, + { mediaType, mediaId: item.tmdbId } + ) + + // Map Prisma MediaType + const prismaMediaType = item.type === "movie" ? MediaType.MOVIE : MediaType.TV_SERIES + + // Determine status based on result + let syncStatus: WatchlistSyncStatus + let requestedAt: Date | null = null + + if (requestResult.status === "created") { + syncStatus = WatchlistSyncStatus.REQUESTED + requestedAt = new Date() + itemsRequested++ + } else if (requestResult.status === "already_available") { + syncStatus = WatchlistSyncStatus.ALREADY_AVAILABLE + } else if (requestResult.status === "already_requested") { + syncStatus = WatchlistSyncStatus.ALREADY_REQUESTED + } else { + syncStatus = WatchlistSyncStatus.FAILED + itemsFailed++ + if (requestResult.error) { + errors.push(`${item.title}: ${requestResult.error}`) + } + } + + // Upsert sync history record + await prisma.watchlistSyncHistory.upsert({ + where: { + userId_plexRatingKey: { + userId, + plexRatingKey: item.ratingKey, + }, + }, + create: { + userId, + plexRatingKey: item.ratingKey, + guid: item.guid, + mediaType: prismaMediaType, + title: item.title, + year: item.year, + tmdbId: item.tmdbId, + tvdbId: item.tvdbId, + imdbId: item.imdbId, + status: syncStatus, + requestedAt, + overseerrRequestId: requestResult.requestId, + }, + update: { + status: syncStatus, + requestedAt: requestedAt || undefined, + overseerrRequestId: requestResult.requestId || undefined, + }, + }) + + itemsSynced++ + } catch (error) { + logger.error("Error processing watchlist item", error, { + title: item.title, + userId, + }) + itemsFailed++ + errors.push(`${item.title}: Processing error`) + } + } + + // Update sync settings with results + const syncStatus = itemsFailed > 0 ? (itemsSynced > 0 ? "partial" : "failed") : "success" + await updateSyncSettings(userId, { + lastSyncAt: new Date(), + lastSyncStatus: syncStatus, + lastSyncError: errors.length > 0 ? errors.slice(0, 3).join("; ") : null, + itemsSynced, + itemsRequested, + }) + + const duration = Date.now() - startTime + logger.info("Watchlist sync completed", { + userId, + itemsSynced, + itemsRequested, + itemsSkipped, + itemsFailed, + durationMs: duration, + }) + + return { + success: true, + data: { + success: true, + itemsSynced, + itemsRequested, + itemsSkipped, + itemsFailed, + errors: errors.length > 0 ? errors : undefined, + }, + } + } catch (error) { + logger.error("Watchlist sync failed", error, { userId }) + await updateSyncSettings(userId, { + lastSyncStatus: "failed", + lastSyncError: error instanceof Error ? error.message : "Unknown error", + }) + return { + success: false, + error: error instanceof Error ? error.message : "Sync failed", + } + } +} + +/** + * Helper to update sync settings + */ +async function updateSyncSettings( + userId: string, + data: { + lastSyncAt?: Date + lastSyncStatus?: string + lastSyncError?: string | null + itemsSynced?: number + itemsRequested?: number + } +): Promise { + // Get current settings to calculate cumulative totals + const current = await prisma.watchlistSyncSettings.findUnique({ + where: { userId }, + select: { totalItemsSynced: true, totalItemsRequested: true }, + }) + + const incrementSynced = data.itemsSynced ?? 0 + const incrementRequested = data.itemsRequested ?? 0 + + await prisma.watchlistSyncSettings.upsert({ + where: { userId }, + create: { + userId, + syncEnabled: false, + ...data, + totalItemsSynced: incrementSynced, + totalItemsRequested: incrementRequested, + }, + update: { + ...data, + totalItemsSynced: (current?.totalItemsSynced ?? 0) + incrementSynced, + totalItemsRequested: (current?.totalItemsRequested ?? 0) + incrementRequested, + }, + }) +} + +/** + * Sync all users who have sync enabled and are due for a sync + * Called by the background job + */ +export async function syncAllEnabledUsers(): Promise<{ + usersProcessed: number + usersSucceeded: number + usersFailed: number +}> { + const startTime = Date.now() + + try { + // Get global sync interval + const config = await prisma.config.findUnique({ + where: { id: "config" }, + select: { watchlistSyncIntervalMinutes: true }, + }) + const intervalMinutes = config?.watchlistSyncIntervalMinutes ?? 60 + + // Find users who need to be synced + const cutoffTime = new Date(Date.now() - intervalMinutes * 60 * 1000) + + const usersToSync = await prisma.watchlistSyncSettings.findMany({ + where: { + syncEnabled: true, + OR: [{ lastSyncAt: null }, { lastSyncAt: { lt: cutoffTime } }], + }, + select: { userId: true }, + take: 50, // Limit batch size + }) + + logger.info("Starting batch watchlist sync", { + usersToSync: usersToSync.length, + intervalMinutes, + }) + + let usersSucceeded = 0 + let usersFailed = 0 + + // Process users sequentially to avoid rate limiting + for (const { userId } of usersToSync) { + try { + const result = await syncUserWatchlist(userId) + if (result.success) { + usersSucceeded++ + } else { + usersFailed++ + } + + // Small delay between users to be respectful of APIs + await new Promise((resolve) => setTimeout(resolve, 1000)) + } catch (error) { + logger.error("Error syncing user watchlist in batch", error, { userId }) + usersFailed++ + } + } + + const duration = Date.now() - startTime + logger.info("Batch watchlist sync completed", { + usersProcessed: usersToSync.length, + usersSucceeded, + usersFailed, + durationMs: duration, + }) + + return { + usersProcessed: usersToSync.length, + usersSucceeded, + usersFailed, + } + } catch (error) { + logger.error("Batch watchlist sync failed", error) + return { + usersProcessed: 0, + usersSucceeded: 0, + usersFailed: 0, + } + } +} diff --git a/prisma/migrations/20251230031502_add_watchlist_sync/migration.sql b/prisma/migrations/20251230031502_add_watchlist_sync/migration.sql new file mode 100644 index 00000000..d7798c61 --- /dev/null +++ b/prisma/migrations/20251230031502_add_watchlist_sync/migration.sql @@ -0,0 +1,82 @@ +-- CreateEnum +CREATE TYPE "WatchlistSyncStatus" AS ENUM ('SYNCED', 'REQUESTED', 'ALREADY_AVAILABLE', 'ALREADY_REQUESTED', 'FAILED', 'REMOVED_FROM_WATCHLIST'); + +-- AlterTable +ALTER TABLE "Config" ADD COLUMN "watchlistSyncEnabled" BOOLEAN NOT NULL DEFAULT false, +ADD COLUMN "watchlistSyncIntervalMinutes" INTEGER NOT NULL DEFAULT 60; + +-- AlterTable +ALTER TABLE "User" ADD COLUMN "plexAuthToken" TEXT; + +-- CreateTable +CREATE TABLE "WatchlistSyncSettings" ( + "id" TEXT NOT NULL, + "userId" TEXT NOT NULL, + "syncEnabled" BOOLEAN NOT NULL DEFAULT false, + "lastSyncAt" TIMESTAMP(3), + "lastSyncStatus" TEXT, + "lastSyncError" TEXT, + "itemsSynced" INTEGER NOT NULL DEFAULT 0, + "itemsRequested" INTEGER NOT NULL DEFAULT 0, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "WatchlistSyncSettings_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "WatchlistSyncHistory" ( + "id" TEXT NOT NULL, + "userId" TEXT NOT NULL, + "plexRatingKey" TEXT NOT NULL, + "guid" TEXT NOT NULL, + "mediaType" "MediaType" NOT NULL, + "title" TEXT NOT NULL, + "year" INTEGER, + "tmdbId" INTEGER, + "tvdbId" INTEGER, + "imdbId" TEXT, + "syncedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "requestedAt" TIMESTAMP(3), + "overseerrRequestId" INTEGER, + "status" "WatchlistSyncStatus" NOT NULL DEFAULT 'SYNCED', + + CONSTRAINT "WatchlistSyncHistory_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "WatchlistSyncLock" ( + "id" TEXT NOT NULL DEFAULT 'watchlist-sync', + "instanceId" TEXT NOT NULL, + "acquiredAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "expiresAt" TIMESTAMP(3) NOT NULL, + "lastRenewedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "WatchlistSyncLock_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "WatchlistSyncSettings_userId_key" ON "WatchlistSyncSettings"("userId"); + +-- CreateIndex +CREATE INDEX "WatchlistSyncSettings_syncEnabled_idx" ON "WatchlistSyncSettings"("syncEnabled"); + +-- CreateIndex +CREATE INDEX "WatchlistSyncHistory_userId_syncedAt_idx" ON "WatchlistSyncHistory"("userId", "syncedAt"); + +-- CreateIndex +CREATE INDEX "WatchlistSyncHistory_status_idx" ON "WatchlistSyncHistory"("status"); + +-- CreateIndex +CREATE UNIQUE INDEX "WatchlistSyncHistory_userId_plexRatingKey_key" ON "WatchlistSyncHistory"("userId", "plexRatingKey"); + +-- CreateIndex +CREATE INDEX "WatchlistSyncLock_expiresAt_idx" ON "WatchlistSyncLock"("expiresAt"); + +-- AddForeignKey +ALTER TABLE "WatchlistSyncSettings" ADD CONSTRAINT "WatchlistSyncSettings_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "WatchlistSyncHistory" ADD CONSTRAINT "WatchlistSyncHistory_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/migrations/20251230034346_add_watchlist_cumulative_counters/migration.sql b/prisma/migrations/20251230034346_add_watchlist_cumulative_counters/migration.sql new file mode 100644 index 00000000..6af495df --- /dev/null +++ b/prisma/migrations/20251230034346_add_watchlist_cumulative_counters/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "WatchlistSyncSettings" ADD COLUMN "totalItemsRequested" INTEGER NOT NULL DEFAULT 0, +ADD COLUMN "totalItemsSynced" INTEGER NOT NULL DEFAULT 0; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 32d5b665..7b7b0624 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -12,26 +12,29 @@ datasource db { // User model model User { - id String @id @default(cuid()) - name String? - email String? @unique - emailVerified DateTime? - image String? - plexUserId String? @unique - jellyfinUserId String? @unique // Jellyfin user ID (if user was created via Jellyfin invite) - primaryAuthService String? // Track which service user authenticated with first: "plex" or "jellyfin" - isAdmin Boolean @default(false) - onboardingStatus Json? @default("{\"plex\": false, \"jellyfin\": false}") // Track onboarding completion per service - plexWrapped PlexWrapped[] - llmUsage LLMUsage[] - chatConversations ChatConversation[] - inviteUsages InviteUsage[] - discordConnection DiscordConnection? - discordOAuthStates DiscordOAuthState[] - userMediaMarks UserMediaMark[] - userWatchIntents UserWatchIntent[] - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + id String @id @default(cuid()) + name String? + email String? @unique + emailVerified DateTime? + image String? + plexUserId String? @unique + plexAuthToken String? // Plex auth token (stored during login for watchlist API access) + jellyfinUserId String? @unique // Jellyfin user ID (if user was created via Jellyfin invite) + primaryAuthService String? // Track which service user authenticated with first: "plex" or "jellyfin" + isAdmin Boolean @default(false) + onboardingStatus Json? @default("{\"plex\": false, \"jellyfin\": false}") // Track onboarding completion per service + plexWrapped PlexWrapped[] + llmUsage LLMUsage[] + chatConversations ChatConversation[] + inviteUsages InviteUsage[] + discordConnection DiscordConnection? + discordOAuthStates DiscordOAuthState[] + userMediaMarks UserMediaMark[] + userWatchIntents UserWatchIntent[] + watchlistSyncSettings WatchlistSyncSettings? + watchlistSyncHistory WatchlistSyncHistory[] + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt @@index([plexUserId]) @@index([jellyfinUserId]) @@ -241,13 +244,15 @@ model LLMUsage { // Application Configuration (singleton pattern) model Config { - id String @id @default("config") - llmDisabled Boolean @default(false) // When true, LLM calls are disabled and mock data is returned - wrappedEnabled Boolean @default(true) // When false, wrapped generation is disabled - wrappedGenerationStartDate DateTime? // Optional start date for when generation is allowed (e.g., Nov 20). Year is auto-determined from this date. - wrappedGenerationEndDate DateTime? // Optional end date for when generation is allowed (e.g., Jan 31) - updatedAt DateTime @updatedAt - updatedBy String? // User ID who last updated this config + id String @id @default("config") + llmDisabled Boolean @default(false) // When true, LLM calls are disabled and mock data is returned + wrappedEnabled Boolean @default(true) // When false, wrapped generation is disabled + wrappedGenerationStartDate DateTime? // Optional start date for when generation is allowed (e.g., Nov 20). Year is auto-determined from this date. + wrappedGenerationEndDate DateTime? // Optional end date for when generation is allowed (e.g., Jan 31) + watchlistSyncEnabled Boolean @default(false) // Global toggle for watchlist sync feature + watchlistSyncIntervalMinutes Int @default(60) // Default sync interval in minutes (minimum 15) + updatedAt DateTime @updatedAt + updatedBy String? // User ID who last updated this config } model DiscordIntegration { @@ -556,3 +561,67 @@ model Announcement { @@index([isActive, expiresAt]) @@index([priority]) } + +// Watchlist Sync - Per-user sync settings +model WatchlistSyncSettings { + id String @id @default(cuid()) + userId String @unique + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + syncEnabled Boolean @default(false) + lastSyncAt DateTime? + lastSyncStatus String? // "success", "partial", "failed" + lastSyncError String? + itemsSynced Int @default(0) // Items processed in last sync + itemsRequested Int @default(0) // Items requested in last sync + totalItemsSynced Int @default(0) // Cumulative total items synced + totalItemsRequested Int @default(0) // Cumulative total requests created + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([syncEnabled]) +} + +// Watchlist Sync - Track synced items to avoid duplicates +model WatchlistSyncHistory { + id String @id @default(cuid()) + userId String + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + plexRatingKey String // Plex discover rating key + guid String // GUID string (e.g., "tmdb://12345") + mediaType MediaType + title String + year Int? + tmdbId Int? + tvdbId Int? + imdbId String? + syncedAt DateTime @default(now()) + requestedAt DateTime? // When submitted to Overseerr + overseerrRequestId Int? // Overseerr request ID if submitted + status WatchlistSyncStatus @default(SYNCED) + + @@unique([userId, plexRatingKey]) + @@index([userId, syncedAt]) + @@index([status]) +} + +// Watchlist Sync - Distributed lock (mirrors DiscordBotLock) +model WatchlistSyncLock { + id String @id @default("watchlist-sync") + instanceId String // Unique identifier for this pod/instance + acquiredAt DateTime @default(now()) + expiresAt DateTime // Lock expires if not renewed (handles crashed pods) + lastRenewedAt DateTime @default(now()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([expiresAt]) +} + +enum WatchlistSyncStatus { + SYNCED // Item tracked but not requested + REQUESTED // Submitted to Overseerr + ALREADY_AVAILABLE // Already in library + ALREADY_REQUESTED // Already has pending request + FAILED // Request submission failed + REMOVED_FROM_WATCHLIST // Removed from Plex watchlist +} From fc50db2c1bd1ff3446c9ce9e8198951ec021d31a Mon Sep 17 00:00:00 2001 From: Mike Chester Date: Mon, 29 Dec 2025 19:57:45 -0800 Subject: [PATCH 2/2] fix: address PR feedback - accessibility and centralized logging - Add aria-label to toggle buttons for screen reader accessibility - Replace console.error with centralized logger in watchlist actions Co-Authored-By: Claude --- actions/admin/watchlist.ts | 11 +++++++---- actions/watchlist.ts | 11 +++++++---- components/admin/settings/watchlist-sync-settings.tsx | 1 + components/watchlist/sync-settings-card.tsx | 1 + 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/actions/admin/watchlist.ts b/actions/admin/watchlist.ts index fda32568..39732c75 100644 --- a/actions/admin/watchlist.ts +++ b/actions/admin/watchlist.ts @@ -3,10 +3,13 @@ import { authOptions } from "@/lib/auth" import { prisma } from "@/lib/prisma" import { requireAdmin } from "@/lib/admin" +import { createLogger } from "@/lib/utils/logger" import { globalWatchlistSyncSettingsSchema } from "@/lib/validations/watchlist" import { syncUserWatchlist } from "@/lib/watchlist/sync-service" import { getServerSession } from "next-auth" +const logger = createLogger("ADMIN_WATCHLIST_ACTIONS") + /** * Get global watchlist sync settings (admin only) */ @@ -33,7 +36,7 @@ export async function getGlobalWatchlistSyncSettings() { }, } } catch (error) { - console.error("Error fetching global watchlist sync settings:", error) + logger.error("Error fetching global watchlist sync settings", error) return { success: false as const, error: "Failed to fetch settings" } } } @@ -86,7 +89,7 @@ export async function updateGlobalWatchlistSyncSettings(data: unknown) { return { success: true as const } } catch (error) { - console.error("Error updating global watchlist sync settings:", error) + logger.error("Error updating global watchlist sync settings", error) return { success: false as const, error: "Failed to update settings" } } } @@ -136,7 +139,7 @@ export async function getWatchlistSyncStats() { }, } } catch (error) { - console.error("Error fetching watchlist sync stats:", error) + logger.error("Error fetching watchlist sync stats", error) return { success: false as const, error: "Failed to fetch stats" } } } @@ -173,7 +176,7 @@ export async function forceUserWatchlistSync(userId: string) { data: result.data, } } catch (error) { - console.error("Error forcing user watchlist sync:", error) + logger.error("Error forcing user watchlist sync", error) return { success: false as const, error: "Failed to sync" } } } diff --git a/actions/watchlist.ts b/actions/watchlist.ts index 5fc7ca6f..9f285f5d 100644 --- a/actions/watchlist.ts +++ b/actions/watchlist.ts @@ -2,11 +2,14 @@ import { WatchlistSyncStatus } from "@/lib/generated/prisma/client" import { prisma } from "@/lib/prisma" +import { createLogger } from "@/lib/utils/logger" import { updateWatchlistSyncSettingsSchema } from "@/lib/validations/watchlist" import { syncUserWatchlist } from "@/lib/watchlist/sync-service" import { getServerSession } from "next-auth" import { authOptions } from "@/lib/auth" +const logger = createLogger("WATCHLIST_ACTIONS") + /** * Get current user's watchlist sync settings and status */ @@ -86,7 +89,7 @@ export async function getWatchlistSyncSettings() { }, } } catch (error) { - console.error("Error fetching watchlist sync settings:", error) + logger.error("Error fetching watchlist sync settings", error) return { success: false as const, error: "Failed to fetch settings" } } } @@ -159,7 +162,7 @@ export async function updateWatchlistSyncSettings(data: unknown) { return { success: true as const } } catch (error) { - console.error("Error updating watchlist sync settings:", error) + logger.error("Error updating watchlist sync settings", error) return { success: false as const, error: "Failed to update settings" } } } @@ -195,7 +198,7 @@ export async function triggerWatchlistSync() { data: result.data, } } catch (error) { - console.error("Error triggering watchlist sync:", error) + logger.error("Error triggering watchlist sync", error) return { success: false as const, error: "Failed to trigger sync" } } } @@ -251,7 +254,7 @@ export async function getWatchlistSyncHistory(options?: { }, } } catch (error) { - console.error("Error fetching watchlist sync history:", error) + logger.error("Error fetching watchlist sync history", error) return { success: false as const, error: "Failed to fetch history" } } } diff --git a/components/admin/settings/watchlist-sync-settings.tsx b/components/admin/settings/watchlist-sync-settings.tsx index 84d326d5..74097901 100644 --- a/components/admin/settings/watchlist-sync-settings.tsx +++ b/components/admin/settings/watchlist-sync-settings.tsx @@ -157,6 +157,7 @@ export function WatchlistSyncSettings() { `} role="switch" aria-checked={settings.watchlistSyncEnabled} + aria-label={settings.watchlistSyncEnabled ? "Disable watchlist sync" : "Enable watchlist sync"} data-testid="watchlist-sync-global-toggle" >