Skip to content

Commit 68b3ee8

Browse files
authored
[Transform] Reset max page size to settings value (#109532)
When a circuit breaker exception causes the transform checkpoint to fail, the transform will reduce the max page size to the value specified by the circuit breaker. Transform will retry the checkpoint until the search succeeds. When the search succeeds, the Transform will now reset the max search size to the value specified in the Transform settings rather than the now-deprecated value in the Latest/Pivot config. Fix #109308
1 parent 20750e9 commit 68b3ee8

File tree

3 files changed

+183
-4
lines changed

3 files changed

+183
-4
lines changed

docs/changelog/109532.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 109532
2+
summary: Reset max page size to settings value
3+
area: Transform
4+
type: bug
5+
issues:
6+
- 109308

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
4646
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
4747
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
48+
import org.elasticsearch.xpack.transform.Transform;
4849
import org.elasticsearch.xpack.transform.TransformServices;
4950
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
5051
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
@@ -543,7 +544,7 @@ private void executeRetentionPolicy(ActionListener<Void> listener) {
543544
private void finalizeCheckpoint(ActionListener<Void> listener) {
544545
try {
545546
// reset the page size, so we do not memorize a low page size forever
546-
context.setPageSize(function.getInitialPageSize());
547+
resetPageSize();
547548
// reset the changed bucket to free memory
548549
if (changeCollector != null) {
549550
changeCollector.clear();
@@ -1221,12 +1222,17 @@ private RunState determineRunStateAtStart() {
12211222

12221223
private void configurePageSize(Integer newPageSize) {
12231224
initialConfiguredPageSize = newPageSize;
1225+
resetPageSize();
1226+
}
12241227

1225-
// if the user explicitly set a page size, take it from the config, otherwise let the function decide
1228+
private void resetPageSize() {
12261229
if (initialConfiguredPageSize != null && initialConfiguredPageSize > 0) {
12271230
context.setPageSize(initialConfiguredPageSize);
1228-
} else {
1231+
} else if (function != null) {
12291232
context.setPageSize(function.getInitialPageSize());
1233+
} else {
1234+
// we should never be in a state where both initialConfiguredPageSize and function are null, but just in case...
1235+
context.setPageSize(Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE);
12301236
}
12311237
}
12321238

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

Lines changed: 168 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.elasticsearch.action.search.ShardSearchFailure;
2020
import org.elasticsearch.action.support.ActionTestUtils;
2121
import org.elasticsearch.client.internal.Client;
22+
import org.elasticsearch.common.breaker.CircuitBreaker;
23+
import org.elasticsearch.common.breaker.CircuitBreakingException;
2224
import org.elasticsearch.common.settings.Settings;
2325
import org.elasticsearch.core.TimeValue;
2426
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -35,6 +37,7 @@
3537
import org.elasticsearch.xpack.core.indexing.IndexerState;
3638
import org.elasticsearch.xpack.core.indexing.IterationResult;
3739
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
40+
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
3841
import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfigTests;
3942
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
4043
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@@ -43,6 +46,7 @@
4346
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
4447
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
4548
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
49+
import org.elasticsearch.xpack.transform.Transform;
4650
import org.elasticsearch.xpack.transform.TransformNode;
4751
import org.elasticsearch.xpack.transform.TransformServices;
4852
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
@@ -59,7 +63,9 @@
5963
import java.time.Clock;
6064
import java.util.Collections;
6165
import java.util.Map;
66+
import java.util.concurrent.BlockingDeque;
6267
import java.util.concurrent.CountDownLatch;
68+
import java.util.concurrent.LinkedBlockingDeque;
6369
import java.util.concurrent.TimeUnit;
6470
import java.util.concurrent.atomic.AtomicBoolean;
6571
import java.util.concurrent.atomic.AtomicReference;
@@ -107,9 +113,13 @@ class MockedTransformIndexer extends TransformIndexer {
107113
private CountDownLatch searchLatch;
108114
private CountDownLatch doProcessLatch;
109115
private CountDownLatch doSaveStateLatch;
116+
private CountDownLatch afterFinishOrFailureLatch;
110117

111118
private AtomicBoolean saveStateInProgress = new AtomicBoolean(false);
112119

120+
private BlockingDeque<Exception> searchExceptions = new LinkedBlockingDeque<>();
121+
private BlockingDeque<Runnable> runBeforeOnFinish = new LinkedBlockingDeque<>();
122+
113123
// how many loops to execute until reporting done
114124
private int numberOfLoops;
115125

@@ -211,7 +221,11 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
211221
throw new IllegalStateException(e);
212222
}
213223
}
214-
threadPool.generic().execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE));
224+
if (searchExceptions.isEmpty() == false) {
225+
nextPhase.onFailure(searchExceptions.poll());
226+
} else {
227+
threadPool.generic().execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE));
228+
}
215229
}
216230

217231
@Override
@@ -261,6 +275,22 @@ void doMaybeCreateDestIndex(Map<String, String> deducedDestIndexMappings, Action
261275
listener.onResponse(null);
262276
}
263277

278+
@Override
279+
protected void onFinish(ActionListener<Void> listener) {
280+
while (runBeforeOnFinish.isEmpty() == false) {
281+
runBeforeOnFinish.poll().run();
282+
}
283+
super.onFinish(listener);
284+
}
285+
286+
@Override
287+
protected void afterFinishOrFailure() {
288+
super.afterFinishOrFailure();
289+
if (afterFinishOrFailureLatch != null) {
290+
afterFinishOrFailureLatch.countDown();
291+
}
292+
}
293+
264294
public boolean waitingForNextSearch() {
265295
return super.getScheduledNextSearch() != null;
266296
}
@@ -278,6 +308,14 @@ void persistState(TransformState state, ActionListener<Void> listener) {
278308
void validate(ActionListener<ValidateTransformAction.Response> listener) {
279309
listener.onResponse(null);
280310
}
311+
312+
public void addAfterFinishOrFailureLatch() {
313+
afterFinishOrFailureLatch = new CountDownLatch(1);
314+
}
315+
316+
public void waitForAfterFinishOrFailureLatch(long timeout, TimeUnit unit) throws InterruptedException {
317+
assertTrue(afterFinishOrFailureLatch.await(timeout, unit));
318+
}
281319
}
282320

