diff --git a/server/src/main/java/org/elasticsearch/watcher/FileGroupChangesListener.java b/server/src/main/java/org/elasticsearch/watcher/FileGroupChangesListener.java new file mode 100644 index 0000000000000..f4fb4e90c00ca --- /dev/null +++ b/server/src/main/java/org/elasticsearch/watcher/FileGroupChangesListener.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.watcher; + +import java.nio.file.Path; +import java.util.List; + +/** + * Callback interface that FileGroupWatcher is using to notify listeners about changes. + */ +public interface FileGroupChangesListener { + /** + * Called only once for any number of files that were changed (creates/updates/deletes) in the watched directory + */ + default void onFileChanged(List file) {} + +} diff --git a/server/src/main/java/org/elasticsearch/watcher/FileGroupWatcher.java b/server/src/main/java/org/elasticsearch/watcher/FileGroupWatcher.java new file mode 100644 index 0000000000000..882f04142ace6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/watcher/FileGroupWatcher.java @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.watcher; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * An abstraction that coalesces any number of updates, deletes or creations for any number of files into ONE update + * per configured Resource Watcher period. To this end, the semantics are such that distinctions between + * updates, deletes or create actions are all collapsed and everything is referred to as a "change". + *

+ * The implementation relies on FileWatcher for the specifics of monitoring files. This class acts like an + * orchestrator, initiating and reacting to the results of a collection of file watchers. + */ +public class FileGroupWatcher extends AbstractResourceWatcher { + + private static final Logger logger = LogManager.getLogger(FileGroupWatcher.class); + + private final List watchedFiles; + private final CoalescingFileChangesListener coalescingListener; + + /** + * @param files a group of files to be watched + */ + public FileGroupWatcher(Collection files) { + this.coalescingListener = new CoalescingFileChangesListener(); + this.watchedFiles = new ArrayList<>(); + for (Path file : files) { + final FileWatcher watcher = new FileWatcher(file); + watcher.addListener(coalescingListener); + watchedFiles.add(watcher); + } + } + + @Override + protected void doInit() throws IOException { + for (FileWatcher watcher : watchedFiles) { + watcher.doInit(); + } + } + + @Override + protected void doCheckAndNotify() throws IOException { + if (coalescingListener.filesChanged()) { + // fallback in case state wasn't cleared correctly in previous iteration + logger.error("Detected inconsistent state in the File Group Watcher, clearing it and proceeding."); + coalescingListener.reset(); + } + for (FileWatcher watcher : watchedFiles) { + watcher.doCheckAndNotify(); + } + // at this point the results of all file changes has coalesced + if (coalescingListener.filesChanged()) { + logger.info("Number of changed files: {}", coalescingListener.getFilesChangedCount()); + try { + for (FileGroupChangesListener listener : listeners()) { + try { + listener.onFileChanged(coalescingListener.getFiles()); + } catch (Exception e) { + logger.warn("Cannot notify file group changes listener {}", listener.getClass().getSimpleName(), e); + } + } + } finally { + coalescingListener.reset(); + } + } + } + + static class CoalescingFileChangesListener implements FileChangesListener { + + private final ThreadLocal> filesChanged = ThreadLocal.withInitial(ArrayList::new); + + @Override + public void onFileChanged(Path file) { + filesChanged.get().add(file); + } + + @Override + public void onFileCreated(Path file) { + onFileChanged(file); + } + + @Override + public void onFileDeleted(Path file) { + onFileChanged(file); + } + + public boolean filesChanged() { + return filesChanged.get().isEmpty() == false; + } + + public int getFilesChangedCount() { + return filesChanged.get().size(); + } + + public void reset() { + filesChanged.set(new ArrayList<>()); + } + + public List getFiles() { + return new ArrayList<>(filesChanged.get()); + } + + } +} diff --git a/server/src/test/java/org/elasticsearch/watcher/FileGroupWatcherTests.java b/server/src/test/java/org/elasticsearch/watcher/FileGroupWatcherTests.java new file mode 100644 index 0000000000000..17055bfcb25d1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/watcher/FileGroupWatcherTests.java @@ -0,0 +1,144 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.watcher; + +import org.apache.lucene.tests.util.LuceneTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +@LuceneTestCase.SuppressFileSystems("ExtrasFS") +public class FileGroupWatcherTests extends ESTestCase { + + private static class RecordingFileGroupChangesListener implements FileGroupChangesListener { + private final List notifications = new ArrayList<>(); + + @Override + public void onFileChanged(List files) { + notifications.add("onFileChanged: " + files.size() + " files"); + } + + public List notifications() { + return notifications; + } + } + + public void testMultipleFileChanges() throws IOException, InterruptedException { + Path tempDir = createTempDir(); + RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener(); + Path file1 = tempDir.resolve("test1.txt"); + Path file2 = tempDir.resolve("test2.txt"); + Path file3 = tempDir.resolve("test3.txt"); + + touch(file1); + touch(file2); + touch(file3); + + FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(Arrays.asList(file1, file2, file3)); + fileGroupWatcher.addListener(listener); + fileGroupWatcher.init(); + + // Modify multiple files - should batch into single notification + Files.setLastModifiedTime(file1, FileTime.fromMillis(System.currentTimeMillis() + 1)); // + 1 makes sure a change is detected + Files.setLastModifiedTime(file2, FileTime.fromMillis(System.currentTimeMillis() + 1)); + + fileGroupWatcher.checkAndNotify(); + assertThat(listener.notifications(), hasSize(1)); + assertThat(listener.notifications().get(0), equalTo("onFileChanged: 2 files")); + } + + public void testBatchedCreation() throws IOException { + Path tempDir = createTempDir(); + RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener(); + Path file1 = tempDir.resolve("test1.txt"); + Path file2 = tempDir.resolve("test2.txt"); + + FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(Arrays.asList(file1, file2)); + fileGroupWatcher.addListener(listener); + fileGroupWatcher.init(); + + // Create both files + touch(file1); + touch(file2); + + fileGroupWatcher.checkAndNotify(); + assertThat(listener.notifications(), hasSize(1)); + assertThat(listener.notifications().get(0), equalTo("onFileChanged: 2 files")); + } + + public void testBatchedDeletion() throws IOException { + Path tempDir = createTempDir(); + RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener(); + Path file1 = tempDir.resolve("test1.txt"); + Path file2 = tempDir.resolve("test2.txt"); + + touch(file1); + touch(file2); + + FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(Arrays.asList(file1, file2)); + fileGroupWatcher.addListener(listener); + fileGroupWatcher.init(); + + Files.delete(file1); + Files.delete(file2); + + fileGroupWatcher.checkAndNotify(); + assertThat(listener.notifications(), hasSize(1)); + assertThat(listener.notifications().get(0), equalTo("onFileChanged: 2 files")); + } + + public void testMixedOperationsSeparateNotifications() throws IOException { + Path tempDir = createTempDir(); + RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener(); + Path existingFile = tempDir.resolve("existing.txt"); + Path newFile = tempDir.resolve("new.txt"); + Path deleteFile = tempDir.resolve("delete.txt"); + + touch(existingFile); + touch(deleteFile); + + FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(Arrays.asList(existingFile, newFile, deleteFile)); + fileGroupWatcher.addListener(listener); + fileGroupWatcher.init(); + + Files.setLastModifiedTime(existingFile, FileTime.fromMillis(System.currentTimeMillis() + 1)); // + 1 makes sure a change is detected + touch(newFile); + Files.delete(deleteFile); + + fileGroupWatcher.checkAndNotify(); + assertThat(listener.notifications(), hasSize(1)); + assertThat(listener.notifications(), containsInAnyOrder("onFileChanged: 3 files")); + } + + public void testEmptyFileGroup() throws IOException { + RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener(); + + FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(List.of()); + fileGroupWatcher.addListener(listener); + fileGroupWatcher.init(); + + fileGroupWatcher.checkAndNotify(); + assertThat(listener.notifications(), empty()); + } + + static void touch(Path path) throws IOException { + Files.newOutputStream(path).close(); + } +}