Skip to content

Commit 15107b7

Browse files
dan-rubinsteinelasticmachine
authored andcommitted
Retry on ClusterBlockException on transform destination index (elastic#118194) (elastic#118581)
* Retry on ClusterBlockException on transform destination index * Update docs/changelog/118194.yaml * Cleaning up tests * Fixing tests --------- Co-authored-by: Elastic Machine <[email protected]>
1 parent 9694602 commit 15107b7

File tree

3 files changed

+220
-61
lines changed

3 files changed

+220
-61
lines changed

docs/changelog/118194.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118194
2+
summary: Retry on `ClusterBlockException` on transform destination index
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,14 @@ private void handleScriptException(ScriptException scriptException, boolean unat
169169
* @param numFailureRetries the number of configured retries
170170
*/
171171
private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) {
172-
if (unattended == false && bulkIndexingException.isIrrecoverable()) {
172+
if (bulkIndexingException.getCause() instanceof ClusterBlockException) {
173+
retryWithoutIncrementingFailureCount(
174+
bulkIndexingException,
175+
bulkIndexingException.getDetailedMessage(),
176+
unattended,
177+
numFailureRetries
178+
);
179+
} else if (unattended == false && bulkIndexingException.isIrrecoverable()) {
173180
String message = TransformMessages.getMessage(
174181
TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR,
175182
bulkIndexingException.getDetailedMessage()
@@ -232,12 +239,46 @@ private void retry(Throwable unwrappedException, String message, boolean unatten
232239
&& unwrappedException.getClass().equals(context.getLastFailure().getClass());
233240

234241
final int failureCount = context.incrementAndGetFailureCount(unwrappedException);
235-
236242
if (unattended == false && numFailureRetries != -1 && failureCount > numFailureRetries) {
237243
fail(unwrappedException, "task encountered more than " + numFailureRetries + " failures; latest failure: " + message);
238244
return;
239245
}
240246

247+
logRetry(unwrappedException, message, unattended, numFailureRetries, failureCount, repeatedFailure);
248+
}
249+
250+
/**
251+
* Terminate failure handling without incrementing the retries used
252+
* <p>
253+
* This is used when there is an ongoing recoverable issue and we want to retain
254+
* retries for any issues that may occur after the issue is resolved
255+
*
256+
* @param unwrappedException The exception caught
257+
* @param message error message to log/audit
258+
* @param unattended whether the transform runs in unattended mode
259+
* @param numFailureRetries the number of configured retries
260+
*/
261+
private void retryWithoutIncrementingFailureCount(
262+
Throwable unwrappedException,
263+
String message,
264+
boolean unattended,
265+
int numFailureRetries
266+
) {
267+
// group failures to decide whether to report it below
268+
final boolean repeatedFailure = context.getLastFailure() != null
269+
&& unwrappedException.getClass().equals(context.getLastFailure().getClass());
270+
271+
logRetry(unwrappedException, message, unattended, numFailureRetries, context.getFailureCount(), repeatedFailure);
272+
}
273+
274+
private void logRetry(
275+
Throwable unwrappedException,
276+
String message,
277+
boolean unattended,
278+
int numFailureRetries,
279+
int failureCount,
280+
boolean repeatedFailure
281+
) {
241282
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
242283
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
243284
// and if the number of retries is about to exceed

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java

Lines changed: 172 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
2323
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
2424

25+
import java.util.List;
2526
import java.util.Map;
2627
import java.util.Set;
2728

@@ -63,9 +64,121 @@ public int getFailureCountChangedCounter() {
6364
}
6465
}
6566

66-
public void testUnattended() {
67+
public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeLessThanMinimumPageSize() {
68+
var e = new CircuitBreakingException(randomAlphaOfLength(10), 1, 0, randomFrom(CircuitBreaker.Durability.values()));
69+
assertRetryIfUnattendedOtherwiseFail(e);
70+
}
71+
72+
public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeNotLessThanMinimumPageSize() {
73+
var e = new CircuitBreakingException(randomAlphaOfLength(10), 1, 1, randomFrom(CircuitBreaker.Durability.values()));
74+
75+
List.of(true, false).forEach((unattended) -> { assertNoFailureAndContextPageSizeSet(e, unattended, 365); });
76+
}
77+
78+
public void testHandleIndexerFailure_ScriptException() {
79+
var e = new ScriptException(
80+
randomAlphaOfLength(10),
81+
new ArithmeticException(randomAlphaOfLength(10)),
82+
singletonList(randomAlphaOfLength(10)),
83+
randomAlphaOfLength(10),
84+
randomAlphaOfLength(10)
85+
);
86+
assertRetryIfUnattendedOtherwiseFail(e);
87+
}
88+
89+
public void testHandleIndexerFailure_BulkIndexExceptionWrappingClusterBlockException() {
90+
final BulkIndexingException bulkIndexingException = new BulkIndexingException(
91+
randomAlphaOfLength(10),
92+
new ClusterBlockException(Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))),
93+
randomBoolean()
94+
);
95+
96+
List.of(true, false).forEach((unattended) -> { assertRetryFailureCountNotIncremented(bulkIndexingException, unattended); });
97+
}
98+
99+
public void testHandleIndexerFailure_IrrecoverableBulkIndexException() {
100+
final BulkIndexingException e = new BulkIndexingException(
101+
randomAlphaOfLength(10),
102+
new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR),
103+
true
104+
);
105+
assertRetryIfUnattendedOtherwiseFail(e);
106+
}
107+
108+
public void testHandleIndexerFailure_RecoverableBulkIndexException() {
109+
final BulkIndexingException bulkIndexingException = new BulkIndexingException(
110+
randomAlphaOfLength(10),
111+
new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR),
112+
false
113+
);
114+
115+
List.of(true, false).forEach((unattended) -> { assertRetry(bulkIndexingException, unattended); });
116+
}
117+
118+
public void testHandleIndexerFailure_ClusterBlockException() {
119+
List.of(true, false).forEach((unattended) -> {
120+
assertRetry(
121+
new ClusterBlockException(Map.of(randomAlphaOfLength(10), Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))),
122+
unattended
123+
);
124+
});
125+
}
126+
127+
public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithNoShardSearchFailures() {
128+
List.of(true, false).forEach((unattended) -> {
129+
assertRetry(
130+
new SearchPhaseExecutionException(randomAlphaOfLength(10), randomAlphaOfLength(10), ShardSearchFailure.EMPTY_ARRAY),
131+
unattended
132+
);
133+
});
134+
}
135+
136+
public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithShardSearchFailures() {
137+
List.of(true, false).forEach((unattended) -> {
138+
assertRetry(
139+
new SearchPhaseExecutionException(
140+
randomAlphaOfLength(10),
141+
randomAlphaOfLength(10),
142+
new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
143+
),
144+
unattended
145+
);
146+
});
147+
}
148+
149+
public void testHandleIndexerFailure_RecoverableElasticsearchException() {
150+
List.of(true, false).forEach((unattended) -> {
151+
assertRetry(new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR), unattended);
152+
});
153+
}
154+
155+
public void testHandleIndexerFailure_IrrecoverableElasticsearchException() {
156+
var e = new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.NOT_FOUND);
157+
assertRetryIfUnattendedOtherwiseFail(e);
158+
}
159+
160+
public void testHandleIndexerFailure_IllegalArgumentException() {
161+
var e = new IllegalArgumentException(randomAlphaOfLength(10));
162+
assertRetryIfUnattendedOtherwiseFail(e);
163+
}
164+
165+
public void testHandleIndexerFailure_UnexpectedException() {
166+
List.of(true, false).forEach((unattended) -> { assertRetry(new Exception(), unattended); });
167+
}
168+
169+
private void assertRetryIfUnattendedOtherwiseFail(Exception e) {
170+
List.of(true, false).forEach((unattended) -> {
171+
if (unattended) {
172+
assertRetry(e, unattended);
173+
} else {
174+
assertFailure(e);
175+
}
176+
});
177+
}
178+
179+
private void assertRetry(Exception e, boolean unattended) {
67180
String transformId = randomAlphaOfLength(10);
68-
SettingsConfig settings = new SettingsConfig.Builder().setUnattended(true).build();
181+
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();
69182

70183
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
71184
MockTransformContextListener contextListener = new MockTransformContextListener();
@@ -74,51 +187,33 @@ public void testUnattended() {
74187

75188
TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);
76189

77-
handler.handleIndexerFailure(
78-
new SearchPhaseExecutionException(
79-
"query",
80-
"Partial shards failure",
81-
new ShardSearchFailure[] {
82-
new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, CircuitBreaker.Durability.TRANSIENT)) }
83-
),
84-
settings
85-
);
190+
assertNoFailure(handler, e, contextListener, settings, true);
191+
assertNoFailure(handler, e, contextListener, settings, true);
192+
if (unattended) {
193+
assertNoFailure(handler, e, contextListener, settings, true);
194+
} else {
195+
// fail after max retry attempts reached
196+
assertFailure(handler, e, contextListener, settings, true);
197+
}
198+
}
86199

