Skip to content

Commit 26e5d5a

Browse files
committed
Reprocess all changed files in settings dir if symlink dir is updated
1 parent 5f256cc commit 26e5d5a

File tree

2 files changed

+82
-19
lines changed

2 files changed

+82
-19
lines changed

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ private void assertMasterNode(Client client, String node) {
134134

135135
public static void writeJSONFile(String node, String json, Logger logger, Long version) throws Exception {
136136
FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);
137+
writeJSONFile(node, json, logger, version, fileSettingsService.watchedFile());
138+
}
139+
140+
public static void writeJSONFile(String node, String json, Logger logger, Long version, Path targetPath) throws Exception {
141+
FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);
137142

138143
Files.createDirectories(fileSettingsService.watchedFileDir());
139144
Path tempFilePath = createTempFile();
@@ -147,8 +152,8 @@ public static void writeJSONFile(String node, String json, Logger logger, Long v
147152
do {
148153
try {
149154
// this can fail on Windows because of timing
150-
Files.move(tempFilePath, fileSettingsService.watchedFile(), StandardCopyOption.ATOMIC_MOVE);
151-
logger.info("--> after writing JSON config to node {} with path {}", node, tempFilePath);
155+
Files.move(tempFilePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
156+
logger.info("--> after writing JSON config to node {} with path {}", node, targetPath);
152157
return;
153158
} catch (IOException e) {
154159
logger.info("--> retrying writing a settings file [{}]", retryCount);
@@ -498,6 +503,57 @@ public void testSettingsAppliedOnMasterReElection() throws Exception {
498503
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
499504
}
500505

506+
public void testSymlinkUpdateTriggerReload() throws Exception {
507+
internalCluster().setBootstrapMasterNodeIndex(0);
508+
logger.info("--> start data node / non master node");
509+
String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
510+
FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);
511+
512+
assertFalse(dataFileSettingsService.watching());
513+
514+
logger.info("--> start master node");
515+
final String masterNode = internalCluster().startMasterOnlyNode();
516+
assertMasterNode(internalCluster().nonMasterClient(), masterNode);
517+
{
518+
var savedClusterState = setupClusterStateListener(masterNode);
519+
520+
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
521+
522+
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
523+
assertFalse(dataFileSettingsService.watching());
524+
525+
// Create the settings.json as a symlink to simulate k8 setup
526+
// settings.json -> ..data/settings.json
527+
// ..data -> ..TIMESTAMP_TEMP_FOLDER_1
528+
createK8sLikeSymlinks(masterNode);
529+
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb");
530+
}
531+
{
532+
var savedClusterState = setupClusterStateListener(masterNode);
533+
// Update ..data symlink to ..data -> ..TIMESTAMP_TEMP_FOLDER_2 to simulate kubernetes secret update
534+
updateSymlinks(masterNode, testJSON43mb);
535+
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
536+
}
537+
}
538+
539+
public Path createK8sLikeSymlinks(String node) throws Exception {
540+
FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);
541+
Path baseDir = fileSettingsService.watchedFileDir();
542+
var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_1"));
543+
writeJSONFile(node, testJSON, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json"));
544+
var dataDir = Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName());
545+
return Files.createSymbolicLink(baseDir.resolve("settings.json"), dataDir.getFileName().resolve("settings.json"));
546+
}
547+
548+
public void updateSymlinks(String node, String json) throws Exception {
549+
FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);
550+
Path baseDir = fileSettingsService.watchedFileDir();
551+
var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_2"));
552+
writeJSONFile(node, json, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json"));
553+
Files.deleteIfExists(baseDir.resolve("..data"));
554+
Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName());
555+
}
556+
501557
private void assertHasErrors(AtomicLong waitForMetadataVersion, String expectedError) {
502558
var errorMetadata = getErrorMetadata(waitForMetadataVersion);
503559
assertThat(errorMetadata, is(notNullValue()));

server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.nio.file.ClosedWatchServiceException;
21+
import java.nio.file.Files;
2122
import java.nio.file.NoSuchFileException;
2223
import java.nio.file.Path;
2324
import java.nio.file.StandardWatchEventKinds;
@@ -27,11 +28,11 @@
2728
import java.nio.file.attribute.BasicFileAttributes;
2829
import java.nio.file.attribute.FileTime;
2930
import java.util.HashMap;
30-
import java.util.HashSet;
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.Set;
3434
import java.util.concurrent.ExecutionException;
35+
import java.util.stream.Collectors;
3536
import java.util.stream.Stream;
3637

3738
/**
@@ -237,14 +238,16 @@ protected final void watcherThread() {
237238
key.reset();
238239

239240
if (key == settingsDirWatchKey) {
240-
// there may be multiple events for the same file - we only want to re-read once
241-
Set<Path> processedFiles = new HashSet<>();
242-
for (WatchEvent<?> e : events) {
243-
Path fullFile = settingsDir.resolve(e.context().toString());
244-
if (processedFiles.add(fullFile)) {
245-
if (fileChanged(fullFile)) {
246-
process(fullFile);
247-
}
241+
Set<Path> changedPaths = events.stream()
242+
.map(event -> settingsDir.resolve(event.context().toString()))
243+
.collect(Collectors.toSet());
244+
for (var changedPath : changedPaths) {
245+
// If a symlinked dir changed in the settings dir, it could be linked to other symlinks, so reprocess all files
246+
if (Files.isDirectory(changedPath) && Files.isSymbolicLink(changedPath)) {
247+
reprocessAllChangedFilesInSettingsDir();
248+
break;
249+
} else if (fileChanged(changedPath)) {
250+
process(changedPath);
248251
}
249252
}
250253
} else if (key == configDirWatchKey) {
@@ -257,14 +260,7 @@ protected final void watcherThread() {
257260
settingsDirWatchKey = enableDirectoryWatcher(settingsDirWatchKey, settingsDir);
258261

259262
// re-read the settings directory, and ping for any changes
260-
try (Stream<Path> files = filesList(settingsDir)) {
261-
for (var f = files.iterator(); f.hasNext();) {
262-
Path file = f.next();
263-
if (fileChanged(file)) {
264-
process(file);
265-
}
266-
}
267-
}
263+
reprocessAllChangedFilesInSettingsDir();
268264
} else if (settingsDirWatchKey != null) {
269265
settingsDirWatchKey.cancel();
270266
}
@@ -279,6 +275,17 @@ protected final void watcherThread() {
279275
}
280276
}
281277

278+
private void reprocessAllChangedFilesInSettingsDir() throws IOException, InterruptedException {
279+
try (Stream<Path> files = filesList(settingsDir)) {
280+
for (var f = files.iterator(); f.hasNext();) {
281+
Path file = f.next();
282+
if (fileChanged(file)) {
283+
process(file);
284+
}
285+
}
286+
}
287+
}
288+
282289
protected final synchronized void stopWatcher() {
283290
if (watching()) {
284291
logger.debug("stopping watcher ...");

0 commit comments

Comments
 (0)