diff --git a/Dockerfile b/Dockerfile index 039a7210..bac187e4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,13 +19,16 @@ RUN --mount=type=cache,target=/var/cache/apt/ \ WORKDIR /app/ +# Create the VERSION file directly in the final stage +RUN echo "dockerfile-build-$(date +%Y%m%d)" > VERSION + COPY hotspot-entrypoint.sh docker-healthcheck.sh / COPY --from=build /app/build/libs/piped-1.0-all.jar /app/piped.jar -COPY VERSION . +# COPY VERSION . # Remove or comment out this line EXPOSE 8080 HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 CMD /docker-healthcheck.sh -ENTRYPOINT ["/hotspot-entrypoint.sh"] +ENTRYPOINT ["/hotspot-entrypoint.sh"] \ No newline at end of file diff --git a/README.md b/README.md index b7f68b3e..a17856f8 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,46 @@ +# How to Use + +This is a temporary hack to fix new videos not appearing on the feed. + +1. Follow self-hosting instructions [here](https://docs.piped.video/docs/self-hosting/) up to `cd Piped-Docker`. +2. Inside `Piped-Docker/template`, modify the relevant template with the following: +```dockerfile + piped: # This is the backend service + # image: 1337kavin/piped:latest # Removed this line + build: # Build the image with the workaround + context: https://github.com/firejoust/Piped-Backend.git # Tells docker-compose where to get the source + dockerfile: Dockerfile # Specifies the Dockerfile within the context +``` +3. Append the following to `Piped-Docker/config/config.properties`: +```sh +# Enable feed polling workaround +ENABLE_FEED_POLLING=true +POLLING_INTERVAL_MINUTES=15 +POLLING_FETCH_LIMIT_PER_CHANNEL=10 +FEED_RETENTION=30 +``` +4. After running `./configure-instance.sh` and selecting the relevant template, run the following to start the container: +```sh +sudo DOCKER_BUILDKIT=1 docker-compose up -d +``` + +### Workaround Configuration (`config.properties`) + +* **`ENABLE_FEED_POLLING`**: + * Set to `true` to enable the backend to periodically check subscribed channels for new videos, bypassing PubSub. Set to `false` (default) to disable this polling. + +* **`POLLING_INTERVAL_MINUTES`**: + * Specifies how often (in minutes) the backend should perform the polling cycle for all subscribed channels. Default: `15`. + +* **`POLLING_FETCH_LIMIT_PER_CHANNEL`**: + * Limits how many of the *most recent* videos are checked for each channel during a polling cycle. Helps improve performance and reduce load. Default: `10`. + +* **`FEED_RETENTION`**: + * Determines how many days old a video can be and still be added to the feed database (by polling or PubSub). Also affects how long videos are kept before cleanup. Default: `30`. + +# README.md: + +```markdown # Piped-Backend An advanced open-source privacy friendly alternative to YouTube, crafted with the help of [NewPipeExtractor](https://github.com/TeamNewPipe/NewPipeExtractor). @@ -9,3 +52,4 @@ An advanced open-source privacy friendly alternative to YouTube, crafted with th ## Community Projects - See https://github.com/TeamPiped/Piped#made-with-piped +``` \ No newline at end of file diff --git a/src/main/java/me/kavin/piped/Main.java b/src/main/java/me/kavin/piped/Main.java index 624d5f8e..d24a3da1 100644 --- a/src/main/java/me/kavin/piped/Main.java +++ b/src/main/java/me/kavin/piped/Main.java @@ -18,14 +18,21 @@ import org.hibernate.Session; import org.hibernate.StatelessSession; import org.schabi.newpipe.extractor.NewPipe; +import org.schabi.newpipe.extractor.channel.ChannelInfo; // Added import +import org.schabi.newpipe.extractor.channel.tabs.ChannelTabInfo; // Added import +import org.schabi.newpipe.extractor.channel.tabs.ChannelTabs; // Added import +import org.schabi.newpipe.extractor.exceptions.ExtractionException; // Added import +import org.schabi.newpipe.extractor.linkhandler.ListLinkHandler; // Added import import org.schabi.newpipe.extractor.localization.ContentCountry; import org.schabi.newpipe.extractor.localization.Localization; import org.schabi.newpipe.extractor.services.youtube.YoutubeJavaScriptPlayerManager; import org.schabi.newpipe.extractor.services.youtube.YoutubeParsingHelper; import org.schabi.newpipe.extractor.services.youtube.extractors.YoutubeStreamExtractor; import org.schabi.newpipe.extractor.stream.StreamInfo; +import org.schabi.newpipe.extractor.stream.StreamInfoItem; // Added import import rocks.kavin.reqwest4j.ReqwestUtils; +import java.io.IOException; // Added import import java.security.Security; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -92,13 +99,13 @@ public static void main(String[] args) throws Exception { MatrixHelper.MATRIX_TOKEN) )); - new Timer().scheduleAtFixedRate(new TimerTask() { + new Timer("ThrottlingCache-Clear", true).scheduleAtFixedRate(new TimerTask() { @Override public void run() { System.out.printf("ThrottlingCache: %o entries%n", YoutubeJavaScriptPlayerManager.getThrottlingParametersCacheSize()); YoutubeJavaScriptPlayerManager.clearThrottlingParametersCache(); } - }, 0, TimeUnit.MINUTES.toMillis(60)); + }, TimeUnit.MINUTES.toMillis(1), TimeUnit.MINUTES.toMillis(60)); // Start after 1 min, repeat hourly if (!Constants.DISABLE_SERVER) new Thread(() -> { @@ -119,143 +126,278 @@ public void run() { // Close the HikariCP connection pool Runtime.getRuntime().addShutdownHook(new Thread(DatabaseSessionFactory::close)); - if (Constants.DISABLE_TIMERS) - return; - - new Timer().scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { - - List channelIds = s.createNativeQuery("SELECT id FROM pubsub WHERE subbed_at < :subbedTime AND id IN (" + - "SELECT DISTINCT channel FROM users_subscribed" + - " UNION " + - "SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed" + - ")", String.class) - .setParameter("subbedTime", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)) - .setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)) - .stream() - .filter(Objects::nonNull) - .distinct() - .collect(Collectors.toCollection(ObjectArrayList::new)); - - Collections.shuffle(channelIds); - - var queue = new ConcurrentLinkedQueue<>(channelIds); - - System.out.println("PubSub: queue size - " + queue.size() + " channels"); - - for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) { - new Thread(() -> { - - Object o = new Object(); - - String channelId; - while ((channelId = queue.poll()) != null) { - try { - CompletableFuture future = PubSubHelper.subscribePubSub(channelId); - - if (future == null) - continue; + if (Constants.DISABLE_TIMERS) { + System.out.println("Timers are disabled."); // Add log message + } else { + // --- Existing Timer Tasks --- + // PubSub Resubscription Timer + new Timer("PubSub-Resubscribe", true).scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + + List channelIds = s.createNativeQuery("SELECT id FROM pubsub WHERE subbed_at < :subbedTime AND id IN (" + + "SELECT DISTINCT channel FROM users_subscribed" + + " UNION " + + "SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed" + + ")", String.class) + .setParameter("subbedTime", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(4)) + .setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)) + .stream() + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toCollection(ObjectArrayList::new)); + + Collections.shuffle(channelIds); + + var queue = new ConcurrentLinkedQueue<>(channelIds); + + System.out.println("PubSub: queue size - " + queue.size() + " channels"); + + for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) { + new Thread(() -> { + + Object o = new Object(); + + String channelId; + while ((channelId = queue.poll()) != null) { + try { + CompletableFuture future = PubSubHelper.subscribePubSub(channelId); + + if (future == null) + continue; + + future.whenComplete((resp, throwable) -> { + synchronized (o) { + o.notify(); + } + }); - future.whenComplete((resp, throwable) -> { synchronized (o) { - o.notify(); + o.wait(); } - }); - synchronized (o) { - o.wait(); + } catch (Exception e) { + ExceptionHandler.handle(e); } - - } catch (Exception e) { - ExceptionHandler.handle(e); } - } - }, "PubSub-" + i).start(); - } + }, "PubSub-" + i).start(); + } - } catch (Exception e) { - e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } } - } - }, 0, TimeUnit.MINUTES.toMillis(90)); - - new Timer().scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { - - s.createNativeQuery("SELECT channel_id.channel FROM " + - "(SELECT DISTINCT channel FROM users_subscribed UNION SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed) " + - "channel_id LEFT JOIN pubsub on pubsub.id = channel_id.channel " + - "WHERE pubsub.id IS NULL", String.class) - .setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)) - .getResultStream() - .parallel() - .filter(ChannelHelpers::isValidId) - .forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> { - try (StatelessSession sess = DatabaseSessionFactory.createStatelessSession()) { - var pubsub = new PubSub(id, -1); - var tr = sess.beginTransaction(); - sess.insert(pubsub); - tr.commit(); - } - })); + }, TimeUnit.MINUTES.toMillis(5), TimeUnit.MINUTES.toMillis(90)); // Start after 5 min, repeat every 90 min + + // PubSub Missing Channel Check Timer + new Timer("PubSub-MissingCheck", true).scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + + s.createNativeQuery("SELECT channel_id.channel FROM " + + "(SELECT DISTINCT channel FROM users_subscribed UNION SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed) " + + "channel_id LEFT JOIN pubsub on pubsub.id = channel_id.channel " + + "WHERE pubsub.id IS NULL", String.class) + .setParameter("unauthSubbed", System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY)) + .getResultStream() + .parallel() + .filter(ChannelHelpers::isValidId) + .forEach(id -> Multithreading.runAsyncLimitedPubSub(() -> { + try (StatelessSession sess = DatabaseSessionFactory.createStatelessSession()) { + var pubsub = new PubSub(id, -1); + var tr = sess.beginTransaction(); + sess.insert(pubsub); + tr.commit(); + } + })); - } catch (Exception e) { - e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } } - } - }, 0, TimeUnit.DAYS.toMillis(1)); + }, TimeUnit.MINUTES.toMillis(10), TimeUnit.DAYS.toMillis(1)); // Start after 10 min, repeat daily - new Timer().scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + // Video Cleanup Timer + new Timer("Video-Cleanup", true).scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { - var cb = s.getCriteriaBuilder(); - var cd = cb.createCriteriaDelete(Video.class); - var root = cd.from(Video.class); - cd.where(cb.lessThan(root.get("uploaded"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION))); + var cb = s.getCriteriaBuilder(); + var cd = cb.createCriteriaDelete(Video.class); + var root = cd.from(Video.class); + cd.where(cb.lessThan(root.get("uploaded"), System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION))); - var tr = s.beginTransaction(); + var tr = s.beginTransaction(); - var query = s.createMutationQuery(cd); + var query = s.createMutationQuery(cd); - System.out.printf("Cleanup: Removed %o old videos%n", query.executeUpdate()); + System.out.printf("Cleanup: Removed %o old videos%n", query.executeUpdate()); - tr.commit(); + tr.commit(); - } catch (Exception e) { - e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } } - } - }, 0, TimeUnit.MINUTES.toMillis(60)); + }, TimeUnit.MINUTES.toMillis(15), TimeUnit.MINUTES.toMillis(60)); // Start after 15 min, repeat hourly - new Timer().scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + // Playlist Video Cleanup Timer + new Timer("PlaylistVideo-Cleanup", true).scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { - CriteriaBuilder cb = s.getCriteriaBuilder(); + CriteriaBuilder cb = s.getCriteriaBuilder(); - var pvQuery = cb.createCriteriaDelete(PlaylistVideo.class); - var pvRoot = pvQuery.from(PlaylistVideo.class); + var pvQuery = cb.createCriteriaDelete(PlaylistVideo.class); + var pvRoot = pvQuery.from(PlaylistVideo.class); - var subQuery = pvQuery.subquery(String.class); - var subRoot = subQuery.from(me.kavin.piped.utils.obj.db.Playlist.class); + var subQuery = pvQuery.subquery(String.class); + var subRoot = subQuery.from(me.kavin.piped.utils.obj.db.Playlist.class); - subQuery.select(subRoot.join("videos").get("id")).distinct(true); + subQuery.select(subRoot.join("videos").get("id")).distinct(true); - pvQuery.where(cb.not(pvRoot.get("id").in(subQuery))); + pvQuery.where(cb.not(pvRoot.get("id").in(subQuery))); - var tr = s.beginTransaction(); - s.createMutationQuery(pvQuery).executeUpdate(); - tr.commit(); + var tr = s.beginTransaction(); + s.createMutationQuery(pvQuery).executeUpdate(); + tr.commit(); + } } + }, TimeUnit.MINUTES.toMillis(20), TimeUnit.MINUTES.toMillis(60)); // Start after 20 min, repeat hourly + + + // --- NEW Feed Polling Timer --- + if (Constants.ENABLE_FEED_POLLING) { + System.out.println("Feed polling enabled. Interval: " + Constants.POLLING_INTERVAL_MINUTES + " minutes."); + new Timer("Feed-Polling", true).scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + System.out.println("Starting feed polling cycle..."); + long startTime = System.currentTimeMillis(); + Set uniqueChannelIds; + + // 1. Get all unique subscribed channel IDs + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + long expiryTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(Constants.SUBSCRIPTIONS_EXPIRY); + // Combine authenticated and unauthenticated subscriptions + List subscribedIds = s.createNativeQuery( + "SELECT DISTINCT channel FROM users_subscribed " + + "UNION " + + "SELECT id FROM unauthenticated_subscriptions WHERE subscribed_at > :unauthSubbed", + String.class) + .setParameter("unauthSubbed", expiryTimestamp) + .list(); + uniqueChannelIds = new HashSet<>(subscribedIds); // Use HashSet for efficient distinctness + } catch (Exception e) { + System.err.println("Error fetching subscribed channels for polling:"); + ExceptionHandler.handle(e); + return; // Stop this cycle if we can't get channels + } + + if (uniqueChannelIds.isEmpty()) { + System.out.println("No channels to poll."); + return; + } + + System.out.println("Polling " + uniqueChannelIds.size() + " unique channels."); + + // 2. Process each channel in parallel + List> futures = uniqueChannelIds.stream() + .map(channelId -> CompletableFuture.runAsync(() -> pollChannel(channelId), Multithreading.getCachedExecutor())) + .toList(); + + // Wait for all polling tasks to complete for this cycle + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); // Wait for completion + long duration = System.currentTimeMillis() - startTime; + System.out.println("Feed polling cycle finished in " + duration + " ms."); + } catch (Exception e) { + System.err.println("Error waiting for polling tasks to complete:"); + ExceptionHandler.handle(e); + } + } + }, TimeUnit.MINUTES.toMillis(1), TimeUnit.MINUTES.toMillis(Constants.POLLING_INTERVAL_MINUTES)); // Start after 1 min, repeat per config + } else { + System.out.println("Feed polling is disabled via configuration."); + } + } // End of !DISABLE_TIMERS block + } // End of main method + + /** + * Polls a single channel for recent videos and updates the database. + * Designed to be run asynchronously. + * @param channelId The YouTube channel ID to poll. + */ + private static void pollChannel(String channelId) { + if (!ChannelHelpers.isValidId(channelId)) { + // System.err.println("Skipping invalid channel ID during poll: " + channelId); + return; + } + + // System.out.println("Polling channel: " + channelId); // Optional: verbose logging + long pollStartTime = System.currentTimeMillis(); + int videosProcessed = 0; + // int videosAdded = 0; // Hard to track accurately without more logic + + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { + // Fetch the Channel entity first - needed for foreign key in Video table + me.kavin.piped.utils.obj.db.Channel channelEntity = DatabaseHelper.getChannelFromId(s, channelId); + if (channelEntity == null) { + // Attempt to save the channel if it's missing (might happen with unauthenticated subs) + System.out.println("Channel " + channelId + " not found in DB, attempting to save."); + channelEntity = DatabaseHelper.saveChannel(channelId); // This might be slow, consider alternatives if performance is critical + if (channelEntity == null) { + System.err.println("Failed to find or save channel " + channelId + " for polling. Skipping."); + return; + } + } + + // Fetch only the videos tab for efficiency + ChannelInfo channelInfo = ChannelInfo.getInfo("https://youtube.com/channel/" + channelId); + ListLinkHandler videosTabHandler = channelInfo.getTabs() + .stream() + .filter(tab -> tab.getContentFilters().contains(ChannelTabs.VIDEOS)) + .findFirst() + .orElse(null); + + if (videosTabHandler == null) { + System.err.println("Could not find videos tab for channel: " + channelId); + return; } - }, 0, TimeUnit.MINUTES.toMillis(60)); + // Fetch the first page of the videos tab + ChannelTabInfo tabInfo = ChannelTabInfo.getInfo(YOUTUBE_SERVICE, videosTabHandler); + List items = tabInfo.getRelatedItems() + .stream() + .filter(StreamInfoItem.class::isInstance) + .map(StreamInfoItem.class::cast) + .limit(Constants.POLLING_FETCH_LIMIT_PER_CHANNEL) // Limit fetched items + .toList(); + + // Process the fetched items + for (StreamInfoItem item : items) { + // Use the efficient helper that avoids re-fetching + VideoHelpers.handleNewVideo(item, System.currentTimeMillis(), channelEntity); + videosProcessed++; + // Note: We can't easily tell if it was *added* vs updated here without more complex logic + } + + } catch (ExtractionException | IOException e) { + System.err.println("Error polling channel " + channelId + ": " + e.getMessage()); + // Don't print full stack trace for common extraction errors unless debugging + // ExceptionHandler.handle(e); + } catch (Exception e) { + // Catch unexpected errors + System.err.println("Unexpected error polling channel " + channelId + ":"); + ExceptionHandler.handle(e); + } finally { + long pollDuration = System.currentTimeMillis() - pollStartTime; + // Optional: Log duration per channel if needed for performance tuning + // System.out.println("Finished polling channel " + channelId + " in " + pollDuration + "ms. Processed: " + videosProcessed); + } } -} +} // End of Main class \ No newline at end of file diff --git a/src/main/java/me/kavin/piped/consts/Constants.java b/src/main/java/me/kavin/piped/consts/Constants.java index 984088ff..ac9a8f14 100644 --- a/src/main/java/me/kavin/piped/consts/Constants.java +++ b/src/main/java/me/kavin/piped/consts/Constants.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.TimeUnit; // Added import public class Constants { @@ -106,6 +107,12 @@ public class Constants { public static final String VERSION; + // --- Polling Configuration --- + public static final boolean ENABLE_FEED_POLLING; + public static final int POLLING_INTERVAL_MINUTES; + public static final int POLLING_FETCH_LIMIT_PER_CHANNEL; + // --- End Polling Configuration --- + public static final ObjectMapper mapper = JsonMapper.builder() .addMixIn(Page.class, PageMixin.class) .addMixIn(ListLinkHandler.class, ListLinkHandlerMixin.class) @@ -173,6 +180,13 @@ public class Constants { MATRIX_TOKEN = getProperty(prop, "MATRIX_TOKEN"); GEO_RESTRICTION_CHECKER_URL = getProperty(prop, "GEO_RESTRICTION_CHECKER_URL"); BG_HELPER_URL = getProperty(prop, "BG_HELPER_URL"); + + // --- Polling Configuration --- + ENABLE_FEED_POLLING = Boolean.parseBoolean(getProperty(prop, "ENABLE_FEED_POLLING", "false")); + POLLING_INTERVAL_MINUTES = Integer.parseInt(getProperty(prop, "POLLING_INTERVAL_MINUTES", "15")); + POLLING_FETCH_LIMIT_PER_CHANNEL = Integer.parseInt(getProperty(prop, "POLLING_FETCH_LIMIT_PER_CHANNEL", "10")); + // --- End Polling Configuration --- + prop.forEach((_key, _value) -> { String key = String.valueOf(_key), value = String.valueOf(_value); if (key.startsWith("hibernate")) @@ -223,4 +237,4 @@ private static String getProperty(final Properties prop, String key, String def) return prop.getProperty(key, def); } -} +} \ No newline at end of file diff --git a/src/main/java/me/kavin/piped/utils/VideoHelpers.java b/src/main/java/me/kavin/piped/utils/VideoHelpers.java index 1ba540f6..c47020d0 100644 --- a/src/main/java/me/kavin/piped/utils/VideoHelpers.java +++ b/src/main/java/me/kavin/piped/utils/VideoHelpers.java @@ -6,6 +6,7 @@ import me.kavin.piped.utils.obj.db.Video; import org.apache.commons.lang3.StringUtils; import org.hibernate.StatelessSession; +import org.schabi.newpipe.extractor.exceptions.ParsingException; import org.schabi.newpipe.extractor.stream.StreamExtractor; import org.schabi.newpipe.extractor.stream.StreamInfo; import org.schabi.newpipe.extractor.stream.StreamInfoItem; @@ -37,26 +38,30 @@ public static void handleNewVideo(StreamInfo info, long time, me.kavin.piped.uti channel = DatabaseHelper.getChannelFromId( info.getUploaderUrl().substring("https://www.youtube.com/channel/".length())); - long infoTime = info.getUploadDate() != null ? info.getUploadDate().offsetDateTime().toInstant().toEpochMilli() - : System.currentTimeMillis(); + // Prioritize actual upload date + long infoTime = Optional.ofNullable(info.getUploadDate()) + .map(date -> date.offsetDateTime().toInstant().toEpochMilli()) + .orElse(time); // Fallback to provided time only if upload date is null if (channel != null && (System.currentTimeMillis() - infoTime) < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION)) { info.setShortFormContent(isShort(info.getId())); + String latestThumbnailUrl = info.getThumbnails() != null && !info.getThumbnails().isEmpty() ? info.getThumbnails().getLast().getUrl() : null; try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { if (!DatabaseHelper.doesVideoExist(s, info.getId())) { Video video = new Video(info.getId(), info.getName(), info.getViewCount(), info.getDuration(), - Math.max(infoTime, time), info.getThumbnails().getLast().getUrl(), info.isShortFormContent(), channel); + infoTime, // Use actual upload date + latestThumbnailUrl, info.isShortFormContent(), channel); - insertVideo(video); - return; + insertVideo(video); // This will insert with the correct infoTime + return; // No need to update immediately after insert } } - - updateVideo(info.getId(), info, time); + // Video exists, update it (including potentially correcting the timestamp if it was wrong before) + updateVideo(info.getId(), info, infoTime); // Pass infoTime here as well } } @@ -67,27 +72,95 @@ public static void handleNewVideo(StreamExtractor extractor, long time, me.kavin channel = DatabaseHelper.getChannelFromId( extractor.getUploaderUrl().substring("https://www.youtube.com/channel/".length())); + // Prioritize actual upload date long infoTime = Optional.ofNullable(extractor.getUploadDate()) .map(date -> date.offsetDateTime().toInstant().toEpochMilli()) - .orElseGet(System::currentTimeMillis); + .orElse(time); // Fallback to provided time only if upload date is null if (channel != null && (System.currentTimeMillis() - infoTime) < TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION)) { + String latestThumbnailUrl = extractor.getThumbnails() != null && !extractor.getThumbnails().isEmpty() ? extractor.getThumbnails().getLast().getUrl() : null; + try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { if (!DatabaseHelper.doesVideoExist(s, extractor.getId())) { boolean isShort = extractor.isShortFormContent() || isShort(extractor.getId()); Video video = new Video(extractor.getId(), extractor.getName(), extractor.getViewCount(), extractor.getLength(), - Math.max(infoTime, time), extractor.getThumbnails().getLast().getUrl(), isShort, channel); - - insertVideo(video); - + infoTime, // Use actual upload date + latestThumbnailUrl, isShort, channel); + + insertVideo(video); // Inserts with correct infoTime + // No need to update if just inserted + } else { + // Video exists, update it (thumbnail might have changed, ensure timestamp is correct) + // We call insertVideo again, relying on ON CONFLICT to update the timestamp if needed. + // This is slightly less direct than calling updateVideo, but ensures the timestamp logic + // is centralized in the insert/conflict handling. + boolean isShort = extractor.isShortFormContent() || isShort(extractor.getId()); + Video videoForUpdate = new Video(extractor.getId(), extractor.getName(), extractor.getViewCount(), extractor.getLength(), + infoTime, // Use actual upload date + latestThumbnailUrl, isShort, channel); + insertVideo(videoForUpdate); // Rely on ON CONFLICT to update timestamp and other fields } } } + } + + /** + * Handles inserting or updating a video based on a StreamInfoItem, + * avoiding redundant fetching if the item details are sufficient. + * Used primarily by the feed polling mechanism. + * + * @param item The StreamInfoItem containing video details. + * @param time The timestamp to use if creating a new video record (e.g., from PubSub). + * @param channel The Channel entity this video belongs to. + */ + public static void handleNewVideo(StreamInfoItem item, long time, me.kavin.piped.utils.obj.db.Channel channel) { + if (item == null || channel == null) return; + + // Prioritize actual upload date + long infoTime = Optional.ofNullable(item.getUploadDate()) + .map(date -> date.offsetDateTime().toInstant().toEpochMilli()) + .orElse(time); // Fallback to provided time only if upload date is null + + if ((System.currentTimeMillis() - infoTime) >= TimeUnit.DAYS.toMillis(Constants.FEED_RETENTION)) { + return; // Skip videos older than retention period + } + + String videoId = null; + try { + videoId = YOUTUBE_SERVICE.getStreamLHFactory().getId(item.getUrl()); + } catch (ParsingException e) { + ExceptionHandler.handle(e, "Failed to parse video ID from StreamInfoItem URL: " + item.getUrl()); + return; + } + + if (StringUtils.isBlank(videoId)) return; + String latestThumbnailUrl = item.getThumbnails() != null && !item.getThumbnails().isEmpty() ? item.getThumbnails().getLast().getUrl() : null; + + try { + // We directly call insertVideo, which handles both insertion and update (via ON CONFLICT) + // This ensures the timestamp logic is consistent. + boolean isShort = item.isShortFormContent(); + Video videoForInsertOrUpdate = new Video( + videoId, + item.getName(), + item.getViewCount() > 0 ? item.getViewCount() : 0, // Ensure non-negative views + item.getDuration() > 0 ? item.getDuration() : 0, // Ensure non-negative duration + infoTime, // Use actual upload date + latestThumbnailUrl, + isShort, + channel + ); + insertVideo(videoForInsertOrUpdate); // Handles both insert and update via ON CONFLICT + + } catch (Exception e) { + // Handle potential exceptions during DB operation + ExceptionHandler.handle(e, "Error handling new video from StreamInfoItem: " + videoId); + } } public static boolean isShort(String videoId) throws Exception { @@ -104,13 +177,19 @@ public static boolean isShort(String videoId) throws Exception { return jsonResponse.getObject("endpoint").has("reelWatchEndpoint"); } - public static void updateVideo(String id, StreamInfo info, long time) { + // This updateVideo overload is primarily called when a video *already exists* + // and we get new info (like view count) from StreamInfo. + // It should ensure the timestamp isn't accidentally overwritten with the fetch time. + public static void updateVideo(String id, StreamInfo info, long infoTime) { // Changed 'time' to 'infoTime' for clarity Multithreading.runAsync(() -> { try { - if (!updateVideo(id, info.getViewCount(), info.getDuration(), info.getName())) { + String latestThumbnailUrl = info.getThumbnails() != null && !info.getThumbnails().isEmpty() ? info.getThumbnails().getLast().getUrl() : null; + // Call the update method that includes thumbnail, but *not* timestamp + if (!updateVideo(id, info.getViewCount(), info.getDuration(), info.getName(), latestThumbnailUrl)) { + // If update failed (video didn't exist), try inserting it using the correct infoTime var channel = DatabaseHelper.getChannelFromId(StringUtils.substring(info.getUploaderUrl(), -24)); if (channel != null) - handleNewVideo(info, time, channel); + handleNewVideo(info, infoTime, channel); // Pass correct infoTime } } catch (Exception e) { ExceptionHandler.handle(e); @@ -118,11 +197,23 @@ public static void updateVideo(String id, StreamInfo info, long time) { }); } + // This overload is called by the polling mechanism when a video exists. + // It should update metadata but *not* the timestamp. public static void updateVideo(String id, StreamInfoItem item) { - updateVideo(id, item.getViewCount(), item.getDuration(), item.getName()); + String latestThumbnailUrl = item.getThumbnails() != null && !item.getThumbnails().isEmpty() ? item.getThumbnails().getLast().getUrl() : null; + // Call the update method that includes thumbnail, but *not* timestamp + updateVideo(id, item.getViewCount(), item.getDuration(), item.getName(), latestThumbnailUrl); } + // Overload for backward compatibility or cases where thumbnail isn't available/needed + // This should NOT update the timestamp. public static boolean updateVideo(String id, long views, long duration, String title) { + return updateVideo(id, views, duration, title, null); // Pass null for thumbnail + } + + // Main update method - *DOES NOT* update the 'uploaded' timestamp. + // Timestamp correction is handled by the ON CONFLICT clause in insertVideo. + public static boolean updateVideo(String id, long views, long duration, String title, String thumbnail) { try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { var cb = s.getCriteriaBuilder(); @@ -130,15 +221,27 @@ public static boolean updateVideo(String id, long views, long duration, String t var root = cu.from(Video.class); cu.where(cb.equal(root.get("id"), id)); - + boolean changed = false; if (duration > 0) { cu.set(root.get("duration"), duration); + changed = true; } if (title != null) { cu.set(root.get("title"), title); + changed = true; } if (views > 0) { cu.set(root.get("views"), views); + changed = true; + } + if (thumbnail != null) { + cu.set(root.get("thumbnail"), thumbnail); + changed = true; + } + + if (!changed) { + // Check if the video actually exists before returning true + return DatabaseHelper.doesVideoExist(s, id); } long updated; @@ -149,8 +252,8 @@ public static boolean updateVideo(String id, long views, long duration, String t tr.commit(); } catch (Exception e) { tr.rollback(); - - // return true, so that we don't try to insert a video! + ExceptionHandler.handle(e, "Error updating video: " + id); + // return true, as the record likely exists but update failed return true; } @@ -158,21 +261,26 @@ public static boolean updateVideo(String id, long views, long duration, String t } } + // insertVideo now handles timestamp correction via ON CONFLICT public static void insertVideo(Video video) { try (StatelessSession s = DatabaseSessionFactory.createStatelessSession()) { var tr = s.beginTransaction(); try { + // Ensure thumbnail is not null before inserting + String thumbnailToInsert = video.getThumbnail() != null ? video.getThumbnail() : ""; // Use empty string if null + + // *** THE KEY FIX IS HERE: Added 'uploaded = excluded.uploaded' *** s.createNativeMutationQuery( "INSERT INTO videos (uploader_id,duration,is_short,thumbnail,title,uploaded,views,id) values " + "(:uploader_id,:duration,:is_short,:thumbnail,:title,:uploaded,:views,:id) ON CONFLICT (id) DO UPDATE SET " + - "duration = excluded.duration, title = excluded.title, views = excluded.views" + "duration = excluded.duration, title = excluded.title, views = excluded.views, thumbnail = excluded.thumbnail, uploaded = excluded.uploaded" // <-- Added uploaded update ) .setParameter("uploader_id", video.getChannel().getUploaderId()) .setParameter("duration", video.getDuration()) .setParameter("is_short", video.isShort()) - .setParameter("thumbnail", video.getThumbnail()) + .setParameter("thumbnail", thumbnailToInsert) .setParameter("title", video.getTitle()) - .setParameter("uploaded", video.getUploaded()) + .setParameter("uploaded", video.getUploaded()) // This is the correct infoTime .setParameter("views", video.getViews()) .setParameter("id", video.getId()) .executeUpdate(); @@ -183,4 +291,4 @@ public static void insertVideo(Video video) { } } } -} +} \ No newline at end of file