Skip to content

Commit fc9810d

Browse files
authored
[Transform] Close PIT on abort (#143889)
When we force stop a transform, onAbort is eventually called to immediately halt the progress of the checkpoint. onAbort now closes the now unused PIT requests. This avoids the keepalive timeout and lets Elasticsearch release the held segments. This largely only impacts our integration tests that will not wait for that 30 seconds.
1 parent ca050eb commit fc9810d

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,11 @@ protected void onStop() {
486486
closePointInTime(super::onStop);
487487
}
488488

489+
@Override
490+
protected void onAbort() {
491+
closePointInTime(super::onAbort);
492+
}
493+
489494
// visible for testing
490495
void closePointInTime(Runnable runAfter) {
491496
// we shouldn't need to do this, because a transform is only ever running on one thread anyway, but now that we're waiting for

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,30 @@ public void testPitInjection() throws InterruptedException {
218218
}
219219
}
220220

221+
public void testPitClosedOnAbort() throws InterruptedException {
222+
TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSettings(
223+
new SettingsConfig.Builder().setUsePit(true).build()
224+
).build();
225+
226+
try (var threadPool = createThreadPool()) {
227+
final var client = new PitMockClient(threadPool, true);
228+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
229+
230+
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
231+
assertThat(response.pointInTimeId(), equalBytes(new BytesArray("the_pit_id+")));
232+
});
233+
234+
assertEquals(1L, client.getPitContextCounter());
235+
236+
indexer.onAbort();
237+
assertEquals(0L, client.getPitContextCounter());
238+
239+
// calling onAbort again should be a no-op
240+
indexer.onAbort();
241+
assertEquals(0L, client.getPitContextCounter());
242+
}
243+
}
244+
221245
public void testPitInjectionIfPitNotSupported() throws InterruptedException {
222246
// pit must be enabled, otherwise take a random config
223247
TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSettings(

0 commit comments

Comments
 (0)