Skip to content

Commit 149178c

Browse files
authored
[9.2] [Transform] Fix transform producing empty dest index when source query references runtime fields (elastic#142450) (elastic#142830)
* [Transform] Fix transform producing empty dest index when source query references runtime fields (elastic#142450) When a transform's source query references a runtime field (e.g., a range filter on a runtime-mapped field), the destination index is produced empty even though _transform/_preview returns correct results. The root cause is that ClientTransformIndexer.injectPointInTimeIfNeeded() passes the source query as the PIT indexFilter, but OpenPointInTimeRequest does not support runtime_mappings. During the can_match phase, the query on the unknown runtime field is rewritten to match_none, all shards are filtered out, and the PIT opens with zero search contexts — causing every subsequent search to return empty results. Preview is unaffected because it uses a normal SearchSourceBuilder with both the query and runtime_mappings and does not use PIT. This change skips the PIT index filter optimization when the source config has non-empty runtime_mappings, since the filter cannot resolve runtime fields. The PIT is still opened for snapshot consistency, just without shard pre-filtering. Additionally, two other code paths that use the source query without propagating runtime_mappings are fixed: the initial progress search in TransformIndexer and the sourceHasChanged check in TimeBasedCheckpointProvider. All three fixes follow the same principle — wherever the source query is used, runtime_mappings from the source config must be propagated alongside it. Fixes elastic#113156 (cherry picked from commit cf28432) # Conflicts: # x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java * fix compilation issues
1 parent 64cf45c commit 149178c

File tree

7 files changed

+217
-78
lines changed

7 files changed

+217
-78
lines changed

docs/changelog/142450.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
area: Transform
2+
issues:
3+
- 113156
4+
pr: 142450
5+
summary: Fix transform producing empty dest index when source query references runtime
6+
fields
7+
type: bug

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public void sourceHasChanged(TransformCheckpoint lastCheckpoint, ActionListener<
7171
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0)
7272
// we only want to know if there is at least 1 new document
7373
.trackTotalHitsUpTo(1)
74+
.runtimeMappings(transformConfig.getSource().getRuntimeMappings())
7475
.query(queryBuilder);
7576
SearchRequest searchRequest = new SearchRequest(transformConfig.getSource().getIndex()).allowPartialSearchResults(false)
7677
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,8 +550,11 @@ private void injectPointInTimeIfNeeded(
550550

551551
// no pit, create a new one
552552
OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(searchRequest.indices()).keepAlive(PIT_KEEP_ALIVE);
553-
// use index filter for better performance
554-
pitRequest.indexFilter(transformConfig.getSource().getQueryConfig().getQuery());
553+
// Only use index filter when there are no runtime mappings, because OpenPointInTimeRequest
554+
// does not support runtime_mappings and the query may reference runtime fields
555+
if (transformConfig.getSource().getRuntimeMappings().isEmpty()) {
556+
pitRequest.indexFilter(transformConfig.getSource().getQueryConfig().getQuery());
557+
}
555558

556559
ClientHelper.executeWithHeadersAsync(
557560
transformConfig.getHeaders(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,9 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
323323

324324
// get progress information
325325
SearchRequest request = new SearchRequest(transformConfig.getSource().getIndex());
326-
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
326+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().runtimeMappings(
327+
transformConfig.getSource().getRuntimeMappings()
328+
);
327329

328330
function.buildSearchQueryForInitialProgress(searchSourceBuilder);
329331
searchSourceBuilder.query(QueryBuilders.boolQuery().filter(buildFilterQuery()).filter(searchSourceBuilder.query()));

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import org.elasticsearch.threadpool.ThreadPool;
3232
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
3333
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
34+
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
3435
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
36+
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
3537
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
3638
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
3739
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@@ -50,6 +52,7 @@
5052

5153
import java.time.Clock;
5254
import java.util.Collections;
55+
import java.util.Map;
5356
import java.util.concurrent.CountDownLatch;
5457
import java.util.concurrent.TimeUnit;
5558
import java.util.function.Supplier;
@@ -283,6 +286,41 @@ private void testCreateNextCheckpoint(
283286
assertThat(exceptionHolder.get(), is(nullValue()));
284287
}
285288

289+
public void testSourceHasChangedIncludesRuntimeMappings() throws InterruptedException {
290+
// Arrange: create a config with explicit runtime_mappings
291+
Map<String, Object> runtimeMappings = Map.of(
292+
"total_price_with_tax",
293+
Map.of("type", "double", "script", Map.of("source", "emit(1.0)"))
294+
);
295+
SourceConfig sourceWithRuntimeMappings = new SourceConfig(new String[] { "source_index" }, QueryConfig.matchAll(), runtimeMappings);
296+
TransformConfig transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSource(
297+
sourceWithRuntimeMappings
298+
).setSyncConfig(new TimeSyncConfig(TIMESTAMP_FIELD, TimeValue.ZERO)).build();
299+
300+
final SearchResponse searchResponse = newSearchResponse(0);
301+
try {
302+
doAnswer(withResponse(searchResponse)).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
303+
TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);
304+
305+
// Act: call sourceHasChanged
306+
CountDownLatch latch = new CountDownLatch(1);
307+
provider.sourceHasChanged(TransformCheckpoint.EMPTY, new LatchedActionListener<>(ActionListener.wrap(r -> {}, e -> {}), latch));
308+
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
309+
310+
// Assert: the search request should include runtime_mappings
311+
ArgumentCaptor<SearchRequest> searchRequestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
312+
verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestCaptor.capture(), any());
313+
SearchRequest capturedRequest = searchRequestCaptor.getValue();
314+
assertThat(
315+
"sourceHasChanged search should include runtime_mappings from the source config",
316+
capturedRequest.source().runtimeMappings(),
317+
is(equalTo(runtimeMappings))
318+
);
319+
} finally {
320+
searchResponse.decRef();
321+
}
322+
}
323+
286324
private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transformConfig) {
287325
return new TimeBasedCheckpointProvider(
288326
clock,

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

Lines changed: 103 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.transport.ActionNotFoundTransportException;
5252
import org.elasticsearch.xpack.core.indexing.IndexerState;
5353
import org.elasticsearch.xpack.core.transform.TransformMetadata;
54+
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
5455
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
5556
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
5657
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@@ -77,6 +78,7 @@
7778
import java.util.ArrayList;
7879
import java.util.Collections;
7980
import java.util.List;
81+
import java.util.Map;
8082
import java.util.concurrent.CountDownLatch;
8183
import java.util.concurrent.ExecutorService;
8284
import java.util.concurrent.TimeUnit;
@@ -150,43 +152,7 @@ public void testPitInjection() throws InterruptedException {
150152

151153
try (var threadPool = createThreadPool()) {
152154
final var client = new PitMockClient(threadPool, true);
153-
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
154-
mock(ThreadPool.class),
155-
mock(ClusterService.class),
156-
mock(IndexNameExpressionResolver.class),
157-
mock(TransformExtension.class),
158-
new TransformServices(
159-
mock(IndexBasedTransformConfigManager.class),
160-
mock(TransformCheckpointService.class),
161-
mock(TransformAuditor.class),
162-
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
163-
mock(TransformNode.class)
164-
),
165-
mock(CheckpointProvider.class),
166-
new AtomicReference<>(IndexerState.STOPPED),
167-
null,
168-
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
169-
mock(TransformIndexerStats.class),
170-
config,
171-
null,
172-
new TransformCheckpoint(
173-
"transform",
174-
Instant.now().toEpochMilli(),
175-
0L,
176-
Collections.emptyMap(),
177-
Instant.now().toEpochMilli()
178-
),
179-
new TransformCheckpoint(
180-
"transform",
181-
Instant.now().toEpochMilli(),
182-
2L,
183-
Collections.emptyMap(),
184-
Instant.now().toEpochMilli()
185-
),
186-
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
187-
mock(TransformContext.class),
188-
false
189-
);
155+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
190156

191157
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
192158
assertEquals(new BytesArray("the_pit_id+"), response.pointInTimeId());
@@ -258,43 +224,7 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException {
258224

259225
try (var threadPool = createThreadPool()) {
260226
final var client = new PitMockClient(threadPool, false);
261-
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
262-
mock(ThreadPool.class),
263-
mock(ClusterService.class),
264-
mock(IndexNameExpressionResolver.class),
265-
mock(TransformExtension.class),
266-
new TransformServices(
267-
mock(IndexBasedTransformConfigManager.class),
268-
mock(TransformCheckpointService.class),
269-
mock(TransformAuditor.class),
270-
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
271-
mock(TransformNode.class)
272-
),
273-
mock(CheckpointProvider.class),
274-
new AtomicReference<>(IndexerState.STOPPED),
275-
null,
276-
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
277-
mock(TransformIndexerStats.class),
278-
config,
279-
null,
280-
new TransformCheckpoint(
281-
"transform",
282-
Instant.now().toEpochMilli(),
283-
0L,
284-
Collections.emptyMap(),
285-
Instant.now().toEpochMilli()
286-
),
287-
new TransformCheckpoint(
288-
"transform",
289-
Instant.now().toEpochMilli(),
290-
2L,
291-
Collections.emptyMap(),
292-
Instant.now().toEpochMilli()
293-
),
294-
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
295-
mock(TransformContext.class),
296-
false
297-
);
227+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
298228

299229
this.<SearchResponse>assertAsync(
300230
listener -> indexer.doNextSearch(0, listener),
@@ -463,6 +393,64 @@ public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedExc
463393
}
464394
}
465395

396+
public void testPitIndexFilterOmittedWhenRuntimeMappingsPresent() throws InterruptedException {
397+
// Arrange: create a config with runtime_mappings and PIT enabled
398+
Map<String, Object> runtimeMappings = Map.of(
399+
"total_price_with_tax",
400+
Map.of("type", "double", "script", Map.of("source", "emit(1.0)"))
401+
);
402+
TransformConfig config = newPitEnabledConfigWithSource(
403+
new SourceConfig(new String[] { "source_index" }, QueryConfig.matchAll(), runtimeMappings)
404+
);
405+
406+
try (var threadPool = createThreadPool()) {
407+
final var client = new PitMockClient(threadPool, true);
408+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
409+
410+
// Act: trigger a search which opens a PIT
411+
this.<SearchResponse>assertAsync(
412+
listener -> indexer.doNextSearch(0, listener),
413+
response -> { assertNotNull(response.pointInTimeId()); }
414+
);
415+
416+
// Assert: the PIT request should NOT have an indexFilter when runtime_mappings are present,
417+
// because OpenPointInTimeRequest does not support runtime_mappings and the query may reference runtime fields
418+
OpenPointInTimeRequest capturedPitRequest = client.getLastOpenPitRequest();
419+
assertNotNull("Expected a PIT open request to have been captured", capturedPitRequest);
420+
assertNull(
421+
"PIT indexFilter should be null when runtime_mappings are present, "
422+
+ "because OpenPointInTimeRequest does not support runtime_mappings",
423+
capturedPitRequest.indexFilter()
424+
);
425+
}
426+
}
427+
428+
public void testPitIndexFilterSetWhenNoRuntimeMappings() throws InterruptedException {
429+
// Arrange: create a config WITHOUT runtime_mappings and PIT enabled
430+
TransformConfig config = newPitEnabledConfigWithSource(
431+
new SourceConfig(new String[] { "source_index" }, QueryConfig.matchAll(), Collections.emptyMap())
432+
);
433+
434+
try (var threadPool = createThreadPool()) {
435+
final var client = new PitMockClient(threadPool, true);
436+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
437+
438+
// Act: trigger a search which opens a PIT
439+
this.<SearchResponse>assertAsync(
440+
listener -> indexer.doNextSearch(0, listener),
441+
response -> { assertNotNull(response.pointInTimeId()); }
442+
);
443+
444+
// Assert: the PIT request SHOULD have an indexFilter when no runtime_mappings are present
445+
OpenPointInTimeRequest capturedPitRequest = client.getLastOpenPitRequest();
446+
assertNotNull("Expected a PIT open request to have been captured", capturedPitRequest);
447+
assertNotNull(
448+
"PIT indexFilter should be set when no runtime_mappings are present for performance optimization",
449+
capturedPitRequest.indexFilter()
450+
);
451+
}
452+
}
453+
466454
public void testHandlePitIndexNotFound() throws InterruptedException {
467455
// simulate a deleted index due to ILM
468456
try (var threadPool = createThreadPool()) {
@@ -548,6 +536,40 @@ private IndexNameExpressionResolver resolver() {
548536
return resolver;
549537
}
550538

539+
private static TransformConfig newPitEnabledConfigWithSource(SourceConfig sourceConfig) {
540+
return new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSource(sourceConfig)
541+
.setSettings(new SettingsConfig.Builder().setUsePit(true).build())
542+
.build();
543+
}
544+
545+
private MockClientTransformIndexer createMockIndexerForPitTest(PitMockClient client, TransformConfig config) {
546+
return new MockClientTransformIndexer(
547+
mock(ThreadPool.class),
548+
mock(ClusterService.class),
549+
mock(IndexNameExpressionResolver.class),
550+
mock(TransformExtension.class),
551+
new TransformServices(
552+
mock(IndexBasedTransformConfigManager.class),
553+
mock(TransformCheckpointService.class),
554+
mock(TransformAuditor.class),
555+
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
556+
mock(TransformNode.class)
557+
),
558+
mock(CheckpointProvider.class),
559+
new AtomicReference<>(IndexerState.STOPPED),
560+
null,
561+
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
562+
mock(TransformIndexerStats.class),
563+
config,
564+
null,
565+
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
566+
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
567+
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
568+
mock(TransformContext.class),
569+
false
570+
);
571+
}
572+
551573
private static class MockClientTransformIndexer extends ClientTransformIndexer {
552574

553575
MockClientTransformIndexer(
@@ -600,12 +622,17 @@ private static class PitMockClient extends NoOpClient {
600622
private final boolean pitSupported;
601623
private AtomicLong pitContextCounter = new AtomicLong();
602624
private List<Runnable> beforeCloseListeners = new ArrayList<>();
625+
private final AtomicReference<OpenPointInTimeRequest> lastOpenPitRequest = new AtomicReference<>();
603626

604627
PitMockClient(ThreadPool threadPool, boolean pitSupported) {
605628
super(threadPool);
606629
this.pitSupported = pitSupported;
607630
}
608631

632+
public OpenPointInTimeRequest getLastOpenPitRequest() {
633+
return lastOpenPitRequest.get();
634+
}
635+
609636
public void addBeforeCloseListener(Runnable listener) {
610637
this.beforeCloseListeners.add(listener);
611638
}
@@ -621,7 +648,8 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
621648
Request request,
622649
ActionListener<Response> listener
623650
) {
624-
if (request instanceof OpenPointInTimeRequest) {
651+
if (request instanceof OpenPointInTimeRequest openPitRequest) {
652+
lastOpenPitRequest.set(openPitRequest);
625653
if (pitSupported) {
626654
pitContextCounter.incrementAndGet();
627655
OpenPointInTimeResponse response = new OpenPointInTimeResponse(new BytesArray("the_pit_id"), 1, 1, 0, 0);

0 commit comments

Comments
 (0)