Skip to content

Commit 852026c

Browse files
author
Hendrik Muhs
authored
Backports the following commits to 7.17: (#90408)
- [Transform] Don't fail a transform due to ILM closing an index (#90396)
1 parent dd0421e commit 852026c

File tree

2 files changed

+131
-1
lines changed

2 files changed

+131
-1
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.search.SearchRequest;
1919
import org.elasticsearch.action.search.SearchResponse;
2020
import org.elasticsearch.action.support.IndicesOptions;
21+
import org.elasticsearch.cluster.block.ClusterBlockException;
2122
import org.elasticsearch.common.breaker.CircuitBreakingException;
2223
import org.elasticsearch.common.logging.LoggerMessageFormat;
2324
import org.elasticsearch.common.util.CollectionUtils;
@@ -937,7 +938,8 @@ void handleFailure(Exception e) {
937938
}
938939

939940
// irrecoverable error without special handling
940-
if (unwrappedException instanceof ElasticsearchException) {
941+
// gh#89802 treat ClusterBlockException is retry-able
942+
if (unwrappedException instanceof ClusterBlockException == false && unwrappedException instanceof ElasticsearchException) {
941943
ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedException;
942944
if (ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
943945
failIndexer("task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage());

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.transform.transforms;
99

1010
import org.apache.lucene.search.TotalHits;
11+
import org.elasticsearch.ElasticsearchException;
1112
import org.elasticsearch.ElasticsearchParseException;
1213
import org.elasticsearch.ElasticsearchTimeoutException;
1314
import org.elasticsearch.action.ActionListener;
@@ -20,11 +21,14 @@
2021
import org.elasticsearch.action.search.SearchResponse;
2122
import org.elasticsearch.action.search.ShardSearchFailure;
2223
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.cluster.block.ClusterBlockException;
25+
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
2326
import org.elasticsearch.common.breaker.CircuitBreaker.Durability;
2427
import org.elasticsearch.common.breaker.CircuitBreakingException;
2528
import org.elasticsearch.core.TimeValue;
2629
import org.elasticsearch.index.reindex.BulkByScrollResponse;
2730
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
31+
import org.elasticsearch.rest.RestStatus;
2832
import org.elasticsearch.script.ScriptException;
2933
import org.elasticsearch.search.SearchHit;
3034
import org.elasticsearch.search.SearchHits;
@@ -520,6 +524,130 @@ public void testScriptError() throws Exception {
520524
);
521525
}
522526

527+
public void testDontFailForClusterBlockException() throws Exception {
528+
Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000);
529+
String transformId = randomAlphaOfLength(10);
530+
TransformConfig config = new TransformConfig(
531+
transformId,
532+
randomSourceConfig(),
533+
randomDestConfig(),
534+
null,
535+
null,
536+
null,
537+
randomPivotConfig(),
538+
null,
539+
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
540+
new SettingsConfig(pageSize, null, (Boolean) null, null, null),
541+
null,
542+
null,
543+
null,
544+
null
545+
);
546+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
547+
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
548+
throw new ClusterBlockException(
549+
Collections.singletonMap("test-index", Collections.singleton(MetadataIndexStateService.INDEX_CLOSED_BLOCK))
550+
);
551+
};
552+
553+
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
554+
555+
TransformAuditor auditor = MockTransformAuditor.createMockAuditor();
556+
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
557+
558+
MockedTransformIndexer indexer = createMockIndexer(
559+
config,
560+
state,
561+
searchFunction,
562+
bulkFunction,
563+
null,
564+
null,
565+
threadPool,
566+
ThreadPool.Names.GENERIC,
567+
auditor,
568+
context
569+
);
570+
final CountDownLatch latch = indexer.newLatch(1);
571+
indexer.start();
572+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
573+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
574+
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
575+
576+
latch.countDown();
577+
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.MINUTES);
578+
assertThat(context.getFailureCount(), equalTo(1));
579+
}
580+
581+
public void testFailForNonRetryableElasticSearchException() throws Exception {
582+
Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000);
583+
String transformId = randomAlphaOfLength(10);
584+
TransformConfig config = new TransformConfig(
585+
transformId,
586+
randomSourceConfig(),
587+
randomDestConfig(),
588+
null,
589+
null,
590+
null,
591+
randomPivotConfig(),
592+
null,
593+
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
594+
new SettingsConfig(pageSize, null, (Boolean) null, null, null),
595+
null,
596+
null,
597+
null,
598+
null
599+
);
600+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
601+
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
602+
throw new ElasticsearchException("internal error") {
603+
@Override
604+
public RestStatus status() {
605+
return RestStatus.BAD_REQUEST;
606+
}
607+
};
608+
};
609+
610+
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
611+
612+
final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
613+
final AtomicReference<String> failureMessage = new AtomicReference<>();
614+
Consumer<String> failureConsumer = message -> {
615+
failIndexerCalled.compareAndSet(false, true);
616+
failureMessage.compareAndSet(null, message);
617+
};
618+
619+
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
620+
TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
621+
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
622+
623+
MockedTransformIndexer indexer = createMockIndexer(
624+
config,
625+
state,
626+
searchFunction,
627+
bulkFunction,
628+
null,
629+
failureConsumer,
630+
threadPool,
631+
ThreadPool.Names.GENERIC,
632+
auditor,
633+
context
634+
);
635+
636+
final CountDownLatch latch = indexer.newLatch(1);
637+
638+
indexer.start();
639+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
640+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
641+
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
642+
643+
latch.countDown();
644+
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
645+
assertTrue(failIndexerCalled.get());
646+
verify(contextListener, times(1)).fail(matches("task encountered irrecoverable failure: .*: internal error"), any());
647+
648+
assertThat(failureMessage.get(), matchesRegex("task encountered irrecoverable failure: .*: internal error"));
649+
}
650+
523651
public void testRetentionPolicyDeleteByQueryThrowsIrrecoverable() throws Exception {
524652
String transformId = randomAlphaOfLength(10);
525653
TransformConfig config = new TransformConfig(

0 commit comments

Comments
 (0)