Skip to content

Commit d361eec

Browse files
authored
[Transform] Fail checkpoint on missing clusters (#106793) (#106842)
When there are no remote or local clusters for a given source index, we call the listener's `onFailure` method with a `CheckpointException`. A running transform will fail and retry, eventually moving into an unhealthy and then failed state. Any call to the stats API will note the checkpoint failure and return. This fixes a timeout when calling the Transform stats API and prevents the transform from getting stuck in the indexing state. Fix #106790. Fix #104533.
1 parent 566aea1 commit d361eec

File tree

4 files changed

+94
-28
lines changed

4 files changed

+94
-28
lines changed

docs/changelog/106793.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 106793
2+
summary: Fail checkpoint on missing clusters
3+
area: Transform
4+
type: bug
5+
issues:
6+
- 104533
7+
- 106790

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
import org.elasticsearch.ElasticsearchException;
1111

1212
class CheckpointException extends ElasticsearchException {
13+
CheckpointException(String msg, Object... params) {
14+
super(msg, params);
15+
}
16+
1317
CheckpointException(String msg, Throwable cause, Object... params) {
1418
super(msg, cause, params);
1519
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,12 @@ protected void getIndexCheckpoints(TimeValue timeout, ActionListener<Map<String,
116116
ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex());
117117
ActionListener<Map<String, long[]>> groupedListener = listener;
118118

119+
if (resolvedIndexes.numClusters() == 0) {
120+
var indices = String.join(",", transformConfig.getSource().getIndex());
121+
listener.onFailure(new CheckpointException("No clusters exist for [{}]", indices));
122+
return;
123+
}
124+
119125
if (resolvedIndexes.numClusters() > 1) {
120126
ActionListener<Collection<Map<String, long[]>>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> {
121127
listener.onResponse(
@@ -228,10 +234,7 @@ private static void getCheckpointsFromOneClusterV2(
228234
);
229235
ActionListener<GetCheckpointAction.Response> checkpointListener;
230236
if (RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(cluster)) {
231-
checkpointListener = ActionListener.wrap(
232-
checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()),
233-
listener::onFailure
234-
);
237+
checkpointListener = listener.safeMap(GetCheckpointAction.Response::getCheckpoints);
235238
} else {
236239
checkpointListener = ActionListener.wrap(
237240
checkpointResponse -> listener.onResponse(
@@ -395,12 +398,12 @@ public void getCheckpointingInfo(
395398

396399
long timestamp = clock.millis();
397400

398-
getIndexCheckpoints(timeout, ActionListener.wrap(checkpointsByIndex -> {
401+
getIndexCheckpoints(timeout, listener.delegateFailure((l, checkpointsByIndex) -> {
399402
TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L);
400403
checkpointingInfoBuilder.setSourceCheckpoint(sourceCheckpoint);
401404
checkpointingInfoBuilder.setOperationsBehind(TransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint));
402-
listener.onResponse(checkpointingInfoBuilder);
403-
}, listener::onFailure));
405+
l.onResponse(checkpointingInfoBuilder);
406+
}));
404407
}
405408

406409
@Override

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java

Lines changed: 73 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.test.MockLogAppender;
3232
import org.elasticsearch.test.MockLogAppender.LoggingExpectation;
3333
import org.elasticsearch.threadpool.ThreadPool;
34+
import org.elasticsearch.transport.ActionNotFoundTransportException;
3435
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
3536
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
3637
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@@ -67,7 +68,7 @@
6768

6869
public class DefaultCheckpointProviderTests extends ESTestCase {
6970

70-
private static Logger checkpointProviderLogger = LogManager.getLogger(DefaultCheckpointProvider.class);
71+
private static final Logger checkpointProviderLogger = LogManager.getLogger(DefaultCheckpointProvider.class);
7172

7273
private Clock clock;
7374
private Client client;
@@ -96,7 +97,7 @@ public void setUpMocks() {
9697
transformAuditor = MockTransformAuditor.createMockAuditor();
9798
}
9899

99-
public void testReportSourceIndexChangesRunsEmpty() throws Exception {
100+
public void testReportSourceIndexChangesRunsEmpty() {
100101
String transformId = getTestName();
101102
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);
102103
DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig);
@@ -138,7 +139,7 @@ public void testReportSourceIndexChangesRunsEmpty() throws Exception {
138139
);
139140
}
140141

141-
public void testReportSourceIndexChangesAddDelete() throws Exception {
142+
public void testReportSourceIndexChangesAddDelete() {
142143
String transformId = getTestName();
143144
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);
144145
DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig);
@@ -197,7 +198,7 @@ public void testReportSourceIndexChangesAddDelete() throws Exception {
197198
);
198199
}
199200

200-
public void testReportSourceIndexChangesAddDeleteMany() throws Exception {
201+
public void testReportSourceIndexChangesAddDeleteMany() {
201202
String transformId = getTestName();
202203
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);
203204
DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig);
@@ -231,29 +232,88 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception {
231232
}
232233

233234
public void testHandlingShardFailures() throws Exception {
234-
String transformId = getTestName();
235-
String indexName = "some-index";
235+
var transformId = getTestName();
236+
var indexName = "some-index";
236237
TransformConfig transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSource(
237238
new SourceConfig(indexName)
238239
).build();
239240

240-
RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class);
241+
var remoteClusterResolver = mock(RemoteClusterResolver.class);
241242
doReturn(new RemoteClusterResolver.ResolvedIndices(Collections.emptyMap(), Collections.singletonList(indexName))).when(
242243
remoteClusterResolver
243244
).resolve(transformConfig.getSource().getIndex());
244245

246+
mockGetIndexResponse(indexName);
247+
mockIndicesStatsResponse(indexName);
248+
mockGetCheckpointAction();
249+
250+
var provider = new DefaultCheckpointProvider(
251+
clock,
252+
parentTaskClient,
253+
remoteClusterResolver,
254+
transformConfigManager,
255+
transformAuditor,
256+
transformConfig
257+
);
258+
259+
var latch = new CountDownLatch(1);
260+
provider.createNextCheckpoint(
261+
null,
262+
new LatchedActionListener<>(
263+
ActionListener.wrap(
264+
response -> fail("This test case must fail"),
265+
e -> assertThat(
266+
e.getMessage(),
267+
startsWith(
268+
"Source has [7] failed shards, first shard failure: [some-index][3] failed, "
269+
+ "reason [java.lang.Exception: something's wrong"
270+
)
271+
)
272+
),
273+
latch
274+
)
275+
);
276+
assertTrue(latch.await(1, TimeUnit.MILLISECONDS));
277+
}
278+
279+
private void mockGetIndexResponse(String indexName) {
245280
GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { indexName }, null, null, null, null, null);
246281
doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
282+
}
247283

284+
private void mockIndicesStatsResponse(String indexName) {
248285
IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
249286
doReturn(7).when(indicesStatsResponse).getFailedShards();
250287
doReturn(
251288
new DefaultShardOperationFailedException[] {
252289
new DefaultShardOperationFailedException(indexName, 3, new Exception("something's wrong")) }
253290
).when(indicesStatsResponse).getShardFailures();
254291
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
292+
}
255293

256-
DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
294+
private void mockGetCheckpointAction() {
295+
doAnswer(invocationOnMock -> {
296+
ActionListener<?> listener = invocationOnMock.getArgument(2);
297+
listener.onFailure(new ActionNotFoundTransportException("This should fail."));
298+
return null;
299+
}).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any());
300+
}
301+
302+
public void testHandlingNoClusters() throws Exception {
303+
var transformId = getTestName();
304+
var indexName = "some-missing-index";
305+
var transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSource(
306+
new SourceConfig(indexName)
307+
).build();
308+
309+
var remoteClusterResolver = mock(RemoteClusterResolver.class);
310+
doReturn(new RemoteClusterResolver.ResolvedIndices(Map.of(), List.of())).when(remoteClusterResolver)
311+
.resolve(transformConfig.getSource().getIndex());
312+
313+
mockGetIndexResponse(indexName);
314+
mockIndicesStatsResponse(indexName);
315+
316+
var provider = new DefaultCheckpointProvider(
257317
clock,
258318
parentTaskClient,
259319
remoteClusterResolver,
@@ -262,24 +322,18 @@ public void testHandlingShardFailures() throws Exception {
262322
transformConfig
263323
);
264324

265-
CountDownLatch latch = new CountDownLatch(1);
325+
var latch = new CountDownLatch(1);
266326
provider.createNextCheckpoint(
267327
null,
268328
new LatchedActionListener<>(
269329
ActionListener.wrap(
270330
response -> fail("This test case must fail"),
271-
e -> assertThat(
272-
e.getMessage(),
273-
startsWith(
274-
"Source has [7] failed shards, first shard failure: [some-index][3] failed, "
275-
+ "reason [java.lang.Exception: something's wrong"
276-
)
277-
)
331+
e -> assertThat(e.getMessage(), equalTo("No clusters exist for [some-missing-index]"))
278332
),
279333
latch
280334
)
281335
);
282-
latch.await(10, TimeUnit.SECONDS);
336+
assertTrue(latch.await(1, TimeUnit.MILLISECONDS));
283337
}
284338

285339
public void testSourceHasChanged() throws InterruptedException {
@@ -407,8 +461,7 @@ private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transfor
407461
);
408462
}
409463

410-
private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock)
411-
throws IllegalAccessException {
464+
private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock) {
412465
MockLogAppender mockLogAppender = new MockLogAppender();
413466
mockLogAppender.start();
414467

@@ -429,10 +482,9 @@ private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpec
429482
}
430483
}
431484

432-
@SuppressWarnings("unchecked")
433485
private static <Response> Answer<Response> withResponse(Response response) {
434486
return invocationOnMock -> {
435-
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
487+
ActionListener<Response> listener = invocationOnMock.getArgument(2);
436488
listener.onResponse(response);
437489
return null;
438490
};

0 commit comments

Comments
 (0)