283321
@Before
@@ -439,6 +477,135 @@ public void testInterActionWhileIndexerShutsdown() throws Exception {
439477
assertBusy(() -> assertEquals(IndexerState.STOPPED, indexer.getState()), 5, TimeUnit.SECONDS);
440478
}
441479

480+
public void testMaxPageSearchSizeIsResetToDefaultValue() throws Exception {
481+
TransformConfig config = new TransformConfig(
482+
randomAlphaOfLength(10),
483+
randomSourceConfig(),
484+
randomDestConfig(),
485+
null,
486+
new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
487+
null,
488+
randomPivotConfig(),
489+
null,
490+
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
491+
null,
492+
null,
493+
null,
494+
null,
495+
null
496+
);
497+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);
498+
499+
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
500+
final MockedTransformIndexer indexer = createMockIndexer(
501+
1,
502+
config,
503+
state,
504+
null,
505+
threadPool,
506+
auditor,
507+
new TransformIndexerStats(),
508+
context
509+
);
510+
511+
// add latches
512+
CountDownLatch searchLatch = indexer.createAwaitForSearchLatch(1);
513+
indexer.addAfterFinishOrFailureLatch();
514+
515+
indexer.start();
516+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
517+
assertEquals(indexer.getState(), IndexerState.INDEXING);
518+
519+
// set circuit breaker to 50%
520+
indexer.searchExceptions.offer(new CircuitBreakingException("hello", 2, 1, CircuitBreaker.Durability.TRANSIENT));
521+
indexer.runBeforeOnFinish.offer(() -> {
522+
assertEquals(Math.round(Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE / 2.0), context.getPageSize());
523+
});
524+
assertFalse(indexer.runBeforeOnFinish.isEmpty());
525+
526+
// run and wait
527+
searchLatch.countDown();
528+
indexer.waitForAfterFinishOrFailureLatch(5, TimeUnit.SECONDS);
529+
530+
// rerun, don't throw an exception this time
531+
searchLatch = indexer.createAwaitForSearchLatch(1);
532+
indexer.addAfterFinishOrFailureLatch();
533+
assertBusy(() -> assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())));
534+
searchLatch.countDown();
535+
indexer.waitForAfterFinishOrFailureLatch(5, TimeUnit.SECONDS);
536+
537+
// verify that we checked the pageSize decreased
538+
assertTrue(indexer.runBeforeOnFinish.isEmpty());
539+
// verify that the pageSize reset
540+
assertEquals(Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE.intValue(), context.getPageSize());
541+
}
542+
543+
public void testMaxPageSearchSizeIsResetToConfiguredValue() throws Exception {
544+
TransformConfig config = new TransformConfig(
545+
randomAlphaOfLength(10),
546+
randomSourceConfig(),
547+
randomDestConfig(),
548+
null,
549+
new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
550+
null,
551+
randomPivotConfig(),
552+
null,
553+
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
554+
null,
555+
null,
556+
null,
557+
null,
558+
null
559+
);
560+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);
561+
562+
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
563+
final MockedTransformIndexer indexer = createMockIndexer(
564+
1,
565+
config,
566+
state,
567+
null,
568+
threadPool,
569+
auditor,
570+
new TransformIndexerStats(),
571+
context
572+
);
573+
574+
// add latches
575+
CountDownLatch searchLatch = indexer.createAwaitForSearchLatch(1);
576+
indexer.addAfterFinishOrFailureLatch();
577+
578+
indexer.start();
579+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
580+
assertEquals(indexer.getState(), IndexerState.INDEXING);
581+
582+
var configuredMaxPageSearchSize = 20_000;
583+
indexer.applyNewSettings(
584+
new SettingsConfig.Builder(SettingsConfig.EMPTY).setMaxPageSearchSize(configuredMaxPageSearchSize).build()
585+
);
586+
587+
// set circuit breaker to 50%
588+
indexer.searchExceptions.offer(new CircuitBreakingException("hello", 2, 1, CircuitBreaker.Durability.TRANSIENT));
589+
indexer.runBeforeOnFinish.offer(() -> { assertEquals(Math.round(configuredMaxPageSearchSize / 2.0), context.getPageSize()); });
590+
assertFalse(indexer.runBeforeOnFinish.isEmpty());
591+
592+
// run and wait
593+
searchLatch.countDown();
594+
indexer.waitForAfterFinishOrFailureLatch(5, TimeUnit.SECONDS);
595+
596+
// rerun, don't throw an exception this time
597+
searchLatch = indexer.createAwaitForSearchLatch(1);
598+
indexer.addAfterFinishOrFailureLatch();
599+
assertBusy(() -> assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())));
600+
searchLatch.countDown();
601+
indexer.waitForAfterFinishOrFailureLatch(5, TimeUnit.SECONDS);
602+
603+
// verify that we checked the pageSize decreased
604+
assertTrue(indexer.runBeforeOnFinish.isEmpty());
605+
// verify that the pageSize reset
606+
assertEquals(configuredMaxPageSearchSize, context.getPageSize());
607+
}
608+
442609
private MockedTransformIndexer createMockIndexer(
443610
int numberOfLoops,
444611
TransformConfig config,

0 commit comments

Comments
 (0)