99
1010package org .elasticsearch .reservedstate .service ;
1111
12+ import org .apache .logging .log4j .Level ;
13+ import org .apache .logging .log4j .LogManager ;
14+ import org .apache .logging .log4j .Logger ;
1215import org .elasticsearch .Version ;
1316import org .elasticsearch .action .ActionListener ;
1417import org .elasticsearch .cluster .ClusterChangedEvent ;
2326import org .elasticsearch .cluster .routing .RerouteService ;
2427import org .elasticsearch .cluster .service .ClusterService ;
2528import org .elasticsearch .common .component .Lifecycle ;
29+ import org .elasticsearch .common .file .AbstractFileWatchingService ;
30+ import org .elasticsearch .common .logging .Loggers ;
2631import org .elasticsearch .common .settings .ClusterSettings ;
2732import org .elasticsearch .common .settings .Settings ;
2833import org .elasticsearch .core .TimeValue ;
3944import org .mockito .stubbing .Answer ;
4045
4146import java .io .IOException ;
47+ import java .nio .file .AtomicMoveNotSupportedException ;
4248import java .nio .file .Files ;
4349import java .nio .file .Path ;
44- import java .nio .file .StandardCopyOption ;
50+ import java .nio .file .attribute .FileTime ;
51+ import java .time .Instant ;
52+ import java .time .LocalDateTime ;
53+ import java .time .ZoneId ;
54+ import java .time .ZoneOffset ;
4555import java .util .List ;
4656import java .util .Map ;
4757import java .util .Set ;
5060import java .util .concurrent .atomic .AtomicBoolean ;
5161import java .util .function .Consumer ;
5262
63+ import static java .nio .file .StandardCopyOption .ATOMIC_MOVE ;
5364import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
5465import static org .hamcrest .Matchers .anEmptyMap ;
5566import static org .hamcrest .Matchers .hasEntry ;
6273import static org .mockito .Mockito .verify ;
6374
6475public class FileSettingsServiceTests extends ESTestCase {
76+ private static final Logger logger = LogManager .getLogger (FileSettingsServiceTests .class );
6577 private Environment env ;
6678 private ClusterService clusterService ;
6779 private ReservedClusterStateService controller ;
@@ -71,6 +83,8 @@ public class FileSettingsServiceTests extends ESTestCase {
7183 @ Before
7284 public void setUp () throws Exception {
7385 super .setUp ();
86+ // TODO remove me once https://github.com/elastic/elasticsearch/issues/115280 is closed
87+ Loggers .setLevel (LogManager .getLogger (AbstractFileWatchingService .class ), Level .DEBUG );
7488
7589 threadpool = new TestThreadPool ("file_settings_service_tests" );
7690
@@ -115,16 +129,23 @@ public void setUp() throws Exception {
115129
116130 @ After
117131 public void tearDown () throws Exception {
118- if (fileSettingsService .lifecycleState () == Lifecycle .State .STARTED ) {
119- fileSettingsService .stop ();
120- }
121- if (fileSettingsService .lifecycleState () == Lifecycle .State .STOPPED ) {
122- fileSettingsService .close ();
123- }
132+ try {
133+ if (fileSettingsService .lifecycleState () == Lifecycle .State .STARTED ) {
134+ logger .info ("Stopping file settings service" );
135+ fileSettingsService .stop ();
136+ }
137+ if (fileSettingsService .lifecycleState () == Lifecycle .State .STOPPED ) {
138+ logger .info ("Closing file settings service" );
139+ fileSettingsService .close ();
140+ }
124141
125- super .tearDown ();
126- clusterService .close ();
127- threadpool .shutdownNow ();
142+ super .tearDown ();
143+ clusterService .close ();
144+ threadpool .shutdownNow ();
145+ } finally {
146+ // TODO remove me once https://github.com/elastic/elasticsearch/issues/115280 is closed
147+ Loggers .setLevel (LogManager .getLogger (AbstractFileWatchingService .class ), Level .INFO );
148+ }
128149 }
129150
130151 public void testStartStop () {
@@ -190,27 +211,17 @@ public void testInitialFileWorks() throws Exception {
190211 return null ;
191212 }).when (controller ).process (any (), any (XContentParser .class ), any (), any ());
192213
193- CountDownLatch latch = new CountDownLatch (1 );
194-
195- fileSettingsService .addFileChangedListener (latch ::countDown );
214+ CountDownLatch processFileLatch = new CountDownLatch (1 );
215+ fileSettingsService .addFileChangedListener (processFileLatch ::countDown );
196216
197217 Files .createDirectories (fileSettingsService .watchedFileDir ());
198218 // contents of the JSON don't matter, we just need a file to exist
199219 writeTestFile (fileSettingsService .watchedFile (), "{}" );
200220
201- doAnswer ((Answer <?>) invocation -> {
202- try {
203- return invocation .callRealMethod ();
204- } finally {
205- latch .countDown ();
206- }
207- }).when (fileSettingsService ).processFileOnServiceStart ();
208-
209221 fileSettingsService .start ();
210222 fileSettingsService .clusterChanged (new ClusterChangedEvent ("test" , clusterService .state (), ClusterState .EMPTY_STATE ));
211223
212- // wait for listener to be called
213- assertTrue (latch .await (20 , TimeUnit .SECONDS ));
224+ longAwait (processFileLatch );
214225
215226 verify (fileSettingsService , times (1 )).processFileOnServiceStart ();
216227 verify (controller , times (1 )).process (any (), any (XContentParser .class ), eq (ReservedStateVersionCheck .HIGHER_OR_SAME_VERSION ), any ());
@@ -223,40 +234,30 @@ public void testProcessFileChanges() throws Exception {
223234 return null ;
224235 }).when (controller ).process (any (), any (XContentParser .class ), any (), any ());
225236
226- // we get three events: initial clusterChanged event, first write, second write
227- CountDownLatch latch = new CountDownLatch (3 );
228-
229- fileSettingsService .addFileChangedListener (latch ::countDown );
237+ CountDownLatch processFileCreationLatch = new CountDownLatch (1 );
238+ fileSettingsService .addFileChangedListener (processFileCreationLatch ::countDown );
230239
231240 Files .createDirectories (fileSettingsService .watchedFileDir ());
232241 // contents of the JSON don't matter, we just need a file to exist
233242 writeTestFile (fileSettingsService .watchedFile (), "{}" );
234243
235- doAnswer ((Answer <?>) invocation -> {
236- try {
237- return invocation .callRealMethod ();
238- } finally {
239- latch .countDown ();
240- }
241- }).when (fileSettingsService ).processFileOnServiceStart ();
242- doAnswer ((Answer <?>) invocation -> {
243- try {
244- return invocation .callRealMethod ();
245- } finally {
246- latch .countDown ();
247- }
248- }).when (fileSettingsService ).processFileChanges ();
249-
250244 fileSettingsService .start ();
251245 fileSettingsService .clusterChanged (new ClusterChangedEvent ("test" , clusterService .state (), ClusterState .EMPTY_STATE ));
252- // second file change; contents still don't matter
253- overwriteTestFile (fileSettingsService .watchedFile (), "{}" );
254246
255- // wait for listener to be called (once for initial processing, once for subsequent update)
256- assertTrue (latch .await (20 , TimeUnit .SECONDS ));
247+ longAwait (processFileCreationLatch );
248+
249+ CountDownLatch processFileChangeLatch = new CountDownLatch (1 );
250+ fileSettingsService .addFileChangedListener (processFileChangeLatch ::countDown );
257251
258252 verify (fileSettingsService , times (1 )).processFileOnServiceStart ();
259253 verify (controller , times (1 )).process (any (), any (XContentParser .class ), eq (ReservedStateVersionCheck .HIGHER_OR_SAME_VERSION ), any ());
254+
255+ // Touch the file to get an update
256+ Instant now = LocalDateTime .now (ZoneId .systemDefault ()).toInstant (ZoneOffset .ofHours (0 ));
257+ Files .setLastModifiedTime (fileSettingsService .watchedFile (), FileTime .from (now ));
258+
259+ longAwait (processFileChangeLatch );
260+
260261 verify (fileSettingsService , times (1 )).processFileChanges ();
261262 verify (controller , times (1 )).process (any (), any (XContentParser .class ), eq (ReservedStateVersionCheck .HIGHER_VERSION_ONLY ), any ());
262263 }
@@ -295,9 +296,7 @@ public void testStopWorksInMiddleOfProcessing() throws Exception {
295296 // Make some fake settings file to cause the file settings service to process it
296297 writeTestFile (fileSettingsService .watchedFile (), "{}" );
297298
298- // we need to wait a bit, on MacOS it may take up to 10 seconds for the Java watcher service to notice the file,
299- // on Linux is instantaneous. Windows is instantaneous too.
300- assertTrue (processFileLatch .await (30 , TimeUnit .SECONDS ));
299+ longAwait (processFileLatch );
301300
302301 // Stopping the service should interrupt the watcher thread, we should be able to stop
303302 fileSettingsService .stop ();
@@ -352,15 +351,27 @@ public void testHandleSnapshotRestoreResetsMetadata() throws Exception {
352351 }
353352
354353 // helpers
355- private void writeTestFile (Path path , String contents ) throws IOException {
354+ private static void writeTestFile (Path path , String contents ) throws IOException {
355+ logger .info ("Writing settings file under [{}]" , path .toAbsolutePath ());
356356 Path tempFilePath = createTempFile ();
357357 Files .writeString (tempFilePath , contents );
358- Files .move (tempFilePath , path , StandardCopyOption .ATOMIC_MOVE , StandardCopyOption .REPLACE_EXISTING );
358+ try {
359+ Files .move (tempFilePath , path , ATOMIC_MOVE );
360+ } catch (AtomicMoveNotSupportedException e ) {
361+ logger .info ("Atomic move not available. Falling back on non-atomic move to write [{}]" , path .toAbsolutePath ());
362+ Files .move (tempFilePath , path );
363+ }
359364 }
360365
361- private void overwriteTestFile (Path path , String contents ) throws IOException {
362- Path tempFilePath = createTempFile ();
363- Files .writeString (tempFilePath , contents );
364- Files .move (tempFilePath , path , StandardCopyOption .REPLACE_EXISTING );
366+ // this waits for up to 20 seconds to account for watcher service differences between OSes
367+ // on MacOS it may take up to 10 seconds for the Java watcher service to notice the file,
368+ // on Linux is instantaneous. Windows is instantaneous too.
369+ private static void longAwait (CountDownLatch latch ) {
370+ try {
371+ assertTrue ("longAwait: CountDownLatch did not reach zero within the timeout" , latch .await (20 , TimeUnit .SECONDS ));
372+ } catch (InterruptedException e ) {
373+ Thread .currentThread ().interrupt ();
374+ fail (e , "longAwait: interrupted waiting for CountDownLatch to reach zero" );
375+ }
365376 }
366377}
0 commit comments