Skip to content

Commit 28e54f6

Browse files
authored
Revert "[Transform] Allow transforms to use PIT with remote clusters … (#107970)
This reverts commit 9b584aa.
1 parent 585fae3 commit 28e54f6

File tree

3 files changed

+72
-9
lines changed

3 files changed

+72
-9
lines changed

docs/changelog/107970.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 107970
2+
summary: "Revert \"[Transform] Allow transforms to use PIT with remote clusters …"
3+
area: Transform
4+
type: bug
5+
issues: []

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,8 @@ private void injectPointInTimeIfNeeded(
477477
ActionListener<Tuple<String, SearchRequest>> listener
478478
) {
479479
SearchRequest searchRequest = namedSearchRequest.v2();
480-
if (disablePit || searchRequest.indices().length == 0) {
480+
// We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems.
481+
if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) {
481482
listener.onResponse(namedSearchRequest);
482483
return;
483484
}

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -301,14 +301,8 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException {
301301
}
302302

303303
public void testDisablePit() throws InterruptedException {
304-
TransformConfig.Builder configBuilder = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig());
305-
if (randomBoolean()) {
306-
// TransformConfigTests.randomTransformConfig never produces remote indices in the source.
307-
// We need to explicitly set the remote index here for coverage.
308-
configBuilder.setSource(new SourceConfig("remote-cluster:remote-index"));
309-
}
310-
TransformConfig config = configBuilder.build();
311-
304+
// TransformConfigTests.randomTransformConfig never produces remote indices in the source, hence we are safe here. */
305+
TransformConfig config = TransformConfigTests.randomTransformConfig();
312306
boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();
313307

314308
try (var threadPool = createThreadPool()) {
@@ -371,6 +365,69 @@ public void testDisablePit() throws InterruptedException {
371365
}
372366
}
373367

368+
public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedException {
369+
TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig())
370+
// Remote index is configured within source
371+
.setSource(new SourceConfig("remote-cluster:remote-index"))
372+
.build();
373+
boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();
374+
375+
try (var threadPool = createThreadPool()) {
376+
final var client = new PitMockClient(threadPool, true);
377+
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
378+
mock(ThreadPool.class),
379+
mock(ClusterService.class),
380+
mock(IndexNameExpressionResolver.class),
381+
mock(TransformExtension.class),
382+
new TransformServices(
383+
mock(IndexBasedTransformConfigManager.class),
384+
mock(TransformCheckpointService.class),
385+
mock(TransformAuditor.class),
386+
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
387+
),
388+
mock(CheckpointProvider.class),
389+
new AtomicReference<>(IndexerState.STOPPED),
390+
null,
391+
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
392+
mock(TransformIndexerStats.class),
393+
config,
394+
null,
395+
new TransformCheckpoint(
396+
"transform",
397+
Instant.now().toEpochMilli(),
398+
0L,
399+
Collections.emptyMap(),
400+
Instant.now().toEpochMilli()
401+
),
402+
new TransformCheckpoint(
403+
"transform",
404+
Instant.now().toEpochMilli(),
405+
2L,
406+
Collections.emptyMap(),
407+
Instant.now().toEpochMilli()
408+
),
409+
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
410+
mock(TransformContext.class),
411+
false
412+
);
413+
414+
// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
415+
this.<SearchResponse>assertAsync(
416+
listener -> indexer.doNextSearch(0, listener),
417+
response -> assertNull(response.pointInTimeId())
418+
);
419+
420+
// reverse the setting
421+
indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build());
422+
423+
// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
424+
this.<SearchResponse>assertAsync(
425+
listener -> indexer.doNextSearch(0, listener),
426+
response -> assertNull(response.pointInTimeId())
427+
);
428+
}
429+
}
430+
374431
public void testHandlePitIndexNotFound() throws InterruptedException {
375432
// simulate a deleted index due to ILM
376433
try (var threadPool = createThreadPool()) {

0 commit comments

Comments
 (0)