3838import org .elasticsearch .test .ESTestCase ;
3939import org .elasticsearch .threadpool .TestThreadPool ;
4040import org .elasticsearch .threadpool .ThreadPool ;
41+ import org .elasticsearch .xcontent .XContentParseException ;
4142import org .elasticsearch .xcontent .XContentParser ;
4243import org .junit .After ;
4344import org .junit .Before ;
5556import java .util .List ;
5657import java .util .Map ;
5758import java .util .Set ;
59+ import java .util .concurrent .BrokenBarrierException ;
5860import java .util .concurrent .CountDownLatch ;
61+ import java .util .concurrent .CyclicBarrier ;
62+ import java .util .concurrent .ExecutionException ;
5963import java .util .concurrent .TimeUnit ;
64+ import java .util .concurrent .TimeoutException ;
6065import java .util .concurrent .atomic .AtomicBoolean ;
6166import java .util .function .Consumer ;
6267
6368import static java .nio .file .StandardCopyOption .ATOMIC_MOVE ;
69+ import static java .nio .file .StandardCopyOption .REPLACE_EXISTING ;
6470import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
6571import static org .hamcrest .Matchers .anEmptyMap ;
6672import static org .hamcrest .Matchers .hasEntry ;
6773import static org .mockito .ArgumentMatchers .any ;
74+ import static org .mockito .ArgumentMatchers .argThat ;
6875import static org .mockito .ArgumentMatchers .eq ;
6976import static org .mockito .Mockito .doAnswer ;
7077import static org .mockito .Mockito .mock ;
@@ -262,6 +269,68 @@ public void testProcessFileChanges() throws Exception {
262269 verify (controller , times (1 )).process (any (), any (XContentParser .class ), eq (ReservedStateVersionCheck .HIGHER_VERSION_ONLY ), any ());
263270 }
264271
272+ @ SuppressWarnings ("unchecked" )
273+ public void testInvalidJSON () throws Exception {
274+ doAnswer ((Answer <Void >) invocation -> {
275+ invocation .getArgument (1 , XContentParser .class ).map (); // Throw if JSON is invalid
276+ ((Consumer <Exception >) invocation .getArgument (3 )).accept (null );
277+ return null ;
278+ }).when (controller ).process (any (), any (XContentParser .class ), any (), any ());
279+
280+ CyclicBarrier fileChangeBarrier = new CyclicBarrier (2 );
281+ fileSettingsService .addFileChangedListener (() -> awaitOrBust (fileChangeBarrier ));
282+
283+ Files .createDirectories (fileSettingsService .watchedFileDir ());
284+ // contents of the JSON don't matter, we just need a file to exist
285+ writeTestFile (fileSettingsService .watchedFile (), "{}" );
286+
287+ doAnswer ((Answer <?>) invocation -> {
288+ boolean returnedNormally = false ;
289+ try {
290+ var result = invocation .callRealMethod ();
291+ returnedNormally = true ;
292+ return result ;
293+ } catch (XContentParseException e ) {
294+ // We're expecting a parse error. processFileChanges specifies that this is supposed to throw ExecutionException.
295+ throw new ExecutionException (e );
296+ } catch (Throwable e ) {
297+ throw new AssertionError ("Unexpected exception" , e );
298+ } finally {
299+ if (returnedNormally == false ) {
300+ // Because of the exception, listeners aren't notified, so we need to activate the barrier ourselves
301+ awaitOrBust (fileChangeBarrier );
302+ }
303+ }
304+ }).when (fileSettingsService ).processFileChanges ();
305+
306+ // Establish the initial valid JSON
307+ fileSettingsService .start ();
308+ fileSettingsService .clusterChanged (new ClusterChangedEvent ("test" , clusterService .state (), ClusterState .EMPTY_STATE ));
309+ awaitOrBust (fileChangeBarrier );
310+
311+ // Now break the JSON
312+ writeTestFile (fileSettingsService .watchedFile (), "test_invalid_JSON" );
313+ awaitOrBust (fileChangeBarrier );
314+
315+ verify (fileSettingsService , times (1 )).processFileOnServiceStart (); // The initial state
316+ verify (fileSettingsService , times (1 )).processFileChanges (); // The changed state
317+ verify (fileSettingsService , times (1 )).onProcessFileChangesException (
318+ argThat (e -> e instanceof ExecutionException && e .getCause () instanceof XContentParseException )
319+ );
320+
321+ // Note: the name "processFileOnServiceStart" is a bit misleading because it is not
322+ // referring to fileSettingsService.start(). Rather, it is referring to the initialization
323+ // of the watcher thread itself, which occurs asynchronously when clusterChanged is first called.
324+ }
325+
326+ private static void awaitOrBust (CyclicBarrier barrier ) {
327+ try {
328+ barrier .await (20 , TimeUnit .SECONDS );
329+ } catch (InterruptedException | BrokenBarrierException | TimeoutException e ) {
330+ throw new AssertionError ("Unexpected exception waiting for barrier" , e );
331+ }
332+ }
333+
265334 @ SuppressWarnings ("unchecked" )
266335 public void testStopWorksInMiddleOfProcessing () throws Exception {
267336 CountDownLatch processFileLatch = new CountDownLatch (1 );
@@ -356,10 +425,10 @@ private static void writeTestFile(Path path, String contents) throws IOException
356425 Path tempFilePath = createTempFile ();
357426 Files .writeString (tempFilePath , contents );
358427 try {
359- Files .move (tempFilePath , path , ATOMIC_MOVE );
428+ Files .move (tempFilePath , path , REPLACE_EXISTING , ATOMIC_MOVE );
360429 } catch (AtomicMoveNotSupportedException e ) {
361430 logger .info ("Atomic move not available. Falling back on non-atomic move to write [{}]" , path .toAbsolutePath ());
362- Files .move (tempFilePath , path );
431+ Files .move (tempFilePath , path , REPLACE_EXISTING );
363432 }
364433 }
365434
@@ -374,4 +443,5 @@ private static void longAwait(CountDownLatch latch) {
374443 fail (e , "longAwait: interrupted waiting for CountDownLatch to reach zero" );
375444 }
376445 }
446+
377447}
0 commit comments