From 3cb7b59c984468e6f1b18e7321fa383d0c6151cf Mon Sep 17 00:00:00 2001 From: Johannes Freden Jansson Date: Fri, 2 May 2025 10:07:42 +0200 Subject: [PATCH 1/4] Reprocess all changed files in settings dir if symlink dir is updated --- .../service/FileSettingsServiceIT.java | 61 ++++++++++++++++++- .../file/AbstractFileWatchingService.java | 41 +++++++------ 2 files changed, 82 insertions(+), 20 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java index 6586bb03f36ba..049b8d003e643 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -44,7 +44,6 @@ import static org.elasticsearch.health.HealthStatus.YELLOW; import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING; import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING; -import static org.elasticsearch.test.NodeRoles.dataNode; import static org.elasticsearch.test.NodeRoles.dataOnlyNode; import static org.elasticsearch.test.NodeRoles.masterNode; import static org.hamcrest.Matchers.allOf; @@ -139,6 +138,11 @@ private void assertMasterNode(Client client, String node) { public static void writeJSONFile(String node, String json, Logger logger, Long version) throws Exception { FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); + writeJSONFile(node, json, logger, version, fileSettingsService.watchedFile()); + } + + public static void writeJSONFile(String node, String json, Logger logger, Long version, Path targetPath) throws Exception { + FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); Files.createDirectories(fileSettingsService.watchedFileDir()); Path tempFilePath = createTempFile(); @@ -152,8 +156,8 @@ public static void writeJSONFile(String node, String json, Logger logger, Long v do { try { // this can fail on Windows because of timing - Files.move(tempFilePath, fileSettingsService.watchedFile(), StandardCopyOption.ATOMIC_MOVE); - logger.info("--> after writing JSON config to node {} with path {}", node, tempFilePath); + Files.move(tempFilePath, targetPath, StandardCopyOption.ATOMIC_MOVE); + logger.info("--> after writing JSON config to node {} with path {}", node, targetPath); return; } catch (IOException e) { logger.info("--> retrying writing a settings file [{}]", retryCount); @@ -503,6 +507,57 @@ public void testSettingsAppliedOnMasterReElection() throws Exception { assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb"); } + public void testSymlinkUpdateTriggerReload() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(0); + logger.info("--> start data node / non master node"); + String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); + FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode); + + assertFalse(dataFileSettingsService.watching()); + + logger.info("--> start master node"); + final String masterNode = internalCluster().startMasterOnlyNode(); + assertMasterNode(internalCluster().nonMasterClient(), masterNode); + { + var savedClusterState = setupClusterStateListener(masterNode); + + FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); + + assertBusy(() -> assertTrue(masterFileSettingsService.watching())); + assertFalse(dataFileSettingsService.watching()); + + // Create the settings.json as a symlink to simulate k8 setup + // settings.json -> ..data/settings.json + // ..data -> ..TIMESTAMP_TEMP_FOLDER_1 + createK8sLikeSymlinks(masterNode); + assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb"); + } + { + var savedClusterState = setupClusterStateListener(masterNode); + // Update ..data symlink to ..data -> ..TIMESTAMP_TEMP_FOLDER_2 to simulate kubernetes secret update + updateSymlinks(masterNode, testJSON43mb); + assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb"); + } + } + + public Path createK8sLikeSymlinks(String node) throws Exception { + FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); + Path baseDir = fileSettingsService.watchedFileDir(); + var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_1")); + writeJSONFile(node, testJSON, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json")); + var dataDir = Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName()); + return Files.createSymbolicLink(baseDir.resolve("settings.json"), dataDir.getFileName().resolve("settings.json")); + } + + public void updateSymlinks(String node, String json) throws Exception { + FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); + Path baseDir = fileSettingsService.watchedFileDir(); + var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_2")); + writeJSONFile(node, json, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json")); + Files.deleteIfExists(baseDir.resolve("..data")); + Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName()); + } + public void testHealthIndicatorWithSingleNode() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); logger.info("--> start the node"); diff --git a/server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java b/server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java index c9993f12b63b9..fdcb9b8d662e5 100644 --- a/server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java +++ b/server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.ClosedWatchServiceException; +import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; @@ -27,11 +28,11 @@ import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.FileTime; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -237,14 +238,16 @@ protected final void watcherThread() { key.reset(); if (key == settingsDirWatchKey) { - // there may be multiple events for the same file - we only want to re-read once - Set processedFiles = new HashSet<>(); - for (WatchEvent e : events) { - Path fullFile = settingsDir.resolve(e.context().toString()); - if (processedFiles.add(fullFile)) { - if (fileChanged(fullFile)) { - process(fullFile); - } + Set changedPaths = events.stream() + .map(event -> settingsDir.resolve(event.context().toString())) + .collect(Collectors.toSet()); + for (var changedPath : changedPaths) { + // If a symlinked dir changed in the settings dir, it could be linked to other symlinks, so reprocess all files + if (Files.isDirectory(changedPath) && Files.isSymbolicLink(changedPath)) { + reprocessAllChangedFilesInSettingsDir(); + break; + } else if (fileChanged(changedPath)) { + process(changedPath); } } } else if (key == configDirWatchKey) { @@ -257,14 +260,7 @@ protected final void watcherThread() { settingsDirWatchKey = enableDirectoryWatcher(settingsDirWatchKey, settingsDir); // re-read the settings directory, and ping for any changes - try (Stream files = filesList(settingsDir)) { - for (var f = files.iterator(); f.hasNext();) { - Path file = f.next(); - if (fileChanged(file)) { - process(file); - } - } - } + reprocessAllChangedFilesInSettingsDir(); } else if (settingsDirWatchKey != null) { settingsDirWatchKey.cancel(); } @@ -279,6 +275,17 @@ protected final void watcherThread() { } } + private void reprocessAllChangedFilesInSettingsDir() throws IOException, InterruptedException { + try (Stream files = filesList(settingsDir)) { + for (var f = files.iterator(); f.hasNext();) { + Path file = f.next(); + if (fileChanged(file)) { + process(file); + } + } + } + } + protected final synchronized void stopWatcher() { if (watching()) { logger.debug("stopping watcher ..."); From 1c0c99c7cb41a19bbd9852c28d72aede16af80ba Mon Sep 17 00:00:00 2001 From: Johannes Freden Jansson Date: Fri, 2 May 2025 13:47:26 +0200 Subject: [PATCH 2/4] Simplify test --- .../service/FileSettingsServiceIT.java | 46 +++++-------------- 1 file changed, 12 insertions(+), 34 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java index 049b8d003e643..1899b055ed0c9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -509,55 +509,33 @@ public void testSettingsAppliedOnMasterReElection() throws Exception { public void testSymlinkUpdateTriggerReload() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); - logger.info("--> start data node / non master node"); - String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); - FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode); - - assertFalse(dataFileSettingsService.watching()); - - logger.info("--> start master node"); final String masterNode = internalCluster().startMasterOnlyNode(); - assertMasterNode(internalCluster().nonMasterClient(), masterNode); + FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); + Path baseDir = masterFileSettingsService.watchedFileDir(); + assertBusy(() -> assertTrue(masterFileSettingsService.watching())); + { var savedClusterState = setupClusterStateListener(masterNode); - - FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); - - assertBusy(() -> assertTrue(masterFileSettingsService.watching())); - assertFalse(dataFileSettingsService.watching()); - // Create the settings.json as a symlink to simulate k8 setup // settings.json -> ..data/settings.json // ..data -> ..TIMESTAMP_TEMP_FOLDER_1 - createK8sLikeSymlinks(masterNode); + var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_1")); + writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json")); + var dataDir = Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName()); + Files.createSymbolicLink(baseDir.resolve("settings.json"), dataDir.getFileName().resolve("settings.json")); assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb"); } { var savedClusterState = setupClusterStateListener(masterNode); // Update ..data symlink to ..data -> ..TIMESTAMP_TEMP_FOLDER_2 to simulate kubernetes secret update - updateSymlinks(masterNode, testJSON43mb); + var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_2")); + writeJSONFile(masterNode, testJSON43mb, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json")); + Files.deleteIfExists(baseDir.resolve("..data")); + Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName()); assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb"); } } - public Path createK8sLikeSymlinks(String node) throws Exception { - FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); - Path baseDir = fileSettingsService.watchedFileDir(); - var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_1")); - writeJSONFile(node, testJSON, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json")); - var dataDir = Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName()); - return Files.createSymbolicLink(baseDir.resolve("settings.json"), dataDir.getFileName().resolve("settings.json")); - } - - public void updateSymlinks(String node, String json) throws Exception { - FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); - Path baseDir = fileSettingsService.watchedFileDir(); - var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_2")); - writeJSONFile(node, json, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json")); - Files.deleteIfExists(baseDir.resolve("..data")); - Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName()); - } - public void testHealthIndicatorWithSingleNode() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); logger.info("--> start the node"); From 3850598abc4e72a999bb62db51dd50cfa693719f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Fred=C3=A9n?= <109296772+jfreden@users.noreply.github.com> Date: Fri, 2 May 2025 13:58:17 +0200 Subject: [PATCH 3/4] Update docs/changelog/127628.yaml --- docs/changelog/127628.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/127628.yaml diff --git a/docs/changelog/127628.yaml b/docs/changelog/127628.yaml new file mode 100644 index 0000000000000..083b425f13eb4 --- /dev/null +++ b/docs/changelog/127628.yaml @@ -0,0 +1,5 @@ +pr: 127628 +summary: Ensure config reload on ..data symlink switch for CSI driver support +area: Infra/Settings +type: enhancement +issues: [] From d68fbfb3def2043bfedf954cd649ae452e4f98f0 Mon Sep 17 00:00:00 2001 From: Johannes Freden Jansson Date: Mon, 12 May 2025 13:37:35 +0200 Subject: [PATCH 4/4] fixup! move file ops to FileSettingsService --- .../common/file/AbstractFileWatchingService.java | 5 +++-- .../reservedstate/service/FileSettingsService.java | 5 +++++ .../common/file/AbstractFileWatchingServiceTests.java | 5 +++++ .../common/file/MasterNodeFileWatchingServiceTests.java | 5 +++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java b/server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java index fdcb9b8d662e5..243126964529b 100644 --- a/server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java +++ b/server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.ClosedWatchServiceException; -import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; @@ -243,7 +242,7 @@ protected final void watcherThread() { .collect(Collectors.toSet()); for (var changedPath : changedPaths) { // If a symlinked dir changed in the settings dir, it could be linked to other symlinks, so reprocess all files - if (Files.isDirectory(changedPath) && Files.isSymbolicLink(changedPath)) { + if (filesIsDirectory(changedPath) && filesIsSymbolicLink(changedPath)) { reprocessAllChangedFilesInSettingsDir(); break; } else if (fileChanged(changedPath)) { @@ -385,6 +384,8 @@ private record FileUpdateState(long timestamp, String path, Object fileKey) {} protected abstract boolean filesIsDirectory(Path path); + protected abstract boolean filesIsSymbolicLink(Path path); + protected abstract A filesReadAttributes(Path path, Class clazz) throws IOException; protected abstract Stream filesList(Path dir) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java index 8de1979a1dbd8..76cf7ef1b6947 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java @@ -467,6 +467,11 @@ protected boolean filesIsDirectory(Path path) { return Files.isDirectory(path); } + @Override + protected boolean filesIsSymbolicLink(Path path) { + return Files.isSymbolicLink(path); + } + @Override protected A filesReadAttributes(Path path, Class clazz) throws IOException { return Files.readAttributes(path, clazz); diff --git a/server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java b/server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java index 4fbc8f776b256..5a2c06dd56eb6 100644 --- a/server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java @@ -98,6 +98,11 @@ protected boolean filesIsDirectory(Path path) { return Files.isDirectory(path); } + @Override + protected boolean filesIsSymbolicLink(Path path) { + return Files.isSymbolicLink(path); + } + @Override protected A filesReadAttributes(Path path, Class clazz) throws IOException { return Files.readAttributes(path, clazz); diff --git a/server/src/test/java/org/elasticsearch/common/file/MasterNodeFileWatchingServiceTests.java b/server/src/test/java/org/elasticsearch/common/file/MasterNodeFileWatchingServiceTests.java index 712287ee9af4c..7d692e7e437e3 100644 --- a/server/src/test/java/org/elasticsearch/common/file/MasterNodeFileWatchingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/common/file/MasterNodeFileWatchingServiceTests.java @@ -80,6 +80,11 @@ protected boolean filesIsDirectory(Path path) { return Files.isDirectory(path); } + @Override + protected boolean filesIsSymbolicLink(Path path) { + return Files.isSymbolicLink(path); + } + @Override protected A filesReadAttributes(Path path, Class clazz) throws IOException { return Files.readAttributes(path, clazz);