Skip to content

Commit f429eef

Browse files
Merge remote-tracking branch 'elastic/main' into batched-exec-short
2 parents cddd698 + bda99c9 commit f429eef

File tree

20 files changed

+240
-85
lines changed

20 files changed

+240
-85
lines changed

docs/changelog/121119.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.

docs/changelog/121327.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121327
2+
summary: Reduce Data Loss in System Indices Migration
3+
area: Infra/Core
4+
type: bug
5+
issues: []

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99

1010
package org.elasticsearch.migration;
1111

12+
import org.elasticsearch.ElasticsearchException;
1213
import org.elasticsearch.Version;
1314
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.ActionRequest;
1416
import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
1517
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
1618
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
1719
import org.elasticsearch.action.admin.indices.stats.IndexStats;
1820
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1921
import org.elasticsearch.action.index.IndexRequestBuilder;
22+
import org.elasticsearch.action.support.ActionFilter;
2023
import org.elasticsearch.action.support.ActiveShardCount;
2124
import org.elasticsearch.client.internal.Client;
2225
import org.elasticsearch.cluster.ClusterState;
@@ -28,6 +31,7 @@
2831
import org.elasticsearch.index.IndexVersion;
2932
import org.elasticsearch.indices.AssociatedIndexDescriptor;
3033
import org.elasticsearch.indices.SystemIndexDescriptor;
34+
import org.elasticsearch.plugins.ActionPlugin;
3135
import org.elasticsearch.plugins.Plugin;
3236
import org.elasticsearch.plugins.PluginsService;
3337
import org.elasticsearch.plugins.SystemIndexPlugin;
@@ -50,6 +54,10 @@
5054
import java.util.function.BiConsumer;
5155
import java.util.function.Function;
5256

57+
import static java.util.Collections.emptySet;
58+
import static java.util.Collections.singletonList;
59+
import static java.util.Collections.unmodifiableSet;
60+
import static org.elasticsearch.common.util.set.Sets.newHashSet;
5361
import static org.hamcrest.Matchers.containsInAnyOrder;
5462
import static org.hamcrest.Matchers.endsWith;
5563
import static org.hamcrest.Matchers.equalTo;
@@ -255,12 +263,18 @@ protected void assertIndexHasCorrectProperties(
255263
assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
256264
}
257265

258-
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
266+
public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin {
259267
public final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
260268
public final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();
269+
private final BlockingActionFilter blockingActionFilter;
261270

262271
public TestPlugin() {
272+
blockingActionFilter = new BlockingActionFilter();
273+
}
263274

275+
@Override
276+
public List<ActionFilter> getActionFilters() {
277+
return singletonList(blockingActionFilter);
264278
}
265279

266280
@Override
@@ -299,5 +313,26 @@ public void indicesMigrationComplete(
299313
postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
300314
listener.onResponse(true);
301315
}
316+
317+
public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple {
318+
private Set<String> blockedActions = emptySet();
319+
320+
@Override
321+
protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
322+
if (blockedActions.contains(action)) {
323+
throw new ElasticsearchException("force exception on [" + action + "]");
324+
}
325+
return true;
326+
}
327+
328+
@Override
329+
public int order() {
330+
return 0;
331+
}
332+
333+
public void blockActions(String... actions) {
334+
blockedActions = unmodifiableSet(newHashSet(actions));
335+
}
336+
}
302337
}
303338
}

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest;
1818
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse;
1919
import org.elasticsearch.action.admin.indices.alias.Alias;
20+
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
2021
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
2122
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2223
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
2324
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2425
import org.elasticsearch.action.search.SearchRequestBuilder;
26+
import org.elasticsearch.action.support.ActionFilter;
27+
import org.elasticsearch.action.support.ActionFilters;
2528
import org.elasticsearch.action.support.ActiveShardCount;
2629
import org.elasticsearch.cluster.ClusterState;
2730
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -36,10 +39,12 @@
3639
import org.elasticsearch.common.settings.Settings;
3740
import org.elasticsearch.index.query.QueryBuilders;
3841
import org.elasticsearch.indices.SystemIndexDescriptor;
42+
import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter;
3943
import org.elasticsearch.painless.PainlessPlugin;
4044
import org.elasticsearch.plugins.Plugin;
4145
import org.elasticsearch.plugins.SystemIndexPlugin;
4246
import org.elasticsearch.reindex.ReindexPlugin;
47+
import org.elasticsearch.test.InternalTestCluster;
4348
import org.elasticsearch.upgrades.FeatureMigrationResults;
4449
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
4550