87-
// CBE isn't a failure, but it only affects page size(which we don't test here)
88-
assertFalse(contextListener.getFailed());
89-
assertEquals(0, contextListener.getFailureCountChangedCounter());
200+
private void assertRetryFailureCountNotIncremented(Exception e, boolean unattended) {
201+
String transformId = randomAlphaOfLength(10);
202+
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();
90203

91-
assertNoFailure(
92-
handler,
93-
new SearchPhaseExecutionException(
94-
"query",
95-
"Partial shards failure",
96-
new ShardSearchFailure[] {
97-
new ShardSearchFailure(
98-
new ScriptException(
99-
"runtime error",
100-
new ArithmeticException("/ by zero"),
101-
singletonList("stack"),
102-
"test",
103-
"painless"
104-
)
105-
) }
106-
),
107-
contextListener,
108-
settings
109-
);
110-
assertNoFailure(
111-
handler,
112-
new ElasticsearchStatusException("something really bad happened", RestStatus.INTERNAL_SERVER_ERROR),
113-
contextListener,
114-
settings
115-
);
116-
assertNoFailure(handler, new IllegalArgumentException("expected apples not oranges"), contextListener, settings);
117-
assertNoFailure(handler, new RuntimeException("the s*** hit the fan"), contextListener, settings);
118-
assertNoFailure(handler, new NullPointerException("NPE"), contextListener, settings);
204+
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
205+
MockTransformContextListener contextListener = new MockTransformContextListener();
206+
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
207+
context.setPageSize(500);
208+
209+
TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);
210+
211+
assertNoFailure(handler, e, contextListener, settings, false);
212+
assertNoFailure(handler, e, contextListener, settings, false);
213+
assertNoFailure(handler, e, contextListener, settings, false);
119214
}
120215

121-
public void testClusterBlock() {
216+
private void assertFailure(Exception e) {
122217
String transformId = randomAlphaOfLength(10);
123218
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).build();
124219

@@ -129,32 +224,50 @@ public void testClusterBlock() {
129224

130225
TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);
131226

132-
final ClusterBlockException clusterBlock = new ClusterBlockException(
133-
Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))
134-
);
227+
assertFailure(handler, e, contextListener, settings, false);
228+
}
135229

136-
handler.handleIndexerFailure(clusterBlock, settings);
137-
assertFalse(contextListener.getFailed());
138-
assertEquals(1, contextListener.getFailureCountChangedCounter());
230+
private void assertNoFailure(
231+
TransformFailureHandler handler,
232+
Exception e,
233+
MockTransformContextListener mockTransformContextListener,
234+
SettingsConfig settings,
235+
boolean failureCountIncremented
236+
) {
237+
handler.handleIndexerFailure(e, settings);
238+
assertFalse(mockTransformContextListener.getFailed());
239+
assertEquals(failureCountIncremented ? 1 : 0, mockTransformContextListener.getFailureCountChangedCounter());
240+
mockTransformContextListener.reset();
241+
}
139242

140-
handler.handleIndexerFailure(clusterBlock, settings);
141-
assertFalse(contextListener.getFailed());
142-
assertEquals(2, contextListener.getFailureCountChangedCounter());
243+
private void assertNoFailureAndContextPageSizeSet(Exception e, boolean unattended, int newPageSize) {
244+
String transformId = randomAlphaOfLength(10);
245+
SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();
143246

144-
handler.handleIndexerFailure(clusterBlock, settings);
145-
assertTrue(contextListener.getFailed());
146-
assertEquals(3, contextListener.getFailureCountChangedCounter());
247+
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
248+
MockTransformContextListener contextListener = new MockTransformContextListener();
249+
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
250+
context.setPageSize(500);
251+
252+
TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);
253+
254+
handler.handleIndexerFailure(e, settings);
255+
assertFalse(contextListener.getFailed());
256+
assertEquals(0, contextListener.getFailureCountChangedCounter());
257+
assertEquals(newPageSize, context.getPageSize());
258+
contextListener.reset();
147259
}
148260

149-
private void assertNoFailure(
261+
private void assertFailure(
150262
TransformFailureHandler handler,
151263
Exception e,
152264
MockTransformContextListener mockTransformContextListener,
153-
SettingsConfig settings
265+
SettingsConfig settings,
266+
boolean failureCountChanged
154267
) {
155268
handler.handleIndexerFailure(e, settings);
156-
assertFalse(mockTransformContextListener.getFailed());
157-
assertEquals(1, mockTransformContextListener.getFailureCountChangedCounter());
269+
assertTrue(mockTransformContextListener.getFailed());
270+
assertEquals(failureCountChanged ? 1 : 0, mockTransformContextListener.getFailureCountChangedCounter());
158271
mockTransformContextListener.reset();
159272
}
160273

0 commit comments

Comments
 (0)