diff --git a/actions/admin/watchlist.ts b/actions/admin/watchlist.ts
new file mode 100644
index 0000000..39732c7
--- /dev/null
+++ b/actions/admin/watchlist.ts
@@ -0,0 +1,182 @@
+"use server"
+
+import { authOptions } from "@/lib/auth"
+import { prisma } from "@/lib/prisma"
+import { requireAdmin } from "@/lib/admin"
+import { createLogger } from "@/lib/utils/logger"
+import { globalWatchlistSyncSettingsSchema } from "@/lib/validations/watchlist"
+import { syncUserWatchlist } from "@/lib/watchlist/sync-service"
+import { getServerSession } from "next-auth"
+
+const logger = createLogger("ADMIN_WATCHLIST_ACTIONS")
+
+/**
+ * Get global watchlist sync settings (admin only)
+ */
+export async function getGlobalWatchlistSyncSettings() {
+ const isAdmin = await requireAdmin()
+ if (!isAdmin) {
+ return { success: false as const, error: "Unauthorized" }
+ }
+
+ try {
+ const config = await prisma.config.findUnique({
+ where: { id: "config" },
+ select: {
+ watchlistSyncEnabled: true,
+ watchlistSyncIntervalMinutes: true,
+ },
+ })
+
+ return {
+ success: true as const,
+ data: {
+ watchlistSyncEnabled: config?.watchlistSyncEnabled ?? false,
+ watchlistSyncIntervalMinutes: config?.watchlistSyncIntervalMinutes ?? 60,
+ },
+ }
+ } catch (error) {
+ logger.error("Error fetching global watchlist sync settings", error)
+ return { success: false as const, error: "Failed to fetch settings" }
+ }
+}
+
+/**
+ * Update global watchlist sync settings (admin only)
+ */
+export async function updateGlobalWatchlistSyncSettings(data: unknown) {
+ const session = await getServerSession(authOptions)
+ const isAdmin = await requireAdmin()
+ if (!isAdmin) {
+ return { success: false as const, error: "Unauthorized" }
+ }
+
+ // Validate input
+ const validated = globalWatchlistSyncSettingsSchema.safeParse(data)
+ if (!validated.success) {
+ return { success: false as const, error: "Invalid input" }
+ }
+
+ try {
+ // Check if Overseerr is configured before enabling
+ if (validated.data.watchlistSyncEnabled) {
+ const overseerr = await prisma.overseerr.findFirst({
+ where: { isActive: true },
+ })
+
+ if (!overseerr) {
+ return {
+ success: false as const,
+ error: "Cannot enable watchlist sync without an active Overseerr server",
+ }
+ }
+ }
+
+ await prisma.config.upsert({
+ where: { id: "config" },
+ create: {
+ id: "config",
+ watchlistSyncEnabled: validated.data.watchlistSyncEnabled,
+ watchlistSyncIntervalMinutes: validated.data.watchlistSyncIntervalMinutes,
+ updatedBy: session?.user?.id,
+ },
+ update: {
+ watchlistSyncEnabled: validated.data.watchlistSyncEnabled,
+ watchlistSyncIntervalMinutes: validated.data.watchlistSyncIntervalMinutes,
+ updatedBy: session?.user?.id,
+ },
+ })
+
+ return { success: true as const }
+ } catch (error) {
+ logger.error("Error updating global watchlist sync settings", error)
+ return { success: false as const, error: "Failed to update settings" }
+ }
+}
+
+/**
+ * Get watchlist sync statistics (admin only)
+ */
+export async function getWatchlistSyncStats() {
+ const isAdmin = await requireAdmin()
+ if (!isAdmin) {
+ return { success: false as const, error: "Unauthorized" }
+ }
+
+ try {
+ const [usersWithSyncEnabled, totalItemsSynced, totalItemsRequested, recentHistory] =
+ await Promise.all([
+ prisma.watchlistSyncSettings.count({
+ where: { syncEnabled: true },
+ }),
+ prisma.watchlistSyncHistory.count(),
+ prisma.watchlistSyncHistory.count({
+ where: { status: "REQUESTED" },
+ }),
+ prisma.watchlistSyncHistory.findMany({
+ orderBy: { syncedAt: "desc" },
+ take: 10,
+ select: {
+ id: true,
+ title: true,
+ mediaType: true,
+ status: true,
+ syncedAt: true,
+ user: {
+ select: { name: true, email: true },
+ },
+ },
+ }),
+ ])
+
+ return {
+ success: true as const,
+ data: {
+ usersWithSyncEnabled,
+ totalItemsSynced,
+ totalItemsRequested,
+ recentHistory,
+ },
+ }
+ } catch (error) {
+ logger.error("Error fetching watchlist sync stats", error)
+ return { success: false as const, error: "Failed to fetch stats" }
+ }
+}
+
+/**
+ * Force sync for a specific user (admin only)
+ */
+export async function forceUserWatchlistSync(userId: string) {
+ const isAdmin = await requireAdmin()
+ if (!isAdmin) {
+ return { success: false as const, error: "Unauthorized" }
+ }
+
+ try {
+ // Verify user exists
+ const user = await prisma.user.findUnique({
+ where: { id: userId },
+ select: { id: true },
+ })
+
+ if (!user) {
+ return { success: false as const, error: "User not found" }
+ }
+
+ // Perform sync
+ const result = await syncUserWatchlist(userId)
+
+ if (!result.success) {
+ return { success: false as const, error: result.error || "Sync failed" }
+ }
+
+ return {
+ success: true as const,
+ data: result.data,
+ }
+ } catch (error) {
+ logger.error("Error forcing user watchlist sync", error)
+ return { success: false as const, error: "Failed to sync" }
+ }
+}
diff --git a/actions/watchlist.ts b/actions/watchlist.ts
new file mode 100644
index 0000000..9f285f5
--- /dev/null
+++ b/actions/watchlist.ts
@@ -0,0 +1,260 @@
+"use server"
+
+import { WatchlistSyncStatus } from "@/lib/generated/prisma/client"
+import { prisma } from "@/lib/prisma"
+import { createLogger } from "@/lib/utils/logger"
+import { updateWatchlistSyncSettingsSchema } from "@/lib/validations/watchlist"
+import { syncUserWatchlist } from "@/lib/watchlist/sync-service"
+import { getServerSession } from "next-auth"
+import { authOptions } from "@/lib/auth"
+
+const logger = createLogger("WATCHLIST_ACTIONS")
+
+/**
+ * Get current user's watchlist sync settings and status
+ */
+export async function getWatchlistSyncSettings() {
+ const session = await getServerSession(authOptions)
+ if (!session?.user?.id) {
+ return { success: false as const, error: "Not authenticated" }
+ }
+
+ try {
+ // Get user's Plex auth token status and sync settings
+ const user = await prisma.user.findUnique({
+ where: { id: session.user.id },
+ select: {
+ plexAuthToken: true,
+ watchlistSyncSettings: true,
+ },
+ })
+
+ // Check if Overseerr is configured
+ const overseerr = await prisma.overseerr.findFirst({
+ where: { isActive: true },
+ select: { id: true },
+ })
+
+ // Check if global sync is enabled
+ const config = await prisma.config.findUnique({
+ where: { id: "config" },
+ select: { watchlistSyncEnabled: true },
+ })
+
+ // Get recent sync history if sync is enabled
+ let recentHistory: Array<{
+ id: string
+ title: string
+ year: number | null
+ mediaType: string
+ status: string
+ syncedAt: Date
+ }> = []
+
+ if (user?.watchlistSyncSettings?.syncEnabled) {
+ recentHistory = await prisma.watchlistSyncHistory.findMany({
+ where: { userId: session.user.id },
+ orderBy: { syncedAt: "desc" },
+ take: 5,
+ select: {
+ id: true,
+ title: true,
+ year: true,
+ mediaType: true,
+ status: true,
+ syncedAt: true,
+ },
+ })
+ }
+
+ return {
+ success: true as const,
+ data: {
+ hasPlexToken: !!user?.plexAuthToken,
+ hasOverseerr: !!overseerr,
+ globalSyncEnabled: config?.watchlistSyncEnabled ?? false,
+ settings: user?.watchlistSyncSettings
+ ? {
+ syncEnabled: user.watchlistSyncSettings.syncEnabled,
+ lastSyncAt: user.watchlistSyncSettings.lastSyncAt,
+ lastSyncStatus: user.watchlistSyncSettings.lastSyncStatus,
+ lastSyncError: user.watchlistSyncSettings.lastSyncError,
+ itemsSynced: user.watchlistSyncSettings.itemsSynced,
+ itemsRequested: user.watchlistSyncSettings.itemsRequested,
+ totalItemsSynced: user.watchlistSyncSettings.totalItemsSynced,
+ totalItemsRequested: user.watchlistSyncSettings.totalItemsRequested,
+ }
+ : null,
+ recentHistory,
+ },
+ }
+ } catch (error) {
+ logger.error("Error fetching watchlist sync settings", error)
+ return { success: false as const, error: "Failed to fetch settings" }
+ }
+}
+
+/**
+ * Update user's watchlist sync settings
+ */
+export async function updateWatchlistSyncSettings(data: unknown) {
+ const session = await getServerSession(authOptions)
+ if (!session?.user?.id) {
+ return { success: false as const, error: "Not authenticated" }
+ }
+
+ // Validate input
+ const validated = updateWatchlistSyncSettingsSchema.safeParse(data)
+ if (!validated.success) {
+ return { success: false as const, error: "Invalid input" }
+ }
+
+ try {
+ // Check prerequisites if enabling
+ if (validated.data.syncEnabled) {
+ const user = await prisma.user.findUnique({
+ where: { id: session.user.id },
+ select: { plexAuthToken: true },
+ })
+
+ if (!user?.plexAuthToken) {
+ return {
+ success: false as const,
+ error: "Please log in with Plex to enable watchlist sync",
+ }
+ }
+
+ const overseerr = await prisma.overseerr.findFirst({
+ where: { isActive: true },
+ })
+
+ if (!overseerr) {
+ return {
+ success: false as const,
+ error: "Overseerr is not configured. Please contact an administrator.",
+ }
+ }
+
+ const config = await prisma.config.findUnique({
+ where: { id: "config" },
+ select: { watchlistSyncEnabled: true },
+ })
+
+ if (!config?.watchlistSyncEnabled) {
+ return {
+ success: false as const,
+ error: "Watchlist sync is not enabled globally. Please contact an administrator.",
+ }
+ }
+ }
+
+ // Upsert settings
+ await prisma.watchlistSyncSettings.upsert({
+ where: { userId: session.user.id },
+ create: {
+ userId: session.user.id,
+ syncEnabled: validated.data.syncEnabled,
+ },
+ update: {
+ syncEnabled: validated.data.syncEnabled,
+ },
+ })
+
+ return { success: true as const }
+ } catch (error) {
+ logger.error("Error updating watchlist sync settings", error)
+ return { success: false as const, error: "Failed to update settings" }
+ }
+}
+
+/**
+ * Trigger a manual sync for the current user
+ */
+export async function triggerWatchlistSync() {
+ const session = await getServerSession(authOptions)
+ if (!session?.user?.id) {
+ return { success: false as const, error: "Not authenticated" }
+ }
+
+ try {
+ // Check if sync is enabled for user
+ const settings = await prisma.watchlistSyncSettings.findUnique({
+ where: { userId: session.user.id },
+ })
+
+ if (!settings?.syncEnabled) {
+ return { success: false as const, error: "Watchlist sync is not enabled" }
+ }
+
+ // Perform sync
+ const result = await syncUserWatchlist(session.user.id)
+
+ if (!result.success) {
+ return { success: false as const, error: result.error || "Sync failed" }
+ }
+
+ return {
+ success: true as const,
+ data: result.data,
+ }
+ } catch (error) {
+ logger.error("Error triggering watchlist sync", error)
+ return { success: false as const, error: "Failed to trigger sync" }
+ }
+}
+
+/**
+ * Get user's watchlist sync history
+ */
+export async function getWatchlistSyncHistory(options?: {
+ limit?: number
+ offset?: number
+ status?: WatchlistSyncStatus
+}) {
+ const session = await getServerSession(authOptions)
+ if (!session?.user?.id) {
+ return { success: false as const, error: "Not authenticated" }
+ }
+
+ const limit = options?.limit ?? 20
+ const offset = options?.offset ?? 0
+
+ try {
+ const where = {
+ userId: session.user.id,
+ ...(options?.status ? { status: options.status } : {}),
+ }
+
+ const [items, total] = await Promise.all([
+ prisma.watchlistSyncHistory.findMany({
+ where,
+ orderBy: { syncedAt: "desc" },
+ take: limit,
+ skip: offset,
+ select: {
+ id: true,
+ title: true,
+ year: true,
+ mediaType: true,
+ status: true,
+ syncedAt: true,
+ requestedAt: true,
+ overseerrRequestId: true,
+ },
+ }),
+ prisma.watchlistSyncHistory.count({ where }),
+ ])
+
+ return {
+ success: true as const,
+ data: {
+ items,
+ total,
+ hasMore: offset + items.length < total,
+ },
+ }
+ } catch (error) {
+ logger.error("Error fetching watchlist sync history", error)
+ return { success: false as const, error: "Failed to fetch history" }
+ }
+}
diff --git a/app/admin/settings/page.tsx b/app/admin/settings/page.tsx
index a1924bc..ec4243a 100644
--- a/app/admin/settings/page.tsx
+++ b/app/admin/settings/page.tsx
@@ -1,5 +1,6 @@
import { getAdminSettings } from "@/actions/admin"
import { DiscordIntegrationForm, LLMProviderForm, LLMToggle, ServerForm } from "@/components/admin/settings/settings-edit-forms"
+import { WatchlistSyncSettings } from "@/components/admin/settings/watchlist-sync-settings"
import { WrappedSettingsForm } from "@/components/admin/settings/wrapped-settings-form"
import { getBaseUrl } from "@/lib/utils"
@@ -355,6 +356,9 @@ export default async function SettingsPage() {
+ {/* Watchlist Sync Settings */}
+
+
{/* System Information */}
diff --git a/components/admin/settings/watchlist-sync-settings.tsx b/components/admin/settings/watchlist-sync-settings.tsx
new file mode 100644
index 0000000..7409790
--- /dev/null
+++ b/components/admin/settings/watchlist-sync-settings.tsx
@@ -0,0 +1,283 @@
+"use client"
+
+import {
+ getGlobalWatchlistSyncSettings,
+ getWatchlistSyncStats,
+ updateGlobalWatchlistSyncSettings,
+} from "@/actions/admin/watchlist"
+import { useToast } from "@/components/ui/toast"
+import { useCallback, useEffect, useState } from "react"
+
+interface GlobalSettings {
+ watchlistSyncEnabled: boolean
+ watchlistSyncIntervalMinutes: number
+}
+
+interface SyncStats {
+ usersWithSyncEnabled: number
+ totalItemsSynced: number
+ totalItemsRequested: number
+ recentHistory: Array<{
+ id: string
+ title: string
+ mediaType: string
+ status: string
+ syncedAt: Date
+ user: { name: string | null; email: string | null }
+ }>
+}
+
+export function WatchlistSyncSettings() {
+ const toast = useToast()
+ const [settings, setSettings] = useState
(null)
+ const [stats, setStats] = useState(null)
+ const [isLoading, setIsLoading] = useState(true)
+ const [isSaving, setIsSaving] = useState(false)
+ const [intervalInput, setIntervalInput] = useState("")
+
+ const loadData = useCallback(async () => {
+ const [settingsResult, statsResult] = await Promise.all([
+ getGlobalWatchlistSyncSettings(),
+ getWatchlistSyncStats(),
+ ])
+
+ if (settingsResult.success && settingsResult.data) {
+ setSettings(settingsResult.data)
+ setIntervalInput(settingsResult.data.watchlistSyncIntervalMinutes.toString())
+ }
+
+ if (statsResult.success && statsResult.data) {
+ setStats(statsResult.data)
+ }
+
+ setIsLoading(false)
+ }, [])
+
+ useEffect(() => {
+ loadData()
+ }, [loadData])
+
+ const handleToggleEnabled = async () => {
+ if (!settings) return
+
+ setIsSaving(true)
+ try {
+ const result = await updateGlobalWatchlistSyncSettings({
+ watchlistSyncEnabled: !settings.watchlistSyncEnabled,
+ watchlistSyncIntervalMinutes: settings.watchlistSyncIntervalMinutes,
+ })
+
+ if (result.success) {
+ setSettings({
+ ...settings,
+ watchlistSyncEnabled: !settings.watchlistSyncEnabled,
+ })
+ toast.showSuccess(
+ settings.watchlistSyncEnabled
+ ? "Watchlist sync disabled"
+ : "Watchlist sync enabled"
+ )
+ } else {
+ toast.showError(result.error || "Failed to update settings")
+ }
+ } finally {
+ setIsSaving(false)
+ }
+ }
+
+ const handleIntervalChange = async () => {
+ if (!settings) return
+
+ const interval = parseInt(intervalInput, 10)
+ if (isNaN(interval) || interval < 15 || interval > 1440) {
+ toast.showError("Interval must be between 15 and 1440 minutes")
+ return
+ }
+
+ setIsSaving(true)
+ try {
+ const result = await updateGlobalWatchlistSyncSettings({
+ watchlistSyncEnabled: settings.watchlistSyncEnabled,
+ watchlistSyncIntervalMinutes: interval,
+ })
+
+ if (result.success) {
+ setSettings({
+ ...settings,
+ watchlistSyncIntervalMinutes: interval,
+ })
+ toast.showSuccess("Sync interval updated")
+ } else {
+ toast.showError(result.error || "Failed to update settings")
+ }
+ } finally {
+ setIsSaving(false)
+ }
+ }
+
+ if (isLoading) {
+ return (
+
+ )
+ }
+
+ if (!settings) {
+ return null
+ }
+
+ return (
+
+ {/* Header */}
+
+
+
Watchlist Sync
+
+ Automatically sync users' Plex watchlists to Overseerr requests.
+ Users must enable sync individually in their settings.
+
+
+
+
+ {/* Enable/Disable Toggle */}
+
+
+
+
+ {settings.watchlistSyncEnabled ? "Enabled" : "Disabled"}
+
+ {isSaving && (
+
+ )}
+
+
+ {/* Interval Setting */}
+ {settings.watchlistSyncEnabled && (
+
+
+
+ setIntervalInput(e.target.value)}
+ className="w-24 rounded-lg border border-slate-600 bg-slate-700 px-3 py-2 text-sm text-white focus:border-cyan-500 focus:outline-none focus:ring-1 focus:ring-cyan-500"
+ data-testid="watchlist-sync-interval-input"
+ />
+
+
+
+ How often to check for new items in users' watchlists (15-1440 minutes)
+
+
+ )}
+
+ {/* Statistics */}
+ {stats && settings.watchlistSyncEnabled && (
+
+
Statistics
+
+
+
{stats.usersWithSyncEnabled}
+
Users with sync enabled
+
+
+
{stats.totalItemsSynced}
+
Total items synced
+
+
+
{stats.totalItemsRequested}
+
Requests created
+
+
+
+ {/* Recent Activity */}
+ {stats.recentHistory.length > 0 && (
+
+
Recent Activity
+
+ {stats.recentHistory.slice(0, 5).map((item) => (
+
+
+ {item.title}
+
+ by {item.user.name || item.user.email || "Unknown"}
+
+
+
+ {item.status.replace(/_/g, " ")}
+
+
+ ))}
+
+
+ )}
+
+ )}
+
+ {/* Warning when disabled */}
+ {!settings.watchlistSyncEnabled && (
+
+
+ Watchlist sync is disabled. Users will not be able to enable sync in their settings.
+
+
+ )}
+
+
+ )
+}
diff --git a/components/dashboard/user-dashboard.tsx b/components/dashboard/user-dashboard.tsx
index 8078453..9707a1a 100644
--- a/components/dashboard/user-dashboard.tsx
+++ b/components/dashboard/user-dashboard.tsx
@@ -9,6 +9,7 @@ import { RequestsCard } from "@/components/dashboard/requests-card"
import { StatusFooter } from "@/components/dashboard/status-background"
import { WrappedCard } from "@/components/dashboard/wrapped-card"
import type { DashboardDiscordConnection } from "@/components/discord/link-callout"
+import { WatchlistSyncCard } from "@/components/watchlist/sync-settings-card"
import { signOut } from "next-auth/react"
import Link from "next/link"
import { useRouter } from "next/navigation"
@@ -117,6 +118,14 @@ export function UserDashboard({
)}
+
+ {/* Features section */}
+
diff --git a/components/watchlist/sync-history-table.tsx b/components/watchlist/sync-history-table.tsx
new file mode 100644
index 0000000..9b019b9
--- /dev/null
+++ b/components/watchlist/sync-history-table.tsx
@@ -0,0 +1,190 @@
+"use client"
+
+import { getWatchlistSyncHistory } from "@/actions/watchlist"
+import { WatchlistSyncStatus } from "@/lib/generated/prisma/client"
+import { useCallback, useEffect, useState } from "react"
+
+interface SyncHistoryItem {
+ id: string
+ title: string
+ year: number | null
+ mediaType: string
+ status: WatchlistSyncStatus
+ syncedAt: Date
+ requestedAt: Date | null
+ overseerrRequestId: number | null
+}
+
+interface SyncHistoryData {
+ items: SyncHistoryItem[]
+ total: number
+ hasMore: boolean
+}
+
+interface SyncHistoryTableProps {
+ limit?: number
+}
+
+export function SyncHistoryTable({ limit = 10 }: SyncHistoryTableProps) {
+ const [data, setData] = useState(null)
+ const [isLoading, setIsLoading] = useState(true)
+ const [offset, setOffset] = useState(0)
+
+ const loadHistory = useCallback(async () => {
+ setIsLoading(true)
+ const result = await getWatchlistSyncHistory({ limit, offset })
+ if (result.success && result.data) {
+ setData(result.data)
+ }
+ setIsLoading(false)
+ }, [limit, offset])
+
+ useEffect(() => {
+ loadHistory()
+ }, [loadHistory])
+
+ const getStatusBadge = (status: WatchlistSyncStatus) => {
+ const styles: Record = {
+ SYNCED: { bg: "bg-slate-700/50", text: "text-slate-300", label: "Synced" },
+ REQUESTED: { bg: "bg-green-900/50", text: "text-green-300", label: "Requested" },
+ ALREADY_AVAILABLE: { bg: "bg-cyan-900/50", text: "text-cyan-300", label: "Available" },
+ ALREADY_REQUESTED: { bg: "bg-amber-900/50", text: "text-amber-300", label: "Pending" },
+ FAILED: { bg: "bg-red-900/50", text: "text-red-300", label: "Failed" },
+ REMOVED_FROM_WATCHLIST: { bg: "bg-slate-800/50", text: "text-slate-500", label: "Removed" },
+ }
+ const style = styles[status]
+ return (
+
+ {style.label}
+
+ )
+ }
+
+ const getMediaTypeIcon = (mediaType: string) => {
+ if (mediaType === "MOVIE") {
+ return (
+
+ )
+ }
+ return (
+
+ )
+ }
+
+ const formatDate = (date: Date) => {
+ const d = new Date(date)
+ return d.toLocaleDateString(undefined, { month: "short", day: "numeric", year: "numeric" })
+ }
+
+ if (isLoading) {
+ return (
+
+
+ {[...Array(3)].map((_, i) => (
+
+ ))}
+
+
+ )
+ }
+
+ if (!data || data.items.length === 0) {
+ return (
+
+
No sync history yet
+
+ Items from your Plex watchlist will appear here once synced
+
+
+ )
+ }
+
+ return (
+
+
+
+
+
+ |
+ Title
+ |
+
+ Type
+ |
+
+ Status
+ |
+
+ Synced
+ |
+
+
+
+ {data.items.map((item) => (
+
+ |
+
+ {item.title}
+ {item.year && ({item.year})}
+
+ |
+
+
+ {getMediaTypeIcon(item.mediaType)}
+
+ {item.mediaType === "MOVIE" ? "Movie" : "TV Show"}
+
+
+ |
+
+ {getStatusBadge(item.status)}
+ |
+
+ {formatDate(item.syncedAt)}
+ |
+
+ ))}
+
+
+
+
+ {/* Pagination */}
+ {(data.hasMore || offset > 0) && (
+
+
+
+ Showing {offset + 1}-{Math.min(offset + data.items.length, data.total)} of {data.total}
+
+
+
+ )}
+
+ )
+}
diff --git a/components/watchlist/sync-settings-card.tsx b/components/watchlist/sync-settings-card.tsx
new file mode 100644
index 0000000..bc00a76
--- /dev/null
+++ b/components/watchlist/sync-settings-card.tsx
@@ -0,0 +1,363 @@
+"use client"
+
+import { getWatchlistSyncSettings, triggerWatchlistSync, updateWatchlistSyncSettings } from "@/actions/watchlist"
+import { useToast } from "@/components/ui/toast"
+import { motion } from "framer-motion"
+import { useCallback, useEffect, useState, useTransition } from "react"
+
+interface RecentHistoryItem {
+ id: string
+ title: string
+ year: number | null
+ mediaType: string
+ status: string
+ syncedAt: Date
+}
+
+interface SyncSettings {
+ hasPlexToken: boolean
+ hasOverseerr: boolean
+ globalSyncEnabled: boolean
+ settings: {
+ syncEnabled: boolean
+ lastSyncAt: Date | null
+ lastSyncStatus: string | null
+ lastSyncError: string | null
+ itemsSynced: number
+ itemsRequested: number
+ totalItemsSynced: number
+ totalItemsRequested: number
+ } | null
+ recentHistory: RecentHistoryItem[]
+}
+
+export function WatchlistSyncCard() {
+ const { showSuccess, showError, showInfo } = useToast()
+ const [isPending, startTransition] = useTransition()
+ const [isSyncing, setIsSyncing] = useState(false)
+ const [settings, setSettings] = useState(null)
+ const [isLoading, setIsLoading] = useState(true)
+
+ const loadSettings = useCallback(async () => {
+ const result = await getWatchlistSyncSettings()
+ if (result.success && result.data) {
+ setSettings(result.data)
+ }
+ setIsLoading(false)
+ }, [])
+
+ useEffect(() => {
+ loadSettings()
+ }, [loadSettings])
+
+ const handleToggleSync = () => {
+ if (!settings) return
+
+ const newEnabled = !settings.settings?.syncEnabled
+
+ startTransition(async () => {
+ const result = await updateWatchlistSyncSettings({ syncEnabled: newEnabled })
+ if (result.success) {
+ showSuccess(newEnabled ? "Watchlist sync enabled" : "Watchlist sync disabled")
+ await loadSettings()
+ } else {
+ showError(result.error || "Failed to update settings")
+ }
+ })
+ }
+
+ const handleManualSync = async () => {
+ setIsSyncing(true)
+ try {
+ const result = await triggerWatchlistSync()
+ if (result.success && result.data) {
+ const { itemsSynced, itemsRequested, itemsFailed } = result.data
+ if (itemsRequested > 0) {
+ showSuccess(`Synced ${itemsSynced} items, requested ${itemsRequested} new items`)
+ } else if (itemsSynced > 0) {
+ showInfo(`Synced ${itemsSynced} items (all already available or requested)`)
+ } else {
+ showInfo("Watchlist is up to date")
+ }
+ if (itemsFailed > 0) {
+ showError(`${itemsFailed} items failed to sync`)
+ }
+ await loadSettings()
+ } else {
+ showError(result.error || "Sync failed")
+ }
+ } finally {
+ setIsSyncing(false)
+ }
+ }
+
+ // Not configured - don't show card
+ if (!isLoading && settings && !settings.hasOverseerr) {
+ return null
+ }
+
+ // Loading state
+ if (isLoading) {
+ return (
+
+
+
+ )
+ }
+
+ if (!settings) {
+ return null
+ }
+
+ const canEnable = settings.hasPlexToken && settings.hasOverseerr && settings.globalSyncEnabled
+ const isEnabled = settings.settings?.syncEnabled ?? false
+ const lastSync = settings.settings?.lastSyncAt
+ const lastStatus = settings.settings?.lastSyncStatus
+
+ // Format last sync time
+ const formatLastSync = (date: Date | null | undefined) => {
+ if (!date) return "Never"
+ const d = new Date(date)
+ const now = new Date()
+ const diffMs = now.getTime() - d.getTime()
+ const diffMins = Math.floor(diffMs / 60000)
+ const diffHours = Math.floor(diffMs / 3600000)
+ const diffDays = Math.floor(diffMs / 86400000)
+
+ if (diffMins < 1) return "Just now"
+ if (diffMins < 60) return `${diffMins}m ago`
+ if (diffHours < 24) return `${diffHours}h ago`
+ return `${diffDays}d ago`
+ }
+
+ const getStatusColor = (status: string | null) => {
+ switch (status) {
+ case "success":
+ return "text-green-400"
+ case "partial":
+ return "text-amber-400"
+ case "failed":
+ return "text-red-400"
+ default:
+ return "text-slate-400"
+ }
+ }
+
+ return (
+
+ {/* Ambient glow */}
+
+
+
+ {/* Header */}
+
+
+ {/* Watchlist Icon */}
+
+
+
Watchlist Sync
+
+ Auto-request items from your Plex watchlist
+
+
+
+
+ {/* Toggle */}
+
+
+
+ {/* Prerequisites warning */}
+ {!canEnable && (
+
+ {!settings.hasPlexToken && (
+
Log in with Plex to enable watchlist sync
+ )}
+ {settings.hasPlexToken && !settings.globalSyncEnabled && (
+
Watchlist sync is disabled by your administrator
+ )}
+ {settings.hasPlexToken && settings.globalSyncEnabled && !settings.hasOverseerr && (
+
Overseerr is not configured
+ )}
+
+ )}
+
+ {/* Status when enabled */}
+ {isEnabled && (
+
+ {/* Stats row */}
+
+
+
+ Last sync: {formatLastSync(lastSync)}
+
+ {lastStatus && (
+
+ {lastStatus === "success" && "Success"}
+ {lastStatus === "partial" && "Partial"}
+ {lastStatus === "failed" && "Failed"}
+
+ )}
+
+
+
+ {/* Cumulative stats */}
+ {settings.settings && (settings.settings.totalItemsSynced > 0 || settings.settings.totalItemsRequested > 0) && (
+
+
+ {settings.settings.totalItemsSynced} items synced
+
+
+ {settings.settings.totalItemsRequested} requested
+
+
+ )}
+
+ {/* Error message */}
+ {settings.settings?.lastSyncError && (
+
+ {settings.settings.lastSyncError}
+
+ )}
+
+ {/* Recent sync history */}
+ {settings.recentHistory && settings.recentHistory.length > 0 && (
+
+
Recent items
+
+ {settings.recentHistory.map((item) => (
+
+
+
+ {item.mediaType === "MOVIE" ? "🎬" : "📺"}
+
+
+ {item.title}
+ {item.year && ({item.year})}
+
+
+
+ {item.status === "REQUESTED" && "Requested"}
+ {item.status === "ALREADY_AVAILABLE" && "Available"}
+ {item.status === "ALREADY_REQUESTED" && "Pending"}
+ {item.status === "FAILED" && "Failed"}
+ {item.status === "SYNCED" && "Synced"}
+
+
+ ))}
+
+
+ )}
+
+ {/* Manual sync button */}
+
+
+ )}
+
+
+ )
+}
diff --git a/instrumentation.ts b/instrumentation.ts
index 26b1525..1d7cdc2 100644
--- a/instrumentation.ts
+++ b/instrumentation.ts
@@ -9,115 +9,8 @@ export async function register() {
return
}
- // Check if bot should attempt to start (can be disabled via env var for manual control)
- const envBotEnabled = process.env.ENABLE_DISCORD_BOT !== "false"
-
- if (!envBotEnabled) {
- if (process.env.NODE_ENV === "development") {
- console.log("Discord bot disabled via ENABLE_DISCORD_BOT=false")
- }
- return
- }
-
- // Check database setting - if bot is disabled there, don't start polling
- try {
- const { isDiscordBotEnabled } = await import("./lib/discord/lock")
- const botEnabled = await isDiscordBotEnabled()
- if (!botEnabled) {
- if (process.env.NODE_ENV === "development") {
- console.log("Discord bot disabled in database settings")
- }
- return
- }
- } catch (error) {
- // If we can't check the database, proceed anyway (database might not be ready yet)
- // The polling loop will check the setting periodically
- if (process.env.NODE_ENV === "development") {
- console.warn("Could not check Discord bot enabled status:", error)
- }
- }
-
- // Use dynamic import with a string to prevent Next.js from analyzing the dependency tree
- // This ensures Discord.js and its native dependencies aren't bundled
- try {
- const { startDiscordBotLockPolling, stopDiscordBotLockPolling, releaseDiscordBotLock } = await import("./lib/discord/lock")
- const botModule = await import("./lib/discord/bot")
- const bot = botModule.getDiscordBot()
-
- // Poll interval in milliseconds (default: 60 seconds)
- const pollIntervalMs = parseInt(process.env.DISCORD_BOT_POLL_INTERVAL_MS || "60000", 10)
-
- // Store bot instance for cleanup
- let botInstance: any = null
-
- // Start background polling - this doesn't block server startup
- // The bot will initialize automatically when the lock is acquired
- startDiscordBotLockPolling(
- // onLockAcquired - called when we successfully acquire the lock
- async () => {
- try {
- await bot.initialize()
- botInstance = bot
- console.log("Discord bot initialized successfully (holding distributed lock)")
- } catch (error) {
- console.error("Failed to initialize Discord bot:", error)
- // Release lock if initialization fails
- await releaseDiscordBotLock()
- }
- },
- // onLockLost - called if we lose the lock (e.g., another instance took it)
- async () => {
- try {
- console.log("Discord bot lock lost - shutting down bot")
- if (botInstance) {
- await botInstance.destroy()
- botInstance = null
- }
- } catch (error) {
- console.error("Error shutting down bot after lock loss:", error)
- }
- },
- pollIntervalMs
- )
-
- console.log(`Discord bot lock polling started (checking every ${pollIntervalMs / 1000} seconds)`)
-
- // Graceful shutdown handlers - only register if we're in Node.js runtime
- // Skip in test environments (Playwright) to avoid interference
- if (process.env.NODE_ENV !== "test") {
- const shutdown = async () => {
- try {
- await stopDiscordBotLockPolling()
- // Bot instance will be destroyed by stopDiscordBotLockPolling if it exists
- // But also try to destroy it here as a fallback
- try {
- await bot.destroy()
- } catch {
- // Ignore errors if bot wasn't initialized
- }
- } catch (error) {
- console.error("Error during Discord bot shutdown:", error)
- }
- }
-
- // Use dynamic property access to avoid Next.js static analysis issues with Edge Runtime
- // process.on is already verified to exist by the top-level check
- // Using a helper function to make it harder for Next.js to statically analyze
- const registerSignalHandler = (signal: string) => {
- const onMethod = (process as any)["on"]
- if (typeof onMethod === "function") {
- onMethod.call(process, signal, shutdown)
- }
- }
- registerSignalHandler("SIGINT")
- registerSignalHandler("SIGTERM")
- }
- } catch (error) {
- // Silently fail if Discord.js can't be loaded (e.g., missing native dependencies)
- // This allows the app to start even if the bot can't be initialized
- if (process.env.NODE_ENV === "development") {
- console.warn("Discord bot module could not be loaded:", error)
- }
- }
+ // Dynamically import Node.js-specific instrumentation
+ // This prevents Edge Runtime from analyzing Node.js-only dependencies
+ const { startNodeInstrumentation } = await import("./lib/instrumentation/node")
+ await startNodeInstrumentation()
}
-
diff --git a/lib/auth.ts b/lib/auth.ts
index 69a23df..077ff9c 100644
--- a/lib/auth.ts
+++ b/lib/auth.ts
@@ -158,6 +158,7 @@ export const authOptions: NextAuthOptions = {
dbUser = await prisma.user.create({
data: {
plexUserId: plexUser.id,
+ plexAuthToken: authToken, // Store for watchlist sync
name: plexUser.username,
email: plexUser.email,
image: plexUser.thumb,
@@ -181,6 +182,7 @@ export const authOptions: NextAuthOptions = {
dbUser = await prisma.user.update({
where: { id: dbUser.id },
data: {
+ plexAuthToken: authToken, // Update token on each login for watchlist sync
name: plexUser.username,
email: plexUser.email,
image: plexUser.thumb,
diff --git a/lib/connections/overseerr.ts b/lib/connections/overseerr.ts
index be11d1f..427a301 100644
--- a/lib/connections/overseerr.ts
+++ b/lib/connections/overseerr.ts
@@ -231,3 +231,143 @@ export async function batchGetOverseerrMediaStatus(
return results
}
+
+// Overseerr media status codes
+export const OverseerrMediaStatus = {
+ UNKNOWN: 1,
+ PENDING: 2,
+ PROCESSING: 3,
+ PARTIALLY_AVAILABLE: 4,
+ AVAILABLE: 5,
+} as const
+
+export interface SubmitOverseerrRequestPayload {
+ mediaType: "movie" | "tv"
+ mediaId: number // TMDB ID
+ seasons?: number[] // For TV shows, which seasons to request
+ is4k?: boolean
+}
+
+export interface SubmitOverseerrRequestResult {
+ success: boolean
+ requestId?: number
+ error?: string
+ status: "created" | "already_requested" | "already_available" | "failed"
+}
+
+/**
+ * Submit a media request to Overseerr
+ * @param config Overseerr configuration
+ * @param payload Request details (mediaType, mediaId, optional seasons)
+ */
+export async function submitOverseerrRequest(
+ config: OverseerrParsed,
+ payload: SubmitOverseerrRequestPayload
+): Promise {
+ try {
+ // First check if media already exists/is requested
+ const existingStatus = await getOverseerrMediaByTmdbId(config, payload.mediaId, payload.mediaType)
+
+ if (existingStatus.success && existingStatus.data) {
+ const status = existingStatus.data.status
+
+ // Check if already available
+ if (status === OverseerrMediaStatus.AVAILABLE || status === OverseerrMediaStatus.PARTIALLY_AVAILABLE) {
+ logger.debug("Media already available in Overseerr", {
+ tmdbId: payload.mediaId,
+ mediaType: payload.mediaType,
+ status,
+ })
+ return {
+ success: true,
+ status: "already_available",
+ }
+ }
+
+ // Check if already requested (pending or processing)
+ if (status === OverseerrMediaStatus.PENDING || status === OverseerrMediaStatus.PROCESSING) {
+ logger.debug("Media already requested in Overseerr", {
+ tmdbId: payload.mediaId,
+ mediaType: payload.mediaType,
+ status,
+ })
+ return {
+ success: true,
+ requestId: existingStatus.data.requests?.[0]?.id,
+ status: "already_requested",
+ }
+ }
+ }
+
+ // Submit the request
+ const url = `${config.url}/api/v1/request`
+ const body: Record = {
+ mediaType: payload.mediaType,
+ mediaId: payload.mediaId,
+ }
+
+ if (payload.seasons && payload.seasons.length > 0) {
+ body.seasons = payload.seasons
+ }
+
+ if (payload.is4k !== undefined) {
+ body.is4k = payload.is4k
+ }
+
+ const response = await fetch(url, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ "X-Api-Key": config.apiKey,
+ },
+ body: JSON.stringify(body),
+ })
+
+ if (!response.ok) {
+ const errorData = await response.json().catch(() => ({}))
+ const errorMessage = errorData.message || response.statusText
+
+ // Check for specific error cases
+ if (response.status === 409 || errorMessage.includes("already")) {
+ return {
+ success: true,
+ status: "already_requested",
+ }
+ }
+
+ logger.error("Overseerr request submission failed", undefined, {
+ status: response.status,
+ error: errorMessage,
+ tmdbId: payload.mediaId,
+ mediaType: payload.mediaType,
+ })
+
+ return {
+ success: false,
+ error: errorMessage,
+ status: "failed",
+ }
+ }
+
+ const data = await response.json()
+
+ logger.info("Overseerr request submitted successfully", {
+ requestId: data.id,
+ tmdbId: payload.mediaId,
+ mediaType: payload.mediaType,
+ })
+
+ return {
+ success: true,
+ requestId: data.id,
+ status: "created",
+ }
+ } catch (error) {
+ logger.error("Error submitting Overseerr request", error)
+ return {
+ success: false,
+ error: error instanceof Error ? error.message : "Unknown error",
+ status: "failed",
+ }
+ }
+}
diff --git a/lib/instrumentation/node.ts b/lib/instrumentation/node.ts
new file mode 100644
index 0000000..9e891c2
--- /dev/null
+++ b/lib/instrumentation/node.ts
@@ -0,0 +1,185 @@
+/**
+ * Node.js runtime instrumentation - starts background jobs
+ * This file should only be imported dynamically from instrumentation.ts
+ * to prevent Edge Runtime from analyzing Node.js-only dependencies
+ */
+
+export async function startNodeInstrumentation() {
+ // Start watchlist sync polling
+ await startWatchlistSyncPolling()
+
+ // Start Discord bot polling
+ await startDiscordBotPolling()
+}
+
+async function startWatchlistSyncPolling() {
+ // Check if watchlist sync should attempt to start
+ const envSyncEnabled = process.env.ENABLE_WATCHLIST_SYNC !== "false"
+
+ if (!envSyncEnabled) {
+ if (process.env.NODE_ENV === "development") {
+ console.log("Watchlist sync disabled via ENABLE_WATCHLIST_SYNC=false")
+ }
+ return
+ }
+
+ try {
+ const { isWatchlistSyncEnabled, startWatchlistSyncPolling: startPolling, stopWatchlistSyncPolling } = await import("@/lib/watchlist/lock")
+ const { syncAllEnabledUsers } = await import("@/lib/watchlist/sync-service")
+
+ // Check database setting
+ const syncEnabled = await isWatchlistSyncEnabled()
+ if (!syncEnabled) {
+ if (process.env.NODE_ENV === "development") {
+ console.log("Watchlist sync disabled in database settings")
+ }
+ return
+ }
+
+ // Poll interval in milliseconds (default: 60 seconds)
+ const pollIntervalMs = parseInt(process.env.WATCHLIST_SYNC_POLL_INTERVAL_MS || "60000", 10)
+
+ // Start background polling
+ startPolling(
+ // onLockAcquired - called when we successfully acquire the lock
+ async () => {
+ try {
+ console.log("Watchlist sync lock acquired - running sync")
+ const result = await syncAllEnabledUsers()
+ console.log(`Watchlist sync completed: ${result.usersProcessed} users processed, ${result.usersSucceeded} succeeded, ${result.usersFailed} failed`)
+ } catch (error) {
+ console.error("Failed to run watchlist sync:", error)
+ }
+ },
+ // onLockLost - called if we lose the lock
+ async () => {
+ console.log("Watchlist sync lock lost")
+ },
+ pollIntervalMs
+ )
+
+ console.log(`Watchlist sync polling started (checking every ${pollIntervalMs / 1000} seconds)`)
+
+ // Graceful shutdown handlers
+ if (process.env.NODE_ENV !== "test") {
+ const shutdown = async () => {
+ try {
+ await stopWatchlistSyncPolling()
+ } catch (error) {
+ console.error("Error during watchlist sync shutdown:", error)
+ }
+ }
+
+ process.on("SIGINT", shutdown)
+ process.on("SIGTERM", shutdown)
+ }
+ } catch (error) {
+ if (process.env.NODE_ENV === "development") {
+ console.warn("Watchlist sync module could not be loaded:", error)
+ }
+ }
+}
+
+async function startDiscordBotPolling() {
+ // Check if bot should attempt to start (can be disabled via env var for manual control)
+ const envBotEnabled = process.env.ENABLE_DISCORD_BOT !== "false"
+
+ if (!envBotEnabled) {
+ if (process.env.NODE_ENV === "development") {
+ console.log("Discord bot disabled via ENABLE_DISCORD_BOT=false")
+ }
+ return
+ }
+
+ // Check database setting - if bot is disabled there, don't start polling
+ try {
+ const { isDiscordBotEnabled } = await import("@/lib/discord/lock")
+ const botEnabled = await isDiscordBotEnabled()
+ if (!botEnabled) {
+ if (process.env.NODE_ENV === "development") {
+ console.log("Discord bot disabled in database settings")
+ }
+ return
+ }
+ } catch (error) {
+ // If we can't check the database, proceed anyway (database might not be ready yet)
+ // The polling loop will check the setting periodically
+ if (process.env.NODE_ENV === "development") {
+ console.warn("Could not check Discord bot enabled status:", error)
+ }
+ }
+
+ // Use dynamic import with a string to prevent Next.js from analyzing the dependency tree
+ // This ensures Discord.js and its native dependencies aren't bundled
+ try {
+ const { startDiscordBotLockPolling, stopDiscordBotLockPolling, releaseDiscordBotLock } = await import("@/lib/discord/lock")
+ const botModule = await import("@/lib/discord/bot")
+ const bot = botModule.getDiscordBot()
+
+ // Poll interval in milliseconds (default: 60 seconds)
+ const pollIntervalMs = parseInt(process.env.DISCORD_BOT_POLL_INTERVAL_MS || "60000", 10)
+
+ // Store bot instance for cleanup
+ let botInstance: ReturnType | null = null
+
+ // Start background polling - this doesn't block server startup
+ // The bot will initialize automatically when the lock is acquired
+ startDiscordBotLockPolling(
+ // onLockAcquired - called when we successfully acquire the lock
+ async () => {
+ try {
+ await bot.initialize()
+ botInstance = bot
+ console.log("Discord bot initialized successfully (holding distributed lock)")
+ } catch (error) {
+ console.error("Failed to initialize Discord bot:", error)
+ // Release lock if initialization fails
+ await releaseDiscordBotLock()
+ }
+ },
+ // onLockLost - called if we lose the lock (e.g., another instance took it)
+ async () => {
+ try {
+ console.log("Discord bot lock lost - shutting down bot")
+ if (botInstance) {
+ await botInstance.destroy()
+ botInstance = null
+ }
+ } catch (error) {
+ console.error("Error shutting down bot after lock loss:", error)
+ }
+ },
+ pollIntervalMs
+ )
+
+ console.log(`Discord bot lock polling started (checking every ${pollIntervalMs / 1000} seconds)`)
+
+ // Graceful shutdown handlers - only register if we're in Node.js runtime
+ // Skip in test environments (Playwright) to avoid interference
+ if (process.env.NODE_ENV !== "test") {
+ const shutdown = async () => {
+ try {
+ await stopDiscordBotLockPolling()
+ // Bot instance will be destroyed by stopDiscordBotLockPolling if it exists
+ // But also try to destroy it here as a fallback
+ try {
+ await bot.destroy()
+ } catch {
+ // Ignore errors if bot wasn't initialized
+ }
+ } catch (error) {
+ console.error("Error during Discord bot shutdown:", error)
+ }
+ }
+
+ process.on("SIGINT", shutdown)
+ process.on("SIGTERM", shutdown)
+ }
+ } catch (error) {
+ // Silently fail if Discord.js can't be loaded (e.g., missing native dependencies)
+ // This allows the app to start even if the bot can't be initialized
+ if (process.env.NODE_ENV === "development") {
+ console.warn("Discord bot module could not be loaded:", error)
+ }
+ }
+}
diff --git a/lib/validations/watchlist.ts b/lib/validations/watchlist.ts
new file mode 100644
index 0000000..a6f3315
--- /dev/null
+++ b/lib/validations/watchlist.ts
@@ -0,0 +1,75 @@
+import { z } from "zod"
+
+/**
+ * Schema for a watchlist item from Plex API
+ */
+export const watchlistItemSchema = z.object({
+ ratingKey: z.string(),
+ guid: z.string(),
+ type: z.enum(["movie", "show"]),
+ title: z.string(),
+ year: z.number().optional(),
+ // External IDs parsed from Guid array
+ tmdbId: z.number().optional(),
+ tvdbId: z.number().optional(),
+ imdbId: z.string().optional(),
+})
+
+export type WatchlistItem = z.infer
+
+/**
+ * Schema for updating user watchlist sync settings
+ */
+export const updateWatchlistSyncSettingsSchema = z.object({
+ syncEnabled: z.boolean(),
+})
+
+export type UpdateWatchlistSyncSettings = z.infer
+
+/**
+ * Schema for global watchlist sync settings (admin)
+ */
+export const globalWatchlistSyncSettingsSchema = z.object({
+ watchlistSyncEnabled: z.boolean(),
+ watchlistSyncIntervalMinutes: z.number().min(15).max(1440), // 15 min to 24 hours
+})
+
+export type GlobalWatchlistSyncSettings = z.infer
+
+/**
+ * Schema for sync result
+ */
+export const syncResultSchema = z.object({
+ success: z.boolean(),
+ itemsSynced: z.number(),
+ itemsRequested: z.number(),
+ itemsSkipped: z.number(),
+ itemsFailed: z.number(),
+ errors: z.array(z.string()).optional(),
+})
+
+export type SyncResult = z.infer
+
+/**
+ * Schema for Overseerr request payload
+ */
+export const overseerrRequestPayloadSchema = z.object({
+ mediaType: z.enum(["movie", "tv"]),
+ mediaId: z.number(), // TMDB ID
+ seasons: z.array(z.number()).optional(), // For TV shows
+ is4k: z.boolean().optional(),
+})
+
+export type OverseerrRequestPayload = z.infer
+
+/**
+ * Schema for Overseerr request result
+ */
+export const overseerrRequestResultSchema = z.object({
+ success: z.boolean(),
+ requestId: z.number().optional(),
+ error: z.string().optional(),
+ status: z.enum(["created", "already_requested", "already_available", "failed"]),
+})
+
+export type OverseerrRequestResult = z.infer
diff --git a/lib/watchlist/lock.ts b/lib/watchlist/lock.ts
new file mode 100644
index 0000000..f49d84e
--- /dev/null
+++ b/lib/watchlist/lock.ts
@@ -0,0 +1,395 @@
+import { prisma } from "@/lib/prisma"
+import { createLogger } from "@/lib/utils/logger"
+import { randomBytes } from "crypto"
+
+const logger = createLogger("WATCHLIST_SYNC_LOCK")
+
+/**
+ * Checks if watchlist sync is enabled globally in the database
+ */
+export async function isWatchlistSyncEnabled(): Promise {
+ try {
+ const config = await prisma.config.findUnique({
+ where: { id: "config" },
+ select: { watchlistSyncEnabled: true },
+ })
+ return config?.watchlistSyncEnabled ?? false
+ } catch (error) {
+ logger.debug("Error checking if watchlist sync is enabled", { error })
+ return false
+ }
+}
+
+/**
+ * Gets the global sync interval in minutes
+ */
+export async function getWatchlistSyncInterval(): Promise {
+ try {
+ const config = await prisma.config.findUnique({
+ where: { id: "config" },
+ select: { watchlistSyncIntervalMinutes: true },
+ })
+ return config?.watchlistSyncIntervalMinutes ?? 60
+ } catch (error) {
+ logger.debug("Error getting watchlist sync interval", { error })
+ return 60
+ }
+}
+
+// Lock lease duration in milliseconds (30 seconds)
+const LOCK_LEASE_DURATION_MS = 30 * 1000
+
+// How often to renew the lock (every 10 seconds)
+const LOCK_RENEWAL_INTERVAL_MS = 10 * 1000
+
+// Generate a unique instance ID for this pod/process
+const INSTANCE_ID = `${process.env.HOSTNAME || "unknown"}-${process.pid}-${randomBytes(4).toString("hex")}`
+
+interface LockState {
+ isHeld: boolean
+ instanceId: string | null
+ renewalInterval?: NodeJS.Timeout
+ releaseLock?: () => Promise
+}
+
+let lockState: LockState = {
+ isHeld: false,
+ instanceId: null,
+}
+
+/**
+ * Attempts to acquire a distributed lock for watchlist sync using a database lease table.
+ * Only one instance across all pods can hold this lock at a time.
+ * Uses PostgreSQL's row-level locking for atomic operations.
+ */
+export async function acquireWatchlistSyncLock(): Promise {
+ if (lockState.isHeld && lockState.instanceId === INSTANCE_ID) {
+ logger.debug("Lock already held by this instance")
+ return true
+ }
+
+ try {
+ const now = new Date()
+ const expiresAt = new Date(now.getTime() + LOCK_LEASE_DURATION_MS)
+
+ // Use a transaction with row-level locking to atomically acquire the lock
+ const lockRecord = await prisma.$transaction(async (tx) => {
+ // First, clean up expired locks
+ await tx.watchlistSyncLock.deleteMany({
+ where: {
+ expiresAt: {
+ lt: now,
+ },
+ },
+ })
+
+ // Try to acquire the lock
+ const existing = await tx.watchlistSyncLock.findUnique({
+ where: { id: "watchlist-sync" },
+ })
+
+ if (!existing) {
+ // No lock exists, create one
+ return await tx.watchlistSyncLock.create({
+ data: {
+ id: "watchlist-sync",
+ instanceId: INSTANCE_ID,
+ acquiredAt: now,
+ expiresAt,
+ lastRenewedAt: now,
+ },
+ })
+ }
+
+ // Lock exists - check if it's expired or if we own it
+ if (existing.expiresAt < now || existing.instanceId === INSTANCE_ID) {
+ // Lock is expired or we already own it - update it
+ return await tx.watchlistSyncLock.update({
+ where: { id: "watchlist-sync" },
+ data: {
+ instanceId: INSTANCE_ID,
+ expiresAt,
+ lastRenewedAt: now,
+ updatedAt: now,
+ },
+ })
+ }
+
+ // Lock is held by another instance
+ return null
+ })
+
+ if (lockRecord && lockRecord.instanceId === INSTANCE_ID && lockRecord.expiresAt > now) {
+ logger.debug("Watchlist sync lock acquired successfully", { instanceId: INSTANCE_ID })
+ lockState.isHeld = true
+ lockState.instanceId = INSTANCE_ID
+
+ // Set up automatic lock renewal
+ lockState.renewalInterval = setInterval(async () => {
+ try {
+ const renewed = await renewWatchlistSyncLock()
+ if (!renewed) {
+ logger.debug("Failed to renew lock - another instance may have taken it")
+ lockState.isHeld = false
+ lockState.instanceId = null
+ if (lockState.renewalInterval) {
+ clearInterval(lockState.renewalInterval)
+ lockState.renewalInterval = undefined
+ }
+ }
+ } catch (error) {
+ logger.debug("Error renewing lock", { error })
+ lockState.isHeld = false
+ lockState.instanceId = null
+ if (lockState.renewalInterval) {
+ clearInterval(lockState.renewalInterval)
+ lockState.renewalInterval = undefined
+ }
+ }
+ }, LOCK_RENEWAL_INTERVAL_MS)
+
+ // Set up cleanup function
+ lockState.releaseLock = async () => {
+ await releaseWatchlistSyncLock()
+ }
+
+ return true
+ } else {
+ logger.debug("Watchlist sync lock not available", {
+ currentInstanceId: lockRecord?.instanceId,
+ expiresAt: lockRecord?.expiresAt,
+ })
+ return false
+ }
+ } catch (error) {
+ logger.debug("Error acquiring watchlist sync lock", { error })
+ return false
+ }
+}
+
+/**
+ * Renews the watchlist sync lock lease
+ */
+async function renewWatchlistSyncLock(): Promise {
+ if (!lockState.isHeld || lockState.instanceId !== INSTANCE_ID) {
+ return false
+ }
+
+ try {
+ const now = new Date()
+ const expiresAt = new Date(now.getTime() + LOCK_LEASE_DURATION_MS)
+
+ const result = await prisma.watchlistSyncLock.updateMany({
+ where: {
+ id: "watchlist-sync",
+ instanceId: INSTANCE_ID,
+ expiresAt: {
+ gt: now, // Only renew if not expired
+ },
+ },
+ data: {
+ expiresAt,
+ lastRenewedAt: now,
+ updatedAt: now,
+ },
+ })
+
+ if (result.count > 0) {
+ logger.debug("Lock renewed successfully")
+ return true
+ } else {
+ logger.debug("Lock renewal failed - lock may have been taken by another instance")
+ return false
+ }
+ } catch (error) {
+ logger.debug("Error renewing watchlist sync lock", { error })
+ return false
+ }
+}
+
+/**
+ * Releases the watchlist sync lock
+ */
+export async function releaseWatchlistSyncLock(): Promise {
+ if (!lockState.isHeld || lockState.instanceId !== INSTANCE_ID) {
+ return
+ }
+
+ try {
+ // Clear renewal interval
+ if (lockState.renewalInterval) {
+ clearInterval(lockState.renewalInterval)
+ lockState.renewalInterval = undefined
+ }
+
+ // Delete the lock record if we still own it
+ await prisma.watchlistSyncLock.deleteMany({
+ where: {
+ id: "watchlist-sync",
+ instanceId: INSTANCE_ID,
+ },
+ })
+
+ lockState.isHeld = false
+ lockState.instanceId = null
+ lockState.releaseLock = undefined
+
+ logger.debug("Watchlist sync lock released successfully")
+ } catch (error) {
+ logger.debug("Error releasing watchlist sync lock", { error })
+ // Reset state even if release fails
+ lockState.isHeld = false
+ lockState.instanceId = null
+ lockState.releaseLock = undefined
+ }
+}
+
+/**
+ * Checks if this instance currently holds the lock
+ */
+export function hasWatchlistSyncLock(): boolean {
+ return lockState.isHeld && lockState.instanceId === INSTANCE_ID
+}
+
+// Polling state for background lock acquisition
+interface PollingState {
+ isPolling: boolean
+ pollingInterval?: NodeJS.Timeout
+ syncInitialized: boolean
+ onLockAcquired?: () => Promise
+ onLockLost?: () => Promise
+}
+
+let pollingState: PollingState = {
+ isPolling: false,
+ syncInitialized: false,
+}
+
+/**
+ * Starts background polling to acquire the watchlist sync lock
+ * Tries every minute until the lock is acquired
+ *
+ * @param onLockAcquired - Callback when lock is successfully acquired
+ * @param onLockLost - Callback when lock is lost (optional)
+ * @param pollIntervalMs - How often to poll (default: 60 seconds)
+ */
+export async function startWatchlistSyncPolling(
+ onLockAcquired: () => Promise,
+ onLockLost?: () => Promise,
+ pollIntervalMs: number = 60 * 1000
+): Promise {
+ if (pollingState.isPolling) {
+ logger.debug("Lock polling already started")
+ return
+ }
+
+ pollingState.isPolling = true
+ pollingState.onLockAcquired = onLockAcquired
+ pollingState.onLockLost = onLockLost
+
+ logger.debug(`Starting watchlist sync lock polling (every ${pollIntervalMs / 1000} seconds)`)
+
+ // Check if sync is enabled before trying to acquire lock
+ const syncEnabled = await isWatchlistSyncEnabled()
+ if (!syncEnabled) {
+ logger.debug("Watchlist sync disabled in database - not attempting to acquire lock")
+ return
+ }
+
+ // Try immediately on startup
+ const immediateAcquired = await acquireWatchlistSyncLock()
+ if (immediateAcquired) {
+ logger.debug("Lock acquired immediately on startup")
+ pollingState.syncInitialized = true
+ await onLockAcquired()
+ } else {
+ logger.debug("Lock not available on startup, will poll periodically")
+ }
+
+ // Set up periodic polling
+ pollingState.pollingInterval = setInterval(async () => {
+ if (!pollingState.isPolling) {
+ return
+ }
+
+ try {
+ // Check if sync is enabled in database
+ const syncEnabled = await isWatchlistSyncEnabled()
+
+ if (!syncEnabled) {
+ // Sync is disabled - shut down if running
+ if (pollingState.syncInitialized) {
+ logger.debug("Watchlist sync disabled in database - shutting down sync")
+ pollingState.syncInitialized = false
+ if (onLockLost) {
+ await onLockLost()
+ }
+ await releaseWatchlistSyncLock()
+ }
+ return
+ }
+
+ // Check if we still hold the lock if sync is initialized
+ if (pollingState.syncInitialized) {
+ const stillHoldsLock = hasWatchlistSyncLock()
+ if (!stillHoldsLock) {
+ // We lost the lock
+ logger.debug("Lock lost during polling - shutting down sync")
+ pollingState.syncInitialized = false
+ if (onLockLost) {
+ await onLockLost()
+ }
+ return
+ }
+ // Lock renewal is handled by the renewal interval in acquireWatchlistSyncLock
+ return
+ }
+
+ // Sync not initialized but enabled - try to acquire the lock
+ const acquired = await acquireWatchlistSyncLock()
+
+ if (acquired) {
+ logger.debug("Lock acquired during polling - initializing sync")
+ pollingState.syncInitialized = true
+ await onLockAcquired()
+ }
+ } catch (error) {
+ logger.debug("Error during lock polling", { error })
+ }
+ }, pollIntervalMs)
+}
+
+/**
+ * Stops the background lock polling
+ */
+export async function stopWatchlistSyncPolling(): Promise {
+ if (!pollingState.isPolling) {
+ return
+ }
+
+ logger.debug("Stopping watchlist sync lock polling")
+ pollingState.isPolling = false
+
+ if (pollingState.pollingInterval) {
+ clearInterval(pollingState.pollingInterval)
+ pollingState.pollingInterval = undefined
+ }
+
+ // Call onLockLost if sync was initialized (to clean up)
+ if (pollingState.syncInitialized && pollingState.onLockLost) {
+ try {
+ await pollingState.onLockLost()
+ } catch (error) {
+ logger.debug("Error during lock lost callback on stop", { error })
+ }
+ }
+
+ // Release lock if we hold it
+ if (pollingState.syncInitialized) {
+ await releaseWatchlistSyncLock()
+ pollingState.syncInitialized = false
+ }
+
+ pollingState.onLockAcquired = undefined
+ pollingState.onLockLost = undefined
+}
diff --git a/lib/watchlist/plex-watchlist.ts b/lib/watchlist/plex-watchlist.ts
new file mode 100644
index 0000000..cd91e70
--- /dev/null
+++ b/lib/watchlist/plex-watchlist.ts
@@ -0,0 +1,190 @@
+/**
+ * Plex Watchlist API client
+ *
+ * Fetches user's watchlist from the Plex discover API
+ * API Endpoint: https://discover.provider.plex.tv/library/sections/watchlist/all
+ */
+
+import { getClientIdentifier } from "@/lib/connections/plex-core"
+import { createLogger } from "@/lib/utils/logger"
+import { WatchlistItem } from "@/lib/validations/watchlist"
+
+const logger = createLogger("PLEX_WATCHLIST")
+
+const PLEX_DISCOVER_URL = "https://discover.provider.plex.tv"
+
+interface PlexWatchlistResponse {
+ MediaContainer?: {
+ size?: number
+ Metadata?: PlexWatchlistMetadata[]
+ }
+}
+
+interface PlexWatchlistMetadata {
+ ratingKey: string
+ key: string
+ guid: string
+ type: "movie" | "show"
+ title: string
+ year?: number
+ Guid?: PlexGuid[]
+}
+
+interface PlexGuid {
+ id: string // e.g., "tmdb://12345", "tvdb://12345", "imdb://tt12345"
+}
+
+/**
+ * Parse external IDs from Plex GUID array
+ * @param guids Array of Plex GUIDs with format like "tmdb://12345"
+ */
+export function parseExternalIds(guids: PlexGuid[] | undefined): {
+ tmdbId?: number
+ tvdbId?: number
+ imdbId?: string
+} {
+ const result: { tmdbId?: number; tvdbId?: number; imdbId?: string } = {}
+
+ if (!guids || !Array.isArray(guids)) {
+ return result
+ }
+
+ for (const guid of guids) {
+ if (!guid.id) continue
+
+ if (guid.id.startsWith("tmdb://")) {
+ const id = parseInt(guid.id.replace("tmdb://", ""), 10)
+ if (!isNaN(id)) {
+ result.tmdbId = id
+ }
+ } else if (guid.id.startsWith("tvdb://")) {
+ const id = parseInt(guid.id.replace("tvdb://", ""), 10)
+ if (!isNaN(id)) {
+ result.tvdbId = id
+ }
+ } else if (guid.id.startsWith("imdb://")) {
+ result.imdbId = guid.id.replace("imdb://", "")
+ }
+ }
+
+ return result
+}
+
+/**
+ * Parse a single GUID string to extract external ID
+ * @param guid GUID string like "plex://movie/5d776833880197001ec939fa"
+ */
+export function parseWatchlistGuid(guid: string): {
+ type: "plex" | "tmdb" | "tvdb" | "imdb" | "unknown"
+ id: string
+} {
+ if (guid.startsWith("plex://")) {
+ // Format: plex://movie/5d776833880197001ec939fa
+ const parts = guid.split("/")
+ return { type: "plex", id: parts[parts.length - 1] }
+ } else if (guid.startsWith("tmdb://")) {
+ return { type: "tmdb", id: guid.replace("tmdb://", "") }
+ } else if (guid.startsWith("tvdb://")) {
+ return { type: "tvdb", id: guid.replace("tvdb://", "") }
+ } else if (guid.startsWith("imdb://")) {
+ return { type: "imdb", id: guid.replace("imdb://", "") }
+ }
+
+ return { type: "unknown", id: guid }
+}
+
+/**
+ * Get standard Plex headers for API requests
+ */
+function getPlexHeaders(userToken: string): Record {
+ return {
+ Accept: "application/json",
+ "X-Plex-Token": userToken,
+ "X-Plex-Client-Identifier": getClientIdentifier(),
+ "X-Plex-Product": "Plex Wrapped",
+ "X-Plex-Version": "1.0.0",
+ "X-Plex-Platform": "Web",
+ }
+}
+
+export interface GetWatchlistResult {
+ success: boolean
+ data?: WatchlistItem[]
+ error?: string
+}
+
+/**
+ * Fetch user's watchlist from Plex discover API
+ * @param userToken User's Plex auth token
+ */
+export async function getPlexWatchlist(userToken: string): Promise {
+ try {
+ logger.debug("Fetching Plex watchlist")
+
+ // Fetch watchlist from discover API
+ const url = `${PLEX_DISCOVER_URL}/library/sections/watchlist/all`
+ const response = await fetch(url, {
+ method: "GET",
+ headers: getPlexHeaders(userToken),
+ })
+
+ if (!response.ok) {
+ if (response.status === 401) {
+ logger.warn("Plex watchlist fetch failed - unauthorized (token may be expired)")
+ return { success: false, error: "Plex token is invalid or expired" }
+ }
+ logger.error("Plex watchlist fetch failed", undefined, {
+ status: response.status,
+ statusText: response.statusText,
+ })
+ return { success: false, error: `Failed to fetch watchlist: ${response.statusText}` }
+ }
+
+ const data: PlexWatchlistResponse = await response.json()
+
+ if (!data.MediaContainer?.Metadata) {
+ logger.debug("Watchlist is empty")
+ return { success: true, data: [] }
+ }
+
+ const items: WatchlistItem[] = data.MediaContainer.Metadata.map((item) => {
+ const externalIds = parseExternalIds(item.Guid)
+
+ return {
+ ratingKey: item.ratingKey,
+ guid: item.guid,
+ type: item.type,
+ title: item.title,
+ year: item.year,
+ tmdbId: externalIds.tmdbId,
+ tvdbId: externalIds.tvdbId,
+ imdbId: externalIds.imdbId,
+ }
+ })
+
+ logger.debug("Fetched watchlist items", { count: items.length })
+ return { success: true, data: items }
+ } catch (error) {
+ logger.error("Error fetching Plex watchlist", error)
+ return { success: false, error: "Failed to fetch watchlist" }
+ }
+}
+
+/**
+ * Validate a user's Plex token is still valid
+ * @param userToken User's Plex auth token
+ */
+export async function validatePlexToken(userToken: string): Promise {
+ try {
+ const url = "https://plex.tv/api/v2/user"
+ const response = await fetch(url, {
+ method: "GET",
+ headers: getPlexHeaders(userToken),
+ })
+
+ return response.ok
+ } catch (error) {
+ logger.debug("Error validating Plex token", { error })
+ return false
+ }
+}
diff --git a/lib/watchlist/sync-service.ts b/lib/watchlist/sync-service.ts
new file mode 100644
index 0000000..5daedce
--- /dev/null
+++ b/lib/watchlist/sync-service.ts
@@ -0,0 +1,363 @@
+/**
+ * Watchlist Sync Service
+ *
+ * Core orchestration for syncing Plex watchlists to Overseerr requests
+ */
+
+import { submitOverseerrRequest } from "@/lib/connections/overseerr"
+import { MediaType, WatchlistSyncStatus } from "@/lib/generated/prisma/client"
+import { prisma } from "@/lib/prisma"
+import { createLogger } from "@/lib/utils/logger"
+import { SyncResult } from "@/lib/validations/watchlist"
+import { getPlexWatchlist, validatePlexToken } from "./plex-watchlist"
+
+const logger = createLogger("WATCHLIST_SYNC")
+
+export interface SyncUserWatchlistResult {
+ success: boolean
+ data?: SyncResult
+ error?: string
+}
+
+/**
+ * Sync a single user's watchlist
+ * @param userId Database user ID
+ */
+export async function syncUserWatchlist(userId: string): Promise {
+ const startTime = Date.now()
+
+ try {
+ // Get user with their Plex token and sync settings
+ const user = await prisma.user.findUnique({
+ where: { id: userId },
+ select: {
+ id: true,
+ plexAuthToken: true,
+ email: true,
+ watchlistSyncSettings: true,
+ },
+ })
+
+ if (!user) {
+ return { success: false, error: "User not found" }
+ }
+
+ if (!user.plexAuthToken) {
+ logger.warn("User has no Plex auth token", { userId })
+ await updateSyncSettings(userId, {
+ lastSyncStatus: "failed",
+ lastSyncError: "No Plex auth token - please log in again",
+ })
+ return { success: false, error: "No Plex auth token" }
+ }
+
+ // Validate token is still valid
+ const tokenValid = await validatePlexToken(user.plexAuthToken)
+ if (!tokenValid) {
+ logger.warn("Plex token is invalid or expired", { userId })
+ await updateSyncSettings(userId, {
+ lastSyncStatus: "failed",
+ lastSyncError: "Plex token expired - please log in again",
+ })
+ return { success: false, error: "Plex token is invalid or expired" }
+ }
+
+ // Get Overseerr configuration
+ const overseerr = await prisma.overseerr.findFirst({
+ where: { isActive: true },
+ })
+
+ if (!overseerr) {
+ logger.warn("No active Overseerr configured", { userId })
+ await updateSyncSettings(userId, {
+ lastSyncStatus: "failed",
+ lastSyncError: "No Overseerr server configured",
+ })
+ return { success: false, error: "No Overseerr server configured" }
+ }
+
+ // Fetch watchlist from Plex
+ const watchlistResult = await getPlexWatchlist(user.plexAuthToken)
+ if (!watchlistResult.success || !watchlistResult.data) {
+ const error = watchlistResult.error || "Failed to fetch watchlist"
+ logger.error("Failed to fetch Plex watchlist", undefined, { userId, error })
+ await updateSyncSettings(userId, {
+ lastSyncStatus: "failed",
+ lastSyncError: error,
+ })
+ return { success: false, error }
+ }
+
+ const watchlistItems = watchlistResult.data
+ logger.info("Fetched Plex watchlist", { userId, itemCount: watchlistItems.length })
+
+ // Get existing sync history for comparison
+ const existingHistory = await prisma.watchlistSyncHistory.findMany({
+ where: { userId },
+ select: { plexRatingKey: true, status: true },
+ })
+ const existingKeys = new Map(existingHistory.map((h) => [h.plexRatingKey, h.status]))
+
+ // Process each watchlist item
+ let itemsSynced = 0
+ let itemsRequested = 0
+ let itemsSkipped = 0
+ let itemsFailed = 0
+ const errors: string[] = []
+
+ for (const item of watchlistItems) {
+ try {
+ // Skip if already processed and in a terminal state
+ const existingStatus = existingKeys.get(item.ratingKey)
+ if (
+ existingStatus &&
+ (existingStatus === WatchlistSyncStatus.REQUESTED ||
+ existingStatus === WatchlistSyncStatus.ALREADY_AVAILABLE ||
+ existingStatus === WatchlistSyncStatus.ALREADY_REQUESTED)
+ ) {
+ itemsSkipped++
+ continue
+ }
+
+ // Need TMDB ID to request in Overseerr
+ if (!item.tmdbId) {
+ logger.debug("Skipping item without TMDB ID", {
+ title: item.title,
+ ratingKey: item.ratingKey,
+ })
+ itemsSkipped++
+ continue
+ }
+
+ // Submit request to Overseerr
+ const mediaType = item.type === "movie" ? "movie" : "tv"
+ const requestResult = await submitOverseerrRequest(
+ { name: overseerr.name, url: overseerr.url, apiKey: overseerr.apiKey },
+ { mediaType, mediaId: item.tmdbId }
+ )
+
+ // Map Prisma MediaType
+ const prismaMediaType = item.type === "movie" ? MediaType.MOVIE : MediaType.TV_SERIES
+
+ // Determine status based on result
+ let syncStatus: WatchlistSyncStatus
+ let requestedAt: Date | null = null
+
+ if (requestResult.status === "created") {
+ syncStatus = WatchlistSyncStatus.REQUESTED
+ requestedAt = new Date()
+ itemsRequested++
+ } else if (requestResult.status === "already_available") {
+ syncStatus = WatchlistSyncStatus.ALREADY_AVAILABLE
+ } else if (requestResult.status === "already_requested") {
+ syncStatus = WatchlistSyncStatus.ALREADY_REQUESTED
+ } else {
+ syncStatus = WatchlistSyncStatus.FAILED
+ itemsFailed++
+ if (requestResult.error) {
+ errors.push(`${item.title}: ${requestResult.error}`)
+ }
+ }
+
+ // Upsert sync history record
+ await prisma.watchlistSyncHistory.upsert({
+ where: {
+ userId_plexRatingKey: {
+ userId,
+ plexRatingKey: item.ratingKey,
+ },
+ },
+ create: {
+ userId,
+ plexRatingKey: item.ratingKey,
+ guid: item.guid,
+ mediaType: prismaMediaType,
+ title: item.title,
+ year: item.year,
+ tmdbId: item.tmdbId,
+ tvdbId: item.tvdbId,
+ imdbId: item.imdbId,
+ status: syncStatus,
+ requestedAt,
+ overseerrRequestId: requestResult.requestId,
+ },
+ update: {
+ status: syncStatus,
+ requestedAt: requestedAt || undefined,
+ overseerrRequestId: requestResult.requestId || undefined,
+ },
+ })
+
+ itemsSynced++
+ } catch (error) {
+ logger.error("Error processing watchlist item", error, {
+ title: item.title,
+ userId,
+ })
+ itemsFailed++
+ errors.push(`${item.title}: Processing error`)
+ }
+ }
+
+ // Update sync settings with results
+ const syncStatus = itemsFailed > 0 ? (itemsSynced > 0 ? "partial" : "failed") : "success"
+ await updateSyncSettings(userId, {
+ lastSyncAt: new Date(),
+ lastSyncStatus: syncStatus,
+ lastSyncError: errors.length > 0 ? errors.slice(0, 3).join("; ") : null,
+ itemsSynced,
+ itemsRequested,
+ })
+
+ const duration = Date.now() - startTime
+ logger.info("Watchlist sync completed", {
+ userId,
+ itemsSynced,
+ itemsRequested,
+ itemsSkipped,
+ itemsFailed,
+ durationMs: duration,
+ })
+
+ return {
+ success: true,
+ data: {
+ success: true,
+ itemsSynced,
+ itemsRequested,
+ itemsSkipped,
+ itemsFailed,
+ errors: errors.length > 0 ? errors : undefined,
+ },
+ }
+ } catch (error) {
+ logger.error("Watchlist sync failed", error, { userId })
+ await updateSyncSettings(userId, {
+ lastSyncStatus: "failed",
+ lastSyncError: error instanceof Error ? error.message : "Unknown error",
+ })
+ return {
+ success: false,
+ error: error instanceof Error ? error.message : "Sync failed",
+ }
+ }
+}
+
+/**
+ * Helper to update sync settings
+ */
+async function updateSyncSettings(
+ userId: string,
+ data: {
+ lastSyncAt?: Date
+ lastSyncStatus?: string
+ lastSyncError?: string | null
+ itemsSynced?: number
+ itemsRequested?: number
+ }
+): Promise {
+ // Get current settings to calculate cumulative totals
+ const current = await prisma.watchlistSyncSettings.findUnique({
+ where: { userId },
+ select: { totalItemsSynced: true, totalItemsRequested: true },
+ })
+
+ const incrementSynced = data.itemsSynced ?? 0
+ const incrementRequested = data.itemsRequested ?? 0
+
+ await prisma.watchlistSyncSettings.upsert({
+ where: { userId },
+ create: {
+ userId,
+ syncEnabled: false,
+ ...data,
+ totalItemsSynced: incrementSynced,
+ totalItemsRequested: incrementRequested,
+ },
+ update: {
+ ...data,
+ totalItemsSynced: (current?.totalItemsSynced ?? 0) + incrementSynced,
+ totalItemsRequested: (current?.totalItemsRequested ?? 0) + incrementRequested,
+ },
+ })
+}
+
+/**
+ * Sync all users who have sync enabled and are due for a sync
+ * Called by the background job
+ */
+export async function syncAllEnabledUsers(): Promise<{
+ usersProcessed: number
+ usersSucceeded: number
+ usersFailed: number
+}> {
+ const startTime = Date.now()
+
+ try {
+ // Get global sync interval
+ const config = await prisma.config.findUnique({
+ where: { id: "config" },
+ select: { watchlistSyncIntervalMinutes: true },
+ })
+ const intervalMinutes = config?.watchlistSyncIntervalMinutes ?? 60
+
+ // Find users who need to be synced
+ const cutoffTime = new Date(Date.now() - intervalMinutes * 60 * 1000)
+
+ const usersToSync = await prisma.watchlistSyncSettings.findMany({
+ where: {
+ syncEnabled: true,
+ OR: [{ lastSyncAt: null }, { lastSyncAt: { lt: cutoffTime } }],
+ },
+ select: { userId: true },
+ take: 50, // Limit batch size
+ })
+
+ logger.info("Starting batch watchlist sync", {
+ usersToSync: usersToSync.length,
+ intervalMinutes,
+ })
+
+ let usersSucceeded = 0
+ let usersFailed = 0
+
+ // Process users sequentially to avoid rate limiting
+ for (const { userId } of usersToSync) {
+ try {
+ const result = await syncUserWatchlist(userId)
+ if (result.success) {
+ usersSucceeded++
+ } else {
+ usersFailed++
+ }
+
+ // Small delay between users to be respectful of APIs
+ await new Promise((resolve) => setTimeout(resolve, 1000))
+ } catch (error) {
+ logger.error("Error syncing user watchlist in batch", error, { userId })
+ usersFailed++
+ }
+ }
+
+ const duration = Date.now() - startTime
+ logger.info("Batch watchlist sync completed", {
+ usersProcessed: usersToSync.length,
+ usersSucceeded,
+ usersFailed,
+ durationMs: duration,
+ })
+
+ return {
+ usersProcessed: usersToSync.length,
+ usersSucceeded,
+ usersFailed,
+ }
+ } catch (error) {
+ logger.error("Batch watchlist sync failed", error)
+ return {
+ usersProcessed: 0,
+ usersSucceeded: 0,
+ usersFailed: 0,
+ }
+ }
+}
diff --git a/prisma/migrations/20251230031502_add_watchlist_sync/migration.sql b/prisma/migrations/20251230031502_add_watchlist_sync/migration.sql
new file mode 100644
index 0000000..d7798c6
--- /dev/null
+++ b/prisma/migrations/20251230031502_add_watchlist_sync/migration.sql
@@ -0,0 +1,82 @@
+-- CreateEnum
+CREATE TYPE "WatchlistSyncStatus" AS ENUM ('SYNCED', 'REQUESTED', 'ALREADY_AVAILABLE', 'ALREADY_REQUESTED', 'FAILED', 'REMOVED_FROM_WATCHLIST');
+
+-- AlterTable
+ALTER TABLE "Config" ADD COLUMN "watchlistSyncEnabled" BOOLEAN NOT NULL DEFAULT false,
+ADD COLUMN "watchlistSyncIntervalMinutes" INTEGER NOT NULL DEFAULT 60;
+
+-- AlterTable
+ALTER TABLE "User" ADD COLUMN "plexAuthToken" TEXT;
+
+-- CreateTable
+CREATE TABLE "WatchlistSyncSettings" (
+ "id" TEXT NOT NULL,
+ "userId" TEXT NOT NULL,
+ "syncEnabled" BOOLEAN NOT NULL DEFAULT false,
+ "lastSyncAt" TIMESTAMP(3),
+ "lastSyncStatus" TEXT,
+ "lastSyncError" TEXT,
+ "itemsSynced" INTEGER NOT NULL DEFAULT 0,
+ "itemsRequested" INTEGER NOT NULL DEFAULT 0,
+ "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "updatedAt" TIMESTAMP(3) NOT NULL,
+
+ CONSTRAINT "WatchlistSyncSettings_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateTable
+CREATE TABLE "WatchlistSyncHistory" (
+ "id" TEXT NOT NULL,
+ "userId" TEXT NOT NULL,
+ "plexRatingKey" TEXT NOT NULL,
+ "guid" TEXT NOT NULL,
+ "mediaType" "MediaType" NOT NULL,
+ "title" TEXT NOT NULL,
+ "year" INTEGER,
+ "tmdbId" INTEGER,
+ "tvdbId" INTEGER,
+ "imdbId" TEXT,
+ "syncedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "requestedAt" TIMESTAMP(3),
+ "overseerrRequestId" INTEGER,
+ "status" "WatchlistSyncStatus" NOT NULL DEFAULT 'SYNCED',
+
+ CONSTRAINT "WatchlistSyncHistory_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateTable
+CREATE TABLE "WatchlistSyncLock" (
+ "id" TEXT NOT NULL DEFAULT 'watchlist-sync',
+ "instanceId" TEXT NOT NULL,
+ "acquiredAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "expiresAt" TIMESTAMP(3) NOT NULL,
+ "lastRenewedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "updatedAt" TIMESTAMP(3) NOT NULL,
+
+ CONSTRAINT "WatchlistSyncLock_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateIndex
+CREATE UNIQUE INDEX "WatchlistSyncSettings_userId_key" ON "WatchlistSyncSettings"("userId");
+
+-- CreateIndex
+CREATE INDEX "WatchlistSyncSettings_syncEnabled_idx" ON "WatchlistSyncSettings"("syncEnabled");
+
+-- CreateIndex
+CREATE INDEX "WatchlistSyncHistory_userId_syncedAt_idx" ON "WatchlistSyncHistory"("userId", "syncedAt");
+
+-- CreateIndex
+CREATE INDEX "WatchlistSyncHistory_status_idx" ON "WatchlistSyncHistory"("status");
+
+-- CreateIndex
+CREATE UNIQUE INDEX "WatchlistSyncHistory_userId_plexRatingKey_key" ON "WatchlistSyncHistory"("userId", "plexRatingKey");
+
+-- CreateIndex
+CREATE INDEX "WatchlistSyncLock_expiresAt_idx" ON "WatchlistSyncLock"("expiresAt");
+
+-- AddForeignKey
+ALTER TABLE "WatchlistSyncSettings" ADD CONSTRAINT "WatchlistSyncSettings_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "WatchlistSyncHistory" ADD CONSTRAINT "WatchlistSyncHistory_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
diff --git a/prisma/migrations/20251230034346_add_watchlist_cumulative_counters/migration.sql b/prisma/migrations/20251230034346_add_watchlist_cumulative_counters/migration.sql
new file mode 100644
index 0000000..6af495d
--- /dev/null
+++ b/prisma/migrations/20251230034346_add_watchlist_cumulative_counters/migration.sql
@@ -0,0 +1,3 @@
+-- AlterTable
+ALTER TABLE "WatchlistSyncSettings" ADD COLUMN "totalItemsRequested" INTEGER NOT NULL DEFAULT 0,
+ADD COLUMN "totalItemsSynced" INTEGER NOT NULL DEFAULT 0;
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index 32d5b66..7b7b062 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -12,26 +12,29 @@ datasource db {
// User model
model User {
- id String @id @default(cuid())
- name String?
- email String? @unique
- emailVerified DateTime?
- image String?
- plexUserId String? @unique
- jellyfinUserId String? @unique // Jellyfin user ID (if user was created via Jellyfin invite)
- primaryAuthService String? // Track which service user authenticated with first: "plex" or "jellyfin"
- isAdmin Boolean @default(false)
- onboardingStatus Json? @default("{\"plex\": false, \"jellyfin\": false}") // Track onboarding completion per service
- plexWrapped PlexWrapped[]
- llmUsage LLMUsage[]
- chatConversations ChatConversation[]
- inviteUsages InviteUsage[]
- discordConnection DiscordConnection?
- discordOAuthStates DiscordOAuthState[]
- userMediaMarks UserMediaMark[]
- userWatchIntents UserWatchIntent[]
- createdAt DateTime @default(now())
- updatedAt DateTime @updatedAt
+ id String @id @default(cuid())
+ name String?
+ email String? @unique
+ emailVerified DateTime?
+ image String?
+ plexUserId String? @unique
+ plexAuthToken String? // Plex auth token (stored during login for watchlist API access)
+ jellyfinUserId String? @unique // Jellyfin user ID (if user was created via Jellyfin invite)
+ primaryAuthService String? // Track which service user authenticated with first: "plex" or "jellyfin"
+ isAdmin Boolean @default(false)
+ onboardingStatus Json? @default("{\"plex\": false, \"jellyfin\": false}") // Track onboarding completion per service
+ plexWrapped PlexWrapped[]
+ llmUsage LLMUsage[]
+ chatConversations ChatConversation[]
+ inviteUsages InviteUsage[]
+ discordConnection DiscordConnection?
+ discordOAuthStates DiscordOAuthState[]
+ userMediaMarks UserMediaMark[]
+ userWatchIntents UserWatchIntent[]
+ watchlistSyncSettings WatchlistSyncSettings?
+ watchlistSyncHistory WatchlistSyncHistory[]
+ createdAt DateTime @default(now())
+ updatedAt DateTime @updatedAt
@@index([plexUserId])
@@index([jellyfinUserId])
@@ -241,13 +244,15 @@ model LLMUsage {
// Application Configuration (singleton pattern)
model Config {
- id String @id @default("config")
- llmDisabled Boolean @default(false) // When true, LLM calls are disabled and mock data is returned
- wrappedEnabled Boolean @default(true) // When false, wrapped generation is disabled
- wrappedGenerationStartDate DateTime? // Optional start date for when generation is allowed (e.g., Nov 20). Year is auto-determined from this date.
- wrappedGenerationEndDate DateTime? // Optional end date for when generation is allowed (e.g., Jan 31)
- updatedAt DateTime @updatedAt
- updatedBy String? // User ID who last updated this config
+ id String @id @default("config")
+ llmDisabled Boolean @default(false) // When true, LLM calls are disabled and mock data is returned
+ wrappedEnabled Boolean @default(true) // When false, wrapped generation is disabled
+ wrappedGenerationStartDate DateTime? // Optional start date for when generation is allowed (e.g., Nov 20). Year is auto-determined from this date.
+ wrappedGenerationEndDate DateTime? // Optional end date for when generation is allowed (e.g., Jan 31)
+ watchlistSyncEnabled Boolean @default(false) // Global toggle for watchlist sync feature
+ watchlistSyncIntervalMinutes Int @default(60) // Default sync interval in minutes (minimum 15)
+ updatedAt DateTime @updatedAt
+ updatedBy String? // User ID who last updated this config
}
model DiscordIntegration {
@@ -556,3 +561,67 @@ model Announcement {
@@index([isActive, expiresAt])
@@index([priority])
}
+
+// Watchlist Sync - Per-user sync settings
+model WatchlistSyncSettings {
+ id String @id @default(cuid())
+ userId String @unique
+ user User @relation(fields: [userId], references: [id], onDelete: Cascade)
+ syncEnabled Boolean @default(false)
+ lastSyncAt DateTime?
+ lastSyncStatus String? // "success", "partial", "failed"
+ lastSyncError String?
+ itemsSynced Int @default(0) // Items processed in last sync
+ itemsRequested Int @default(0) // Items requested in last sync
+ totalItemsSynced Int @default(0) // Cumulative total items synced
+ totalItemsRequested Int @default(0) // Cumulative total requests created
+ createdAt DateTime @default(now())
+ updatedAt DateTime @updatedAt
+
+ @@index([syncEnabled])
+}
+
+// Watchlist Sync - Track synced items to avoid duplicates
+model WatchlistSyncHistory {
+ id String @id @default(cuid())
+ userId String
+ user User @relation(fields: [userId], references: [id], onDelete: Cascade)
+ plexRatingKey String // Plex discover rating key
+ guid String // GUID string (e.g., "tmdb://12345")
+ mediaType MediaType
+ title String
+ year Int?
+ tmdbId Int?
+ tvdbId Int?
+ imdbId String?
+ syncedAt DateTime @default(now())
+ requestedAt DateTime? // When submitted to Overseerr
+ overseerrRequestId Int? // Overseerr request ID if submitted
+ status WatchlistSyncStatus @default(SYNCED)
+
+ @@unique([userId, plexRatingKey])
+ @@index([userId, syncedAt])
+ @@index([status])
+}
+
+// Watchlist Sync - Distributed lock (mirrors DiscordBotLock)
+model WatchlistSyncLock {
+ id String @id @default("watchlist-sync")
+ instanceId String // Unique identifier for this pod/instance
+ acquiredAt DateTime @default(now())
+ expiresAt DateTime // Lock expires if not renewed (handles crashed pods)
+ lastRenewedAt DateTime @default(now())
+ createdAt DateTime @default(now())
+ updatedAt DateTime @updatedAt
+
+ @@index([expiresAt])
+}
+
+enum WatchlistSyncStatus {
+ SYNCED // Item tracked but not requested
+ REQUESTED // Submitted to Overseerr
+ ALREADY_AVAILABLE // Already in library
+ ALREADY_REQUESTED // Already has pending request
+ FAILED // Request submission failed
+ REMOVED_FROM_WATCHLIST // Removed from Plex watchlist
+}