@@ -272,6 +277,60 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
272277
});
273278
}
274279

280+
@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
281+
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
282+
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
283+
ensureGreen();
284+
285+
// Block the alias request to simulate a failure
286+
InternalTestCluster internalTestCluster = internalCluster();
287+
ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName());
288+
BlockingActionFilter blockingActionFilter = null;
289+
for (ActionFilter filter : actionFilters.filters()) {
290+
if (filter instanceof BlockingActionFilter) {
291+
blockingActionFilter = (BlockingActionFilter) filter;
292+
break;
293+
}
294+
}
295+
assertNotNull("BlockingActionFilter should exist", blockingActionFilter);
296+
blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME);
297+
298+
// Start the migration
299+
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
300+
301+
// Wait till the migration fails
302+
assertBusy(() -> {
303+
GetFeatureUpgradeStatusResponse statusResp = client().execute(
304+
GetFeatureUpgradeStatusAction.INSTANCE,
305+
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
306+
).get();
307+
logger.info(Strings.toString(statusResp));
308+
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR));
309+
});
310+
311+
// Get the settings to see if the write block was removed
312+
var allsettings = client().admin().indices().prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern()).get().getIndexToSettings();
313+
var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old");
314+
var writeBlock = internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey());
315+
assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false"));
316+
317+
// Unblock the alias request
318+
blockingActionFilter.blockActions();
319+
320+
// Retry the migration
321+
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
322+
323+
// Ensure that the migration is successful after the alias request is unblocked
324+
assertBusy(() -> {
325+
GetFeatureUpgradeStatusResponse statusResp = client().execute(
326+
GetFeatureUpgradeStatusAction.INSTANCE,
327+
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
328+
).get();
329+
logger.info(Strings.toString(statusResp));
330+
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
331+
});
332+
}
333+
275334
public void testMigrationWillRunAfterError() throws Exception {
276335
createSystemIndexForDescriptor(INTERNAL_MANAGED);
277336

muted-tests.yml

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,6 @@ tests:
104104
- class: org.elasticsearch.discovery.ClusterDisruptionIT
105105
method: testAckedIndexing
106106
issue: https://github.com/elastic/elasticsearch/issues/117024
107-
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
108-
method: test {p0=snapshot/10_basic/Create a source only snapshot and then restore it}
109-
issue: https://github.com/elastic/elasticsearch/issues/117295
110107
- class: org.elasticsearch.xpack.inference.InferenceRestIT
111108
method: test {p0=inference/40_semantic_text_query/Query a field that uses the default ELSER 2 endpoint}
112109
issue: https://github.com/elastic/elasticsearch/issues/117027
@@ -364,18 +361,13 @@ tests:
364361
- class: org.elasticsearch.analysis.common.CommonAnalysisClientYamlTestSuiteIT
365362
method: test {yaml=analysis-common/40_token_filters/stemmer_override file access}
366363
issue: https://github.com/elastic/elasticsearch/issues/121625
367-
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests
368-
method: testDoNotRetryOnRequestLevelFailure
369-
issue: https://github.com/elastic/elasticsearch/issues/121966
370364
- class: org.elasticsearch.xpack.searchablesnapshots.hdfs.SecureHdfsSearchableSnapshotsIT
371365
issue: https://github.com/elastic/elasticsearch/issues/121967
372366
- class: org.elasticsearch.action.search.SearchQueryThenFetchAsyncActionTests
373367
method: testBottomFieldSort
374368
issue: https://github.com/elastic/elasticsearch/issues/121503
375369
- class: org.elasticsearch.xpack.application.CohereServiceUpgradeIT
376370
issue: https://github.com/elastic/elasticsearch/issues/121537
377-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
378-
issue: https://github.com/elastic/elasticsearch/issues/121737
379371
- class: org.elasticsearch.xpack.restart.FullClusterRestartIT
380372
method: testWatcherWithApiKey {cluster=UPGRADED}
381373
issue: https://github.com/elastic/elasticsearch/issues/122061
@@ -391,16 +383,8 @@ tests:
391383
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
392384
method: testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableIsBoolean
393385
issue: https://github.com/elastic/elasticsearch/issues/121680
394-
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
395-
issue: https://github.com/elastic/elasticsearch/issues/122153
396386
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
397387
issue: https://github.com/elastic/elasticsearch/issues/121411
398-
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
399-
method: testFailureLoadingFields
400-
issue: https://github.com/elastic/elasticsearch/issues/122132
401-
- class: org.elasticsearch.xpack.esql.action.CrossClustersCancellationIT
402-
method: testCloseSkipUnavailable
403-
issue: https://github.com/elastic/elasticsearch/issues/121627
404388
- class: org.elasticsearch.xpack.downsample.DownsampleActionSingleNodeTests
405389
method: testDuplicateDownsampleRequest
406390
issue: https://github.com/elastic/elasticsearch/issues/122158
@@ -416,6 +400,17 @@ tests:
416400
- class: org.elasticsearch.xpack.esql.analysis.VerifierTests
417401
method: testChangePoint_valueNumeric
418402
issue: https://github.com/elastic/elasticsearch/issues/122181
403+
- class: org.elasticsearch.datastreams.TSDBPassthroughIndexingIT
404+
issue: https://github.com/elastic/elasticsearch/issues/121716
405+
- class: org.elasticsearch.smoketest.SmokeTestMonitoringWithSecurityIT
406+
method: testHTTPExporterWithSSL
407+
issue: https://github.com/elastic/elasticsearch/issues/122220
408+
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
409+
method: testStopQueryLocal
410+
issue: https://github.com/elastic/elasticsearch/issues/121672
411+
- class: org.elasticsearch.xpack.security.authz.IndexAliasesTests
412+
method: testRemoveIndex
413+
issue: https://github.com/elastic/elasticsearch/issues/122221
419414

420415
# Examples:
421416
#

server/src/internalClusterTest/java/org/elasticsearch/search/SearchTimeoutIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,14 @@ public float score() {
240240
for (int doc = min; doc < max; ++doc) {
241241
if (acceptDocs == null || acceptDocs.get(doc)) {
242242
collector.collect(doc);
243-
// collect one doc, then throw a timeout, this ensures partial results will be returned
243+
// collect one doc per segment, only then throw a timeout: this ensures partial
244+
// results are returned
244245
((ContextIndexSearcher) searcher).throwTimeExceededException();
245246
}
246247
}
247-
throw new AssertionError(
248-
"Should have thrown a time exceeded exception: [min: " + min + " - max: " + max + "]"
249-
);
248+
// there is a slight chance that no docs are scored for a specific segment.
249+
// other shards / slices will throw the timeout anyway, one is enough.
250+
return max == maxDoc ? DocIdSetIterator.NO_MORE_DOCS : max;
250251
}
251252

252253
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,17 @@ public boolean hasErrors() {
7777
return errors;
7878
}
7979

80+
/**
81+
* Get a list of all errors from the response. If there are no errors, an empty list is returned.
82+
*/
83+
public List<ElasticsearchException> getErrors() {
84+
if (errors == false) {
85+
return List.of();
86+
} else {
87+
return actionResults.stream().filter(a -> a.getError() != null).map(AliasActionResult::getError).toList();
88+
}
89+
}
90+
8091
/**
8192
* Build a response from a list of action results. Sets the errors boolean based
8293
* on whether an of the individual results contain an error.
@@ -165,6 +176,13 @@ public static AliasActionResult buildSuccess(List<String> indices, AliasActions
165176
return new AliasActionResult(indices, action, null);
166177
}
167178

179+
/**
180+
* The error result if the action failed, null if the action succeeded.
181+
*/
182+
public ElasticsearchException getError() {
183+
return error;
184+
}
185+
168186
private int getStatus() {
169187
return error == null ? 200 : error.status().getStatus();
170188
}

0 commit comments

Comments
 (0)