Skip to content

Commit ada3529

Browse files
authored
Handling timeout exceptions on watcher startup (#90421) (#90480)
Right now if watcher throws an exception while starting up (for example a TimeoutException while waiting for a refresh of .watches or .triggered_watches to complete) then watcher gets into a state where it will never be restarted automatically, and is incredibly difficult to start manually. This PR catches those exceptions and sets the state to STOPPED so that when the next cluster change event comes through it will attempt to start watcher again.
1 parent 5c362a2 commit ada3529

File tree

5 files changed

+148
-14
lines changed

5 files changed

+148
-14
lines changed

docs/changelog/90421.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 90421
2+
summary: Handling timeout exceptions on watcher startup
3+
area: Watcher
4+
type: bug
5+
issues:
6+
- 44981

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ public void clusterChanged(ClusterChangedEvent event) {
101101
// if this is not a data node, we need to start it ourselves possibly
102102
if (event.state().nodes().getLocalNode().canContainData() == false && isWatcherStoppedManually == false && isStoppedOrStopping) {
103103
this.state.set(WatcherState.STARTING);
104-
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
104+
watcherService.start(
105+
event.state(),
106+
() -> this.state.set(WatcherState.STARTED),
107+
(exception -> this.state.set(WatcherState.STOPPED))
108+
);
105109
return;
106110
}
107111

@@ -165,7 +169,10 @@ public void clusterChanged(ClusterChangedEvent event) {
165169
watcherService.reload(event.state(), "new local watcher shard allocation ids");
166170
} else if (isStoppedOrStopping) {
167171
this.state.set(WatcherState.STARTING);
168-
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
172+
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED), (exception) -> {
173+
clearAllocationIds();
174+
this.state.set(WatcherState.STOPPED);
175+
});
169176
}
170177
} else {
171178
clearAllocationIds();

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -231,14 +231,17 @@ void reload(ClusterState state, String reason) {
231231
* @param state the current cluster state
232232
* @param postWatchesLoadedCallback the callback to be triggered, when watches where loaded successfully
233233
*/
234-
public void start(ClusterState state, Runnable postWatchesLoadedCallback) {
234+
public void start(ClusterState state, Runnable postWatchesLoadedCallback, Consumer<Exception> exceptionConsumer) {
235235
executionService.unPause();
236236
processedClusterStateVersion.set(state.getVersion());
237237
executor.execute(wrapWatcherService(() -> {
238238
if (reloadInner(state, "starting", true)) {
239239
postWatchesLoadedCallback.run();
240240
}
241-
}, e -> logger.error("error starting watcher", e)));
241+
}, e -> {
242+
logger.error("error starting watcher", e);
243+
exceptionConsumer.accept(e);
244+
}));
242245
}
243246

244247
/**
@@ -312,13 +315,7 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
312315
SearchResponse response = null;
313316
List<Watch> watches = new ArrayList<>();
314317
try {
315-
RefreshResponse refreshResponse = client.admin()
316-
.indices()
317-
.refresh(new RefreshRequest(INDEX))
318-
.actionGet(TimeValue.timeValueSeconds(5));
319-
if (refreshResponse.getSuccessfulShards() < indexMetadata.getNumberOfShards()) {
320-
throw illegalState("not all required shards have been refreshed");
321-
}
318+
refreshWatches(indexMetadata);
322319

323320
// find out local shards
324321
String watchIndexName = indexMetadata.getIndex().getName();
@@ -410,6 +407,17 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
410407
return watches;
411408
}
412409

410+
// Non private for unit testing purposes
411+
void refreshWatches(IndexMetadata indexMetadata) {
412+
RefreshResponse refreshResponse = client.admin()
413+
.indices()
414+
.refresh(new RefreshRequest(INDEX))
415+
.actionGet(TimeValue.timeValueSeconds(5));
416+
if (refreshResponse.getSuccessfulShards() < indexMetadata.getNumberOfShards()) {
417+
throw illegalState("not all required shards have been refreshed");
418+
}
419+
}
420+
413421
/**
414422
* Find out if the watch with this id, should be parsed and triggered on this node
415423
*

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.watcher;
88

9+
import org.elasticsearch.ElasticsearchTimeoutException;
910
import org.elasticsearch.Version;
1011
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1112
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -41,13 +42,17 @@
4142
import java.util.Collections;
4243
import java.util.HashSet;
4344
import java.util.List;
45+
import java.util.concurrent.TimeoutException;
46+
import java.util.concurrent.atomic.AtomicBoolean;
47+
import java.util.function.Consumer;
4448
import java.util.stream.Collectors;
4549
import java.util.stream.IntStream;
4650

4751
import static java.util.Arrays.asList;
4852
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
4953
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
5054
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME;
55+
import static org.hamcrest.Matchers.equalTo;
5156
import static org.hamcrest.Matchers.hasSize;
5257
import static org.hamcrest.Matchers.is;
5358
import static org.mockito.ArgumentMatchers.any;
@@ -174,14 +179,79 @@ public void testManualStartStop() {
174179
reset(watcherService);
175180
when(watcherService.validate(clusterState)).thenReturn(true);
176181
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState));
177-
verify(watcherService, times(1)).start(eq(clusterState), any());
182+
verify(watcherService, times(1)).start(eq(clusterState), any(), any());
178183

179184
// no change, keep going
180185
reset(watcherService);
181186
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
182187
verifyNoMoreInteractions(watcherService);
183188
}
184189

190+
@SuppressWarnings("unchecked")
191+
public void testExceptionOnStart() {
192+
/*
193+
* This tests that if watcher fails to start because of some exception (for example a timeout while refreshing indices) that it
194+
* will fail gracefully, and will start the next time there is a cluster change event if there is no exception that time.
195+
*/
196+
Index index = new Index(Watch.INDEX, "uuid");
197+
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
198+
indexRoutingTableBuilder.addShard(
199+
TestShardRouting.newShardRouting(new ShardId(index, 0), "node_1", true, ShardRoutingState.STARTED)
200+
);
201+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(Watch.INDEX)
202+
.settings(settings(Version.CURRENT).put(IndexMetadata.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
203+
.numberOfShards(1)
204+
.numberOfReplicas(0);
205+
Metadata.Builder metadataBuilder = Metadata.builder()
206+
.put(indexMetadataBuilder)
207+
.put(IndexTemplateMetadata.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()));
208+
if (randomBoolean()) {
209+
metadataBuilder.putCustom(WatcherMetadata.TYPE, new WatcherMetadata(false));
210+
}
211+
Metadata metadata = metadataBuilder.build();
212+
IndexRoutingTable indexRoutingTable = indexRoutingTableBuilder.build();
213+
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
214+
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
215+
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
216+
.metadata(metadata)
217+
.build();
218+
219+
// mark watcher manually as stopped
220+
ClusterState stoppedClusterState = ClusterState.builder(new ClusterName("my-cluster"))
221+
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
222+
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
223+
.metadata(Metadata.builder(metadata).putCustom(WatcherMetadata.TYPE, new WatcherMetadata(true)).build())
224+
.build();
225+
226+
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState));
227+
assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STOPPING));
228+
229+
// Now attempt to start watcher with a simulated TimeoutException. Should be stopped
230+
when(watcherService.validate(clusterState)).thenReturn(true);
231+
AtomicBoolean exceptionHit = new AtomicBoolean(false);
232+
doAnswer(invocation -> {
233+
Consumer<Exception> exceptionConsumer = invocation.getArgument(2);
234+
exceptionConsumer.accept(new ElasticsearchTimeoutException(new TimeoutException("Artificial timeout")));
235+
exceptionHit.set(true);
236+
return null;
237+
}).when(watcherService).start(any(), any(), any(Consumer.class));
238+
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
239+
assertTrue("Expected simulated timeout not hit", exceptionHit.get());
240+
assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STOPPED));
241+
242+
// And now attempt to start watcher with no exception. It should start up.
243+
AtomicBoolean runnableCalled = new AtomicBoolean(false);
244+
doAnswer(invocation -> {
245+
Runnable runnable = invocation.getArgument(1);
246+
runnable.run();
247+
runnableCalled.set(true);
248+
return null;
249+
}).when(watcherService).start(any(), any(Runnable.class), any());
250+
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
251+
assertTrue("Runnable not called", runnableCalled.get());
252+
assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STARTED));
253+
}
254+
185255
public void testNoLocalShards() {
186256
Index watchIndex = new Index(Watch.INDEX, "foo");
187257
ShardId shardId = new ShardId(watchIndex, 0);
@@ -443,7 +513,7 @@ public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Ex
443513
when(watcherService.validate(eq(state))).thenReturn(true);
444514

445515
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
446-
verify(watcherService, times(0)).start(any(ClusterState.class), any());
516+
verify(watcherService, times(0)).start(any(ClusterState.class), any(), any());
447517
}
448518

449519
public void testWatcherStopsWhenMasterNodeIsMissing() {

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.watcher;
88

99
import org.apache.lucene.search.TotalHits;
10+
import org.elasticsearch.ElasticsearchTimeoutException;
1011
import org.elasticsearch.Version;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
@@ -65,7 +66,10 @@
6566
import java.time.ZonedDateTime;
6667
import java.util.Collections;
6768
import java.util.List;
69+
import java.util.concurrent.TimeoutException;
70+
import java.util.concurrent.atomic.AtomicReference;
6871

72+
import static org.hamcrest.Matchers.equalTo;
6973
import static org.hamcrest.Matchers.hasSize;
7074
import static org.hamcrest.Matchers.is;
7175
import static org.mockito.ArgumentMatchers.any;
@@ -236,7 +240,7 @@ void stopExecutor() {}
236240
return null;
237241
}).when(client).execute(eq(ClearScrollAction.INSTANCE), any(ClearScrollRequest.class), anyActionListener());
238242

239-
service.start(clusterState, () -> {});
243+
service.start(clusterState, () -> {}, exception -> {});
240244

241245
ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
242246
verify(triggerService).start(captor.capture());
@@ -245,6 +249,45 @@ void stopExecutor() {}
245249
assertThat(watches, hasSize(activeWatchCount));
246250
}
247251

252+
public void testExceptionHandling() {
253+
/*
254+
* This tests that if the WatcherService throws an exception while refreshing indices that the exception is handled by the
255+
* exception consumer rather than being propagated higher in the stack.
256+
*/
257+
TriggerService triggerService = mock(TriggerService.class);
258+
TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class);
259+
ExecutionService executionService = mock(ExecutionService.class);
260+
WatchParser parser = mock(WatchParser.class);
261+
final ElasticsearchTimeoutException exception = new ElasticsearchTimeoutException(new TimeoutException("Artifical timeout"));
262+
WatcherService service = new WatcherService(
263+
Settings.EMPTY,
264+
triggerService,
265+
triggeredWatchStore,
266+
executionService,
267+
parser,
268+
client,
269+
EsExecutors.DIRECT_EXECUTOR_SERVICE
270+
) {
271+
@Override
272+
void refreshWatches(IndexMetadata indexMetadata) {
273+
throw exception;
274+
}
275+
};
276+
277+
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
278+
Metadata.Builder metadataBuilder = Metadata.builder();
279+
Settings indexSettings = settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
280+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
281+
.build();
282+
metadataBuilder.put(IndexMetadata.builder(Watch.INDEX).settings(indexSettings));
283+
csBuilder.metadata(metadataBuilder);
284+
ClusterState clusterState = csBuilder.build();
285+
286+
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
287+
service.start(clusterState, () -> { fail("Excepted an exception"); }, exceptionReference::set);
288+
assertThat(exceptionReference.get(), equalTo(exception));
289+
}
290+
248291
@SuppressWarnings({ "unchecked", "rawtypes" })
249292
public void testPausingWatcherServiceAlsoPausesTriggerService() {
250293
String engineType = "foo";

0 commit comments

Comments
 (0)