Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.watcher;

import java.nio.file.Path;
import java.util.List;

/**
 * Callback interface that {@code FileGroupWatcher} uses to notify listeners that watched files have changed.
 * <p>
 * Creations, updates and deletions are not distinguished: every kind of change is reported through the
 * single {@link #onFileChanged(List)} callback.
 */
public interface FileGroupChangesListener {
    /**
     * Called only once per check for any number of watched files that were changed (created, updated or deleted).
     * The default implementation does nothing.
     *
     * @param files the paths of the watched files that were reported as changed
     */
    default void onFileChanged(List<Path> files) {}

}
119 changes: 119 additions & 0 deletions server/src/main/java/org/elasticsearch/watcher/FileGroupWatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.watcher;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* An abstraction that coalesces any number of updates, deletes or creations for any number of files into ONE update
* per configured Resource Watcher period. To this end, the semantics are such that distinctions between
* updates, deletes or create actions are all collapsed and everything is referred to as a "change".
* <p>
* The implementation relies on FileWatcher for the specifics of monitoring files. This class acts like an
* orchestrator, initiating and reacting to the results of a collection of file watchers.
*/
public class FileGroupWatcher extends AbstractResourceWatcher<FileGroupChangesListener> {

// Logger used to report the number of changed files and listener notification failures.
private static final Logger logger = LogManager.getLogger(FileGroupWatcher.class);

// One FileWatcher per path handed to the constructor; each is polled on every check.
private final List<FileWatcher> watchedFiles;
// Single listener registered on every FileWatcher; collects the paths they report as changed.
private final CoalescingFileChangesListener coalescingListener;

/**
 * Builds one {@link FileWatcher} per supplied path and attaches the same coalescing listener to each of
 * them, so that changes reported by any individual watcher end up in one shared collection.
 *
 * @param files a group of files to be watched
 */
public FileGroupWatcher(Collection<Path> files) {
    final CoalescingFileChangesListener sharedListener = new CoalescingFileChangesListener();
    final List<FileWatcher> fileWatchers = new ArrayList<>();
    files.forEach(path -> {
        FileWatcher fileWatcher = new FileWatcher(path);
        fileWatcher.addListener(sharedListener);
        fileWatchers.add(fileWatcher);
    });
    this.coalescingListener = sharedListener;
    this.watchedFiles = fileWatchers;
}

/**
 * Initializes every underlying file watcher, one after the other.
 *
 * @throws IOException if initializing one of the file watchers fails; watchers after it are not initialized
 */
@Override
protected void doInit() throws IOException {
    final int watcherCount = watchedFiles.size();
    for (int index = 0; index < watcherCount; index++) {
        watchedFiles.get(index).doInit();
    }
}

@Override
protected void doCheckAndNotify() throws IOException {
if (coalescingListener.filesChanged()) {
// fallback in case state wasn't cleared correctly in previous iteration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds concerning. How can this happen?

Copy link
Contributor Author

@ankit--sethi ankit--sethi Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it shouldn't ever happen due to the reset happening in the finally block. I was thinking of very rare situations where the JDK spec says finally will not execute. I had initially found this reference:

Likewise, if the thread executing the try or catch code is interrupted or killed, the finally block may not execute even though the application as a whole continues

https://stackoverflow.com/a/2417986

After digging a little deeper to write this reply, it turns out this is a JDK documentation mistake that they have now updated. Apparently, the section I quoted above is not actually true!
https://bugs.openjdk.org/browse/JDK-8276156?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel

With things where they are now, I'm fine with removing this if condition if it scares more than helps!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed this if-condition

logger.error("Detected inconsistent state in the File Group Watcher, clearing it and proceeding.");
coalescingListener.reset();
}
for (FileWatcher watcher : watchedFiles) {
watcher.doCheckAndNotify();
}
// at this point the results of all file changes has coalesced
if (coalescingListener.filesChanged()) {
logger.info("Number of changed files: {}", coalescingListener.getFilesChangedCount());
try {
for (FileGroupChangesListener listener : listeners()) {
try {
listener.onFileChanged(coalescingListener.getFiles());
} catch (Exception e) {
logger.warn("Cannot notify file group changes listener {}", listener.getClass().getSimpleName(), e);
}
}
} finally {
coalescingListener.reset();
}
}
}

static class CoalescingFileChangesListener implements FileChangesListener {

// Paths reported as changed since the last reset(), kept in a separate list per calling thread.
// NOTE(review): the ThreadLocal already confines each list to a single thread, so the
// synchronizedList wrapper looks redundant - confirm and consider dropping it here and in reset().
private final ThreadLocal<List<Path>> filesChanged = ThreadLocal.withInitial(() -> Collections.synchronizedList(new ArrayList<>()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm failing to understand the need for ThreadLocal and synchronized list. Could you explain?

Copy link
Contributor Author

@ankit--sethi ankit--sethi Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think synchronized list was the result of an earlier solution that got superseded by ThreadLocal but I forgot to remove it. I'll explain the general scenario either way though:

  1. A file change detected at the end of a particular time interval triggers this listener; the listener is executing in thread A.
  2. If the listener execution is time-consuming, it is possible that thread A is still executing by the time a second monitoring interval completes. If there is yet another change to the files, the resource watcher will assign a second thread B to execute the same listener. (I'm making some assumptions about how the resource watcher schedules tasks here.)
  3. Now threads A and B are modifying the same list concurrently.

This edge case requires CoalescingFileChangesListener to be doing something very time-consuming, or the watcher interval to be very short. It's very unlikely, but wrapping a list in the ThreadLocal is a low-cost way to have some thread isolation in case this ever happens.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the listener execution is time-consuming, it is possible that thread A is still executing by the time a second monitoring interval completes. If there is yet another change to the files, the resource watcher will assign a second thread B to execute the same listener. (I'm making some assumptions about how the resource watcher schedules tasks here.)

I think we should verify these assumptions with @elastic/es-core-infra (assuming you are the owners of the resource watcher). My understanding is that the file watcher does not schedule a new thread until it has finished processing the current one. Meaning, it seems to be single-threaded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds right, I tracked it down to these lines here and the flow is such that only after a task completes is the next runnable scheduled.

I feel though that depending on this information adds risk because it is tightly coupling the design to the internals of how a separate component works. With the ThreadLocal in place we don't need to depend on the scheduler guaranteeing thread safety, or things breaking if the design is ever modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the synchronizedList usage, it's not required.


/**
 * Records the given path in the calling thread's list of changed files.
 *
 * @param file the path reported as changed
 */
@Override
public void onFileChanged(Path file) {
    final List<Path> pending = filesChanged.get();
    pending.add(file);
}

/**
 * Treats a creation like any other change by forwarding to {@link #onFileChanged(Path)}.
 *
 * @param file the path reported as created
 */
@Override
public void onFileCreated(Path file) {
    this.onFileChanged(file);
}

/**
 * Treats a deletion like any other change by forwarding to {@link #onFileChanged(Path)}.
 *
 * @param file the path reported as deleted
 */
@Override
public void onFileDeleted(Path file) {
    this.onFileChanged(file);
}

/**
 * @return {@code true} if the calling thread has recorded at least one changed path since the last reset
 */
public boolean filesChanged() {
    final List<Path> pending = filesChanged.get();
    return pending.isEmpty() == false;
}

/**
 * @return how many change events the calling thread has recorded since the last reset
 */
public int getFilesChangedCount() {
    final List<Path> pending = filesChanged.get();
    return pending.size();
}

/**
 * Discards everything the calling thread has recorded so far.
 * <p>
 * The thread-local entry is removed rather than overwritten. The next access lazily creates a fresh,
 * empty list through the initializer supplied where the {@code ThreadLocal} is declared, so the way the
 * list is built is defined in one place only and no list is allocated until it is actually needed.
 */
public void reset() {
    filesChanged.remove();
}

/**
 * @return a new list holding the paths the calling thread has recorded since the last reset; changes to
 *         the returned list do not affect this listener
 */
public List<Path> getFiles() {
    final List<Path> pending = filesChanged.get();
    return new ArrayList<>(pending);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.watcher;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

/**
 * Tests that {@link FileGroupWatcher} batches changes to any number of watched files into a single
 * notification per check.
 */
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
public class FileGroupWatcherTests extends ESTestCase {

    /**
     * How far {@link #bumpLastModifiedTime(Path)} moves a file's timestamp. Deliberately much larger than
     * the coarsest timestamp granularity commonly found on file systems.
     */
    private static final long LAST_MODIFIED_BUMP_MILLIS = 10_000L;

    /**
     * Listener that records one entry per callback, so tests can assert both how many notifications were
     * delivered and how many files each of them covered.
     */
    private static class RecordingFileGroupChangesListener implements FileGroupChangesListener {
        private final List<String> notifications = new ArrayList<>();

        @Override
        public void onFileChanged(List<Path> files) {
            notifications.add("onFileChanged: " + files.size() + " files");
        }

        public List<String> notifications() {
            return notifications;
        }
    }

    public void testMultipleFileChanges() throws IOException {
        Path tempDir = createTempDir();
        RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener();
        Path file1 = tempDir.resolve("test1.txt");
        Path file2 = tempDir.resolve("test2.txt");
        Path file3 = tempDir.resolve("test3.txt");

        touch(file1);
        touch(file2);
        touch(file3);

        FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(Arrays.asList(file1, file2, file3));
        fileGroupWatcher.addListener(listener);
        fileGroupWatcher.init();

        // Modify multiple files - should batch into single notification
        bumpLastModifiedTime(file1);
        bumpLastModifiedTime(file2);

        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), hasSize(1));
        assertThat(listener.notifications().get(0), equalTo("onFileChanged: 2 files"));
    }

    public void testBatchedCreation() throws IOException {
        Path tempDir = createTempDir();
        RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener();
        Path file1 = tempDir.resolve("test1.txt");
        Path file2 = tempDir.resolve("test2.txt");

        FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(Arrays.asList(file1, file2));
        fileGroupWatcher.addListener(listener);
        fileGroupWatcher.init();

        // Create both files
        touch(file1);
        touch(file2);

        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), hasSize(1));
        assertThat(listener.notifications().get(0), equalTo("onFileChanged: 2 files"));
    }

    public void testBatchedDeletion() throws IOException {
        Path tempDir = createTempDir();
        RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener();
        Path file1 = tempDir.resolve("test1.txt");
        Path file2 = tempDir.resolve("test2.txt");

        touch(file1);
        touch(file2);

        FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(Arrays.asList(file1, file2));
        fileGroupWatcher.addListener(listener);
        fileGroupWatcher.init();

        Files.delete(file1);
        Files.delete(file2);

        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), hasSize(1));
        assertThat(listener.notifications().get(0), equalTo("onFileChanged: 2 files"));
    }

    /**
     * Despite its name, this test asserts that an update, a creation and a deletion happening in the same
     * period are reported together in ONE notification covering all three files.
     */
    public void testMixedOperationsSeparateNotifications() throws IOException {
        Path tempDir = createTempDir();
        RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener();
        Path existingFile = tempDir.resolve("existing.txt");
        Path newFile = tempDir.resolve("new.txt");
        Path deleteFile = tempDir.resolve("delete.txt");

        touch(existingFile);
        touch(deleteFile);

        FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(Arrays.asList(existingFile, newFile, deleteFile));
        fileGroupWatcher.addListener(listener);
        fileGroupWatcher.init();

        bumpLastModifiedTime(existingFile);
        touch(newFile);
        Files.delete(deleteFile);

        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), hasSize(1));
        assertThat(listener.notifications(), containsInAnyOrder("onFileChanged: 3 files"));
    }

    public void testEmptyFileGroup() throws IOException {
        RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener();

        FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(List.of());
        fileGroupWatcher.addListener(listener);
        fileGroupWatcher.init();

        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), empty());
    }

    /**
     * Verifies that nothing is reported while the files are untouched and that a change reported in one
     * check is not reported again by later checks.
     */
    public void testStateIsResetBetweenChecks() throws IOException {
        Path tempDir = createTempDir();
        RecordingFileGroupChangesListener listener = new RecordingFileGroupChangesListener();
        Path file1 = tempDir.resolve("test1.txt");

        touch(file1);

        FileGroupWatcher fileGroupWatcher = new FileGroupWatcher(List.of(file1));
        fileGroupWatcher.addListener(listener);
        fileGroupWatcher.init();

        // Nothing has changed since init
        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), empty());

        bumpLastModifiedTime(file1);
        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), hasSize(1));
        assertThat(listener.notifications().get(0), equalTo("onFileChanged: 1 files"));

        // No further change - the previous change must not be delivered a second time
        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), hasSize(1));

        // A new change is reported on its own, without the earlier one
        bumpLastModifiedTime(file1);
        fileGroupWatcher.checkAndNotify();
        assertThat(listener.notifications(), hasSize(2));
        assertThat(listener.notifications().get(1), equalTo("onFileChanged: 1 files"));
    }

    /**
     * Creates the file if it does not exist (or truncates it if it does) without writing any content.
     */
    static void touch(Path path) throws IOException {
        Files.newOutputStream(path).close();
    }

    /**
     * Simulates a modification by moving the file's last-modified time a fixed distance away from its
     * current value.
     * <p>
     * Setting the timestamp to "now" is not reliable for this: a file created a moment ago can end up with
     * exactly the same timestamp, either because both calls fall into the same millisecond or because the
     * file system stores timestamps at a coarser granularity. Presumably the file watcher detects updates
     * by comparing timestamps, in which case such a "modification" would go unnoticed - verify against
     * FileWatcher.
     */
    static void bumpLastModifiedTime(Path path) throws IOException {
        final long currentMillis = Files.getLastModifiedTime(path).toMillis();
        Files.setLastModifiedTime(path, FileTime.fromMillis(currentMillis + LAST_MODIFIED_BUMP_MILLIS));
    }
}