Skip to content

Commit 31cab3e

Browse files
authored
[Transform] Wait for PIT to close (#134955) (#135002)
Transforms will now continue after PIT is closed, rather than sending the PIT request and not blocking. Resolve #134925
1 parent ede6f66 commit 31cab3e

File tree

5 files changed

+63
-21
lines changed

5 files changed

+63
-21
lines changed

docs/changelog/134955.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 134955
2+
summary: Prevent Transform from queuing too many PIT close requests by waiting for PIT to close before finishing the checkpoint
3+
area: Transform
4+
type: bug
5+
issues:
6+
- 134925

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,10 +280,7 @@ private boolean startJob() {
280280
*/
281281
private void finishJob() {
282282
isJobFinishing.set(true);
283-
doSaveState(finishAndSetState(), position.get(), () -> {
284-
afterFinishOrFailure();
285-
isJobFinishing.set(false);
286-
});
283+
doSaveState(finishAndSetState(), position.get(), this::afterFinishOrFailure);
287284
}
288285

289286
private <T> ActionListener<T> finishJobListener() {
@@ -434,7 +431,9 @@ protected float getMaxDocsPerSecond() {
434431
* This will be called before the internal state changes from {@link IndexerState#INDEXING} to {@link IndexerState#STARTED} or
435432
* from {@link IndexerState#STOPPING} to {@link IndexerState#STOPPED}.
436433
*/
437-
protected void afterFinishOrFailure() {}
434+
protected void afterFinishOrFailure() {
435+
isJobFinishing.set(false);
436+
}
438437

439438
/**
440439
* Called when the indexer is stopped. This is only called when the indexer is stopped

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
2626
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
2727
import org.elasticsearch.action.search.TransportSearchAction;
28+
import org.elasticsearch.action.support.CountDownActionListener;
2829
import org.elasticsearch.action.support.IndicesOptions;
2930
import org.elasticsearch.action.support.master.AcknowledgedRequest;
3031
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
@@ -70,9 +71,11 @@
7071
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;
7172
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
7273

74+
import java.util.ArrayList;
7375
import java.util.LinkedHashMap;
7476
import java.util.Map;
7577
import java.util.Map.Entry;
78+
import java.util.Objects;
7679
import java.util.concurrent.ConcurrentHashMap;
7780
import java.util.concurrent.atomic.AtomicBoolean;
7881
import java.util.concurrent.atomic.AtomicReference;
@@ -154,9 +157,13 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
154157
}
155158

156159
if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) {
157-
closePointInTime();
160+
closePointInTime(() -> doNextSearch(nextPhase));
161+
} else {
162+
doNextSearch(nextPhase);
158163
}
164+
}
159165

166+
private void doNextSearch(ActionListener<SearchResponse> nextPhase) {
160167
injectPointInTimeIfNeeded(
161168
buildSearchRequest(),
162169
ActionListener.wrap(searchRequest -> doSearch(searchRequest, nextPhase), nextPhase::onFailure)
@@ -435,8 +442,7 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() {
435442

436443
@Override
437444
protected void afterFinishOrFailure() {
438-
closePointInTime();
439-
super.afterFinishOrFailure();
445+
closePointInTime(super::afterFinishOrFailure);
440446
}
441447

442448
@Override
@@ -477,23 +483,34 @@ private boolean destinationIndexHasWriteBlock() {
477483

478484
@Override
479485
protected void onStop() {
480-
closePointInTime();
481-
super.onStop();
486+
closePointInTime(super::onStop);
482487
}
483488

484-
private void closePointInTime() {
485-
for (String name : namedPits.keySet()) {
486-
closePointInTime(name);
489+
// visible for testing
490+
void closePointInTime(Runnable runAfter) {
491+
// we shouldn't need to do this, because a transform is only ever running on one thread anyway, but now that we're waiting for
492+
// N PIT contexts to close, we want to make sure that the number N doesn't change in the underlying data structure so we can
493+
// guarantee that we call runAfter
494+
var pitEntries = new ArrayList<Entry<String, PointInTimeBuilder>>(namedPits.size());
495+
var iter = namedPits.entrySet().iterator();
496+
while (iter.hasNext()) {
497+
pitEntries.add(iter.next());
498+
iter.remove();
487499
}
488-
}
489500

490-
private void closePointInTime(String name) {
491-
PointInTimeBuilder pit = namedPits.remove(name);
492-
493-
if (pit == null) {
494-
return;
501+
if (pitEntries.isEmpty()) {
502+
runAfter.run();
503+
} else {
504+
var countDownActionListener = new CountDownActionListener(pitEntries.size(), ActionListener.running(runAfter));
505+
pitEntries.stream()
506+
.map(Entry::getValue)
507+
.filter(Objects::nonNull)
508+
.forEach(pit -> closePointInTime(pit, countDownActionListener));
495509
}
496510

511+
}
512+
513+
private void closePointInTime(PointInTimeBuilder pit, ActionListener<Void> listener) {
497514
BytesReference oldPit = pit.getEncodedId();
498515

499516
ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit);
@@ -503,12 +520,12 @@ private void closePointInTime(String name) {
503520
client,
504521
TransportClosePointInTimeAction.TYPE,
505522
closePitRequest,
506-
ActionListener.wrap(response -> {
523+
ActionListener.runAfter(ActionListener.wrap(response -> {
507524
logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit);
508525
}, e -> {
509526
// note: closing the pit should never throw, even if the pit is invalid
510527
logger.error(() -> "[" + getJobId() + "] Failed to close point in time reader", e);
511-
})
528+
}), () -> listener.onResponse(null))
512529
);
513530
}
514531

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,7 @@ private void finalizeCheckpoint(ActionListener<Void> listener) {
628628
@Override
629629
protected void afterFinishOrFailure() {
630630
finishIndexerThreadShutdown();
631+
super.afterFinishOrFailure();
631632
}
632633

633634
@Override

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,20 @@
7474

7575
import java.time.Clock;
7676
import java.time.Instant;
77+
import java.util.ArrayList;
7778
import java.util.Collections;
7879
import java.util.List;
7980
import java.util.concurrent.CountDownLatch;
8081
import java.util.concurrent.ExecutorService;
8182
import java.util.concurrent.TimeUnit;
8283
import java.util.concurrent.atomic.AtomicBoolean;
84+
import java.util.concurrent.atomic.AtomicInteger;
8385
import java.util.concurrent.atomic.AtomicLong;
8486
import java.util.concurrent.atomic.AtomicReference;
8587
import java.util.function.Consumer;
8688
import java.util.stream.IntStream;
8789

90+
import static org.hamcrest.Matchers.equalTo;
8891
import static org.mockito.ArgumentMatchers.any;
8992
import static org.mockito.ArgumentMatchers.anyBoolean;
9093
import static org.mockito.ArgumentMatchers.anyString;
@@ -219,6 +222,16 @@ public void testPitInjection() throws InterruptedException {
219222
assertEquals(new BytesArray("the_pit_id+"), response.pointInTimeId());
220223
});
221224

225+
var paceCounter = new AtomicInteger(0);
226+
client.addBeforeCloseListener(() -> assertThat(paceCounter.getAndIncrement(), equalTo(0)));
227+
indexer.closePointInTime(() -> assertThat(paceCounter.getAndIncrement(), equalTo(1)));
228+
assertThat(paceCounter.get(), equalTo(2));
229+
assertEquals(0L, client.getPitContextCounter());
230+
231+
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
232+
assertEquals(new BytesArray("the_pit_id+"), response.pointInTimeId());
233+
});
234+
222235
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
223236
assertEquals(new BytesArray("the_pit_id++"), response.pointInTimeId());
224237
});
@@ -586,12 +599,17 @@ protected Tuple<String, SearchRequest> buildSearchRequest() {
586599
private static class PitMockClient extends NoOpClient {
587600
private final boolean pitSupported;
588601
private AtomicLong pitContextCounter = new AtomicLong();
602+
private List<Runnable> beforeCloseListeners = new ArrayList<>();
589603

590604
PitMockClient(ThreadPool threadPool, boolean pitSupported) {
591605
super(threadPool);
592606
this.pitSupported = pitSupported;
593607
}
594608

609+
public void addBeforeCloseListener(Runnable listener) {
610+
this.beforeCloseListeners.add(listener);
611+
}
612+
595613
public long getPitContextCounter() {
596614
return pitContextCounter.get();
597615
}
@@ -613,6 +631,7 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
613631
}
614632
return;
615633
} else if (request instanceof ClosePointInTimeRequest) {
634+
beforeCloseListeners.forEach(Runnable::run);
616635
ClosePointInTimeResponse response = new ClosePointInTimeResponse(true, 1);
617636
assert pitContextCounter.get() > 0;
618637
pitContextCounter.decrementAndGet();

0 commit comments

Comments
 (0)