Skip to content

Commit c9f995a

Browse files
authored
Log reindexing failures (#112676)
Wait for reindexing tasks to finish during shutdown for an amount of time defined by settings. Also log the number of reindexing tasks still in flight after the wait.
1 parent 60678a1 commit c9f995a

File tree

5 files changed

+332
-116
lines changed

5 files changed

+332
-116
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.reindex;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.core.TimeValue;
16+
import org.elasticsearch.node.ShutdownPrepareService;
17+
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.reindex.ReindexPlugin;
19+
import org.elasticsearch.tasks.TaskInfo;
20+
import org.elasticsearch.tasks.TaskManager;
21+
import org.elasticsearch.test.ESIntegTestCase;
22+
import org.elasticsearch.transport.TransportService;
23+
24+
import java.util.Arrays;
25+
import java.util.Collection;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.IntStream;
29+
30+
import static org.elasticsearch.node.ShutdownPrepareService.MAXIMUM_REINDEXING_TIMEOUT_SETTING;
31+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
32+
33+
/**
34+
* Test that a wait added during shutdown is necessary for a large reindexing task to complete.
35+
* The test works as follows:
36+
* 1. Start a large (reasonably long running) reindexing request on the coordinator-only node.
37+
* 2. Check that the reindexing task appears on the coordinating node
38+
* 3. With a 10s timeout value for MAXIMUM_REINDEXING_TIMEOUT_SETTING,
39+
* wait for the reindexing task to complete before closing the node
40+
* 4. Confirm that the reindexing task succeeds with the wait (it will fail without it)
41+
*/
42+
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
43+
public class ReindexNodeShutdownIT extends ESIntegTestCase {
44+
45+
protected static final String INDEX = "reindex-shutdown-index";
46+
protected static final String DEST_INDEX = "dest-index";
47+
48+
@Override
49+
protected Collection<Class<? extends Plugin>> nodePlugins() {
50+
return Arrays.asList(ReindexPlugin.class);
51+
}
52+
53+
protected ReindexRequestBuilder reindex(String nodeName) {
54+
return new ReindexRequestBuilder(internalCluster().client(nodeName));
55+
}
56+
57+
public void testReindexWithShutdown() throws Exception {
58+
final String masterNodeName = internalCluster().startMasterOnlyNode();
59+
final String dataNodeName = internalCluster().startDataOnlyNode();
60+
61+
final Settings COORD_SETTINGS = Settings.builder()
62+
.put(MAXIMUM_REINDEXING_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(10))
63+
.build();
64+
final String coordNodeName = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
65+
66+
ensureStableCluster(3);
67+
68+
int numDocs = 20000;
69+
createIndex(numDocs);
70+
createReindexTaskAndShutdown(coordNodeName);
71+
checkDestinationIndex(dataNodeName, numDocs);
72+
}
73+
74+
private void createIndex(int numDocs) {
75+
// INDEX will be created on the dataNode
76+
createIndex(INDEX);
77+
78+
logger.debug("setting up [{}] docs", numDocs);
79+
indexRandom(
80+
true,
81+
false,
82+
true,
83+
IntStream.range(0, numDocs)
84+
.mapToObj(i -> prepareIndex(INDEX).setId(String.valueOf(i)).setSource("n", i))
85+
.collect(Collectors.toList())
86+
);
87+
88+
// Checks that the all documents have been indexed and correctly counted
89+
assertHitCount(prepareSearch(INDEX).setSize(0).setTrackTotalHits(true), numDocs);
90+
}
91+
92+
private void createReindexTaskAndShutdown(final String coordNodeName) throws Exception {
93+
AbstractBulkByScrollRequestBuilder<?, ?> builder = reindex(coordNodeName).source(INDEX).destination(DEST_INDEX);
94+
AbstractBulkByScrollRequest<?> reindexRequest = builder.request();
95+
ShutdownPrepareService shutdownPrepareService = internalCluster().getInstance(ShutdownPrepareService.class, coordNodeName);
96+
97+
TaskManager taskManager = internalCluster().getInstance(TransportService.class, coordNodeName).getTaskManager();
98+
99+
// Now execute the reindex action...
100+
ActionListener<BulkByScrollResponse> reindexListener = new ActionListener<BulkByScrollResponse>() {
101+
@Override
102+
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
103+
assertNull(bulkByScrollResponse.getReasonCancelled());
104+
logger.debug(bulkByScrollResponse.toString());
105+
}
106+
107+
@Override
108+
public void onFailure(Exception e) {
109+
logger.debug("Encounterd " + e.toString());
110+
fail(e, "Encounterd " + e.toString());
111+
}
112+
};
113+
internalCluster().client(coordNodeName).execute(ReindexAction.INSTANCE, reindexRequest, reindexListener);
114+
115+
// Check for reindex task to appear in the tasks list and Immediately stop coordinating node
116+
waitForTask(ReindexAction.INSTANCE.name(), coordNodeName);
117+
shutdownPrepareService.prepareForShutdown(taskManager);
118+
internalCluster().stopNode(coordNodeName);
119+
}
120+
121+
// Make sure all documents from the source index have been reindexed into the destination index
122+
private void checkDestinationIndex(String dataNodeName, int numDocs) throws Exception {
123+
assertTrue(indexExists(DEST_INDEX));
124+
flushAndRefresh(DEST_INDEX);
125+
assertBusy(() -> { assertHitCount(prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true), numDocs); });
126+
}
127+
128+
private static void waitForTask(String actionName, String nodeName) throws Exception {
129+
assertBusy(() -> {
130+
ListTasksResponse tasks = clusterAdmin().prepareListTasks(nodeName).setActions(actionName).setDetailed(true).get();
131+
tasks.rethrowFailures("Find my task");
132+
for (TaskInfo taskInfo : tasks.getTasks()) {
133+
// Skip tasks with a parent because those are children of the task we want
134+
if (taskInfo.parentTaskId().isSet() == false) return;
135+
}
136+
fail("Couldn't find task after waiting, tasks=" + tasks.getTasks());
137+
}, 10, TimeUnit.SECONDS);
138+
}
139+
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import org.elasticsearch.monitor.process.ProcessService;
109109
import org.elasticsearch.node.Node;
110110
import org.elasticsearch.node.NodeRoleSettings;
111+
import org.elasticsearch.node.ShutdownPrepareService;
111112
import org.elasticsearch.persistent.PersistentTasksClusterService;
112113
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
113114
import org.elasticsearch.plugins.PluginsService;
@@ -456,6 +457,8 @@ public void apply(Settings value, Settings current, Settings previous) {
456457
Environment.PATH_SHARED_DATA_SETTING,
457458
NodeEnvironment.NODE_ID_SEED_SETTING,
458459
Node.INITIAL_STATE_TIMEOUT_SETTING,
460+
ShutdownPrepareService.MAXIMUM_SHUTDOWN_TIMEOUT_SETTING,
461+
ShutdownPrepareService.MAXIMUM_REINDEXING_TIMEOUT_SETTING,
459462
DiscoveryModule.DISCOVERY_TYPE_SETTING,
460463
DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING,
461464
DiscoveryModule.ELECTION_STRATEGY_SETTING,

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 3 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@
1313
import org.apache.logging.log4j.Logger;
1414
import org.apache.lucene.util.SetOnce;
1515
import org.elasticsearch.ElasticsearchTimeoutException;
16-
import org.elasticsearch.action.search.TransportSearchAction;
17-
import org.elasticsearch.action.support.PlainActionFuture;
18-
import org.elasticsearch.action.support.RefCountingListener;
19-
import org.elasticsearch.action.support.SubscribableListener;
2016
import org.elasticsearch.bootstrap.BootstrapCheck;
2117
import org.elasticsearch.bootstrap.BootstrapContext;
2218
import org.elasticsearch.client.internal.Client;
@@ -82,7 +78,6 @@
8278
import org.elasticsearch.snapshots.SnapshotShardsService;
8379
import org.elasticsearch.snapshots.SnapshotsService;
8480
import org.elasticsearch.tasks.TaskCancellationService;
85-
import org.elasticsearch.tasks.TaskManager;
8681
import org.elasticsearch.tasks.TaskResultsService;
8782
import org.elasticsearch.threadpool.ThreadPool;
8883
import org.elasticsearch.transport.RemoteClusterPortSettings;
@@ -106,18 +101,12 @@
106101
import java.util.List;
107102
import java.util.Map;
108103
import java.util.concurrent.CountDownLatch;
109-
import java.util.concurrent.ExecutionException;
110104
import java.util.concurrent.TimeUnit;
111-
import java.util.concurrent.TimeoutException;
112105
import java.util.function.BiConsumer;
113106
import java.util.function.Function;
114-
import java.util.function.Supplier;
115-
import java.util.stream.Collectors;
116107

117108
import javax.net.ssl.SNIHostName;
118109

119-
import static org.elasticsearch.core.Strings.format;
120-
121110
/**
122111
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
123112
* in order to use a {@link Client} to perform actions/operations against the cluster.
@@ -161,12 +150,6 @@ public class Node implements Closeable {
161150
Property.NodeScope
162151
);
163152

164-
public static final Setting<TimeValue> MAXIMUM_SHUTDOWN_TIMEOUT_SETTING = Setting.positiveTimeSetting(
165-
"node.maximum_shutdown_grace_period",
166-
TimeValue.ZERO,
167-
Setting.Property.NodeScope
168-
);
169-
170153
private final Lifecycle lifecycle = new Lifecycle();
171154

172155
/**
@@ -187,6 +170,7 @@ public class Node implements Closeable {
187170
private final LocalNodeFactory localNodeFactory;
188171
private final NodeService nodeService;
189172
private final TerminationHandler terminationHandler;
173+
190174
// for testing
191175
final NamedWriteableRegistry namedWriteableRegistry;
192176
final NamedXContentRegistry namedXContentRegistry;
@@ -606,105 +590,8 @@ public synchronized void close() throws IOException {
606590
* logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
607591
*/
608592
public void prepareForClose() {
609-
final var maxTimeout = MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(this.settings());
610-
611-
record Stopper(String name, SubscribableListener<Void> listener) {
612-
boolean isIncomplete() {
613-
return listener().isDone() == false;
614-
}
615-
}
616-
617-
final var stoppers = new ArrayList<Stopper>();
618-
final var allStoppersFuture = new PlainActionFuture<Void>();
619-
try (var listeners = new RefCountingListener(allStoppersFuture)) {
620-
final BiConsumer<String, Runnable> stopperRunner = (name, action) -> {
621-
final var stopper = new Stopper(name, new SubscribableListener<>());
622-
stoppers.add(stopper);
623-
stopper.listener().addListener(listeners.acquire());
624-
new Thread(() -> {
625-
try {
626-
action.run();
627-
} catch (Exception ex) {
628-
logger.warn("unexpected exception in shutdown task [" + stopper.name() + "]", ex);
629-
} finally {
630-
stopper.listener().onResponse(null);
631-
}
632-
}, stopper.name()).start();
633-
};
634-
635-
stopperRunner.accept("http-server-transport-stop", injector.getInstance(HttpServerTransport.class)::close);
636-
stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout));
637-
if (terminationHandler != null) {
638-
stopperRunner.accept("termination-handler-stop", terminationHandler::handleTermination);
639-
}
640-
}
641-
642-
final Supplier<String> incompleteStoppersDescriber = () -> stoppers.stream()
643-
.filter(Stopper::isIncomplete)
644-
.map(Stopper::name)
645-
.collect(Collectors.joining(", ", "[", "]"));
646-
647-
try {
648-
if (TimeValue.ZERO.equals(maxTimeout)) {
649-
allStoppersFuture.get();
650-
} else {
651-
allStoppersFuture.get(maxTimeout.millis(), TimeUnit.MILLISECONDS);
652-
}
653-
} catch (ExecutionException e) {
654-
assert false : e; // listeners are never completed exceptionally
655-
logger.warn("failed during graceful shutdown tasks", e);
656-
} catch (InterruptedException e) {
657-
Thread.currentThread().interrupt();
658-
logger.warn("interrupted while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get(), e);
659-
} catch (TimeoutException e) {
660-
logger.warn("timed out while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get());
661-
}
662-
}
663-
664-
private void awaitSearchTasksComplete(TimeValue asyncSearchTimeout) {
665-
TaskManager taskManager = injector.getInstance(TransportService.class).getTaskManager();
666-
long millisWaited = 0;
667-
while (true) {
668-
long searchTasksRemaining = taskManager.getTasks()
669-
.values()
670-
.stream()
671-
.filter(task -> TransportSearchAction.TYPE.name().equals(task.getAction()))
672-
.count();
673-
if (searchTasksRemaining == 0) {
674-
logger.debug("all search tasks complete");
675-
return;
676-
} else {
677-
// Let the system work on those searches for a while. We're on a dedicated thread to manage app shutdown, so we
678-
// literally just want to wait and not take up resources on this thread for now. Poll period chosen to allow short
679-
// response times, but checking the tasks list is relatively expensive, and we don't want to waste CPU time we could
680-
// be spending on finishing those searches.
681-
final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
682-
millisWaited += pollPeriod.millis();
683-
if (TimeValue.ZERO.equals(asyncSearchTimeout) == false && millisWaited >= asyncSearchTimeout.millis()) {
684-
logger.warn(
685-
format(
686-
"timed out after waiting [%s] for [%d] search tasks to finish",
687-
asyncSearchTimeout.toString(),
688-
searchTasksRemaining
689-
)
690-
);
691-
return;
692-
}
693-
logger.debug(format("waiting for [%s] search tasks to finish, next poll in [%s]", searchTasksRemaining, pollPeriod));
694-
try {
695-
Thread.sleep(pollPeriod.millis());
696-
} catch (InterruptedException ex) {
697-
logger.warn(
698-
format(
699-
"interrupted while waiting [%s] for [%d] search tasks to finish",
700-
asyncSearchTimeout.toString(),
701-
searchTasksRemaining
702-
)
703-
);
704-
return;
705-
}
706-
}
707-
}
593+
injector.getInstance(ShutdownPrepareService.class)
594+
.prepareForShutdown(injector.getInstance(TransportService.class).getTaskManager());
708595
}
709596

710597
/**

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,6 +1099,8 @@ private void construct(
10991099
telemetryProvider.getTracer()
11001100
);
11011101

1102+
final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler);
1103+
11021104
modules.add(
11031105
loadPersistentTasksService(
11041106
settingsModule,
@@ -1200,6 +1202,7 @@ private void construct(
12001202
b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
12011203
b.bind(DataStreamAutoShardingService.class).toInstance(dataStreamAutoShardingService);
12021204
b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics);
1205+
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
12031206
});
12041207

12051208
if (ReadinessService.enabled(environment)) {

0 commit comments

Comments
 (0)