Skip to content

Commit cf28432

Browse files
authored
[Transform] Fix transform producing empty dest index when source query references runtime fields (elastic#142450)
When a transform's source query references a runtime field (e.g., a range filter on a runtime-mapped field), the destination index is produced empty even though _transform/_preview returns correct results. The root cause is that ClientTransformIndexer.injectPointInTimeIfNeeded() passes the source query as the PIT indexFilter, but OpenPointInTimeRequest does not support runtime_mappings. During the can_match phase, the query on the unknown runtime field is rewritten to match_none, all shards are filtered out, and the PIT opens with zero search contexts — causing every subsequent search to return empty results. Preview is unaffected because it uses a normal SearchSourceBuilder with both the query and runtime_mappings and does not use PIT. This change skips the PIT index filter optimization when the source config has non-empty runtime_mappings, since the filter cannot resolve runtime fields. The PIT is still opened for snapshot consistency, just without shard pre-filtering. Additionally, two other code paths that use the source query without propagating runtime_mappings are fixed: the initial progress search in TransformIndexer and the sourceHasChanged check in TimeBasedCheckpointProvider. All three fixes follow the same principle — wherever the source query is used, runtime_mappings from the source config must be propagated alongside it. Fixes elastic#113156
1 parent b650f35 commit cf28432

File tree

7 files changed

+228
-80
lines changed

7 files changed

+228
-80
lines changed

docs/changelog/142450.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
area: Transform
2+
issues:
3+
- 113156
4+
pr: 142450
5+
summary: Fix transform producing empty dest index when source query references runtime
6+
fields
7+
type: bug

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public void sourceHasChanged(TransformCheckpoint lastCheckpoint, ActionListener<
7171
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0)
7272
// we only want to know if there is at least 1 new document
7373
.trackTotalHitsUpTo(1)
74+
.runtimeMappings(transformConfig.getSource().getRuntimeMappings())
7475
.query(queryBuilder);
7576
SearchRequest searchRequest = new SearchRequest(transformConfig.getSource().getIndex()).allowPartialSearchResults(false)
7677
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,8 +550,11 @@ private void injectPointInTimeIfNeeded(
550550

551551
// no pit, create a new one
552552
OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(searchRequest.indices()).keepAlive(PIT_KEEP_ALIVE);
553-
// use index filter for better performance
554-
pitRequest.indexFilter(transformConfig.getSource().getQueryConfig().getQuery());
553+
// Only use index filter when there are no runtime mappings, because OpenPointInTimeRequest
554+
// does not support runtime_mappings and the query may reference runtime fields
555+
if (transformConfig.getSource().getRuntimeMappings().isEmpty()) {
556+
pitRequest.indexFilter(transformConfig.getSource().getQueryConfig().getQuery());
557+
}
555558

556559
ClientHelper.executeWithHeadersAsync(
557560
transformConfig.getHeaders(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,9 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
323323

324324
// get progress information
325325
SearchRequest request = new SearchRequest(transformConfig.getSource().getIndex());
326-
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
326+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().runtimeMappings(
327+
transformConfig.getSource().getRuntimeMappings()
328+
);
327329

328330
function.buildSearchQueryForInitialProgress(searchSourceBuilder);
329331
searchSourceBuilder.query(QueryBuilders.boolQuery().filter(buildFilterQuery()).filter(searchSourceBuilder.query()));

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import org.elasticsearch.threadpool.ThreadPool;
3232
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
3333
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
34+
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
3435
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
36+
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
3537
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
3638
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
3739
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@@ -50,6 +52,7 @@
5052

5153
import java.time.Clock;
5254
import java.util.Collections;
55+
import java.util.Map;
5356
import java.util.concurrent.CountDownLatch;
5457
import java.util.concurrent.TimeUnit;
5558
import java.util.function.Supplier;
@@ -283,6 +286,46 @@ private void testCreateNextCheckpoint(
283286
assertThat(exceptionHolder.get(), is(nullValue()));
284287
}
285288

289+
public void testSourceHasChangedIncludesRuntimeMappings() throws InterruptedException {
290+
// Arrange: create a config with explicit runtime_mappings
291+
Map<String, Object> runtimeMappings = Map.of(
292+
"total_price_with_tax",
293+
Map.of("type", "double", "script", Map.of("source", "emit(1.0)"))
294+
);
295+
SourceConfig sourceWithRuntimeMappings = new SourceConfig(
296+
new String[] { "source_index" },
297+
QueryConfig.matchAll(),
298+
runtimeMappings,
299+
null
300+
);
301+
TransformConfig transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSource(
302+
sourceWithRuntimeMappings
303+
).setSyncConfig(new TimeSyncConfig(TIMESTAMP_FIELD, TimeValue.ZERO)).build();
304+
305+
final SearchResponse searchResponse = newSearchResponse(0);
306+
try {
307+
doAnswer(withResponse(searchResponse)).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
308+
TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);
309+
310+
// Act: call sourceHasChanged
311+
CountDownLatch latch = new CountDownLatch(1);
312+
provider.sourceHasChanged(TransformCheckpoint.EMPTY, new LatchedActionListener<>(ActionListener.wrap(r -> {}, e -> {}), latch));
313+
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
314+
315+
// Assert: the search request should include runtime_mappings
316+
ArgumentCaptor<SearchRequest> searchRequestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
317+
verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestCaptor.capture(), any());
318+
SearchRequest capturedRequest = searchRequestCaptor.getValue();
319+
assertThat(
320+
"sourceHasChanged search should include runtime_mappings from the source config",
321+
capturedRequest.source().runtimeMappings(),
322+
is(equalTo(runtimeMappings))
323+
);
324+
} finally {
325+
searchResponse.decRef();
326+
}
327+
}
328+
286329
private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transformConfig) {
287330
return new TimeBasedCheckpointProvider(
288331
clock,

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

Lines changed: 104 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.transport.ActionNotFoundTransportException;
5353
import org.elasticsearch.xpack.core.indexing.IndexerState;
5454
import org.elasticsearch.xpack.core.transform.TransformMetadata;
55+
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
5556
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
5657
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
5758
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@@ -78,6 +79,7 @@
7879
import java.util.ArrayList;
7980
import java.util.Collections;
8081
import java.util.List;
82+
import java.util.Map;
8183
import java.util.concurrent.CountDownLatch;
8284
import java.util.concurrent.ExecutorService;
8385
import java.util.concurrent.TimeUnit;
@@ -152,44 +154,7 @@ public void testPitInjection() throws InterruptedException {
152154

153155
try (var threadPool = createThreadPool()) {
154156
final var client = new PitMockClient(threadPool, true);
155-
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
156-
mock(ThreadPool.class),
157-
mock(ClusterService.class),
158-
mock(IndexNameExpressionResolver.class),
159-
mock(TransformExtension.class),
160-
new TransformServices(
161-
mock(IndexBasedTransformConfigManager.class),
162-
mock(TransformCheckpointService.class),
163-
mock(TransformAuditor.class),
164-
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
165-
mock(TransformNode.class),
166-
mock(CrossProjectModeDecider.class)
167-
),
168-
mock(CheckpointProvider.class),
169-
new AtomicReference<>(IndexerState.STOPPED),
170-
null,
171-
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
172-
mock(TransformIndexerStats.class),
173-
config,
174-
null,
175-
new TransformCheckpoint(
176-
"transform",
177-
Instant.now().toEpochMilli(),
178-
0L,
179-
Collections.emptyMap(),
180-
Instant.now().toEpochMilli()
181-
),
182-
new TransformCheckpoint(
183-
"transform",
184-
Instant.now().toEpochMilli(),
185-
2L,
186-
Collections.emptyMap(),
187-
Instant.now().toEpochMilli()
188-
),
189-
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
190-
mock(TransformContext.class),
191-
false
192-
);
157+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
193158

194159
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
195160
assertThat(response.pointInTimeId(), equalBytes(new BytesArray("the_pit_id+")));
@@ -261,44 +226,7 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException {
261226

262227
try (var threadPool = createThreadPool()) {
263228
final var client = new PitMockClient(threadPool, false);
264-
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
265-
mock(ThreadPool.class),
266-
mock(ClusterService.class),
267-
mock(IndexNameExpressionResolver.class),
268-
mock(TransformExtension.class),
269-
new TransformServices(
270-
mock(IndexBasedTransformConfigManager.class),
271-
mock(TransformCheckpointService.class),
272-
mock(TransformAuditor.class),
273-
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
274-
mock(TransformNode.class),
275-
mock(CrossProjectModeDecider.class)
276-
),
277-
mock(CheckpointProvider.class),
278-
new AtomicReference<>(IndexerState.STOPPED),
279-
null,
280-
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
281-
mock(TransformIndexerStats.class),
282-
config,
283-
null,
284-
new TransformCheckpoint(
285-
"transform",
286-
Instant.now().toEpochMilli(),
287-
0L,
288-
Collections.emptyMap(),
289-
Instant.now().toEpochMilli()
290-
),
291-
new TransformCheckpoint(
292-
"transform",
293-
Instant.now().toEpochMilli(),
294-
2L,
295-
Collections.emptyMap(),
296-
Instant.now().toEpochMilli()
297-
),
298-
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
299-
mock(TransformContext.class),
300-
false
301-
);
229+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
302230

303231
this.<SearchResponse>assertAsync(
304232
listener -> indexer.doNextSearch(0, listener),
@@ -469,6 +397,64 @@ public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedExc
469397
}
470398
}
471399

400+
public void testPitIndexFilterOmittedWhenRuntimeMappingsPresent() throws InterruptedException {
401+
// Arrange: create a config with runtime_mappings and PIT enabled
402+
Map<String, Object> runtimeMappings = Map.of(
403+
"total_price_with_tax",
404+
Map.of("type", "double", "script", Map.of("source", "emit(1.0)"))
405+
);
406+
TransformConfig config = newPitEnabledConfigWithSource(
407+
new SourceConfig(new String[] { "source_index" }, QueryConfig.matchAll(), runtimeMappings, null)
408+
);
409+
410+
try (var threadPool = createThreadPool()) {
411+
final var client = new PitMockClient(threadPool, true);
412+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
413+
414+
// Act: trigger a search which opens a PIT
415+
this.<SearchResponse>assertAsync(
416+
listener -> indexer.doNextSearch(0, listener),
417+
response -> { assertNotNull(response.pointInTimeId()); }
418+
);
419+
420+
// Assert: the PIT request should NOT have an indexFilter when runtime_mappings are present,
421+
// because OpenPointInTimeRequest does not support runtime_mappings and the query may reference runtime fields
422+
OpenPointInTimeRequest capturedPitRequest = client.getLastOpenPitRequest();
423+
assertNotNull("Expected a PIT open request to have been captured", capturedPitRequest);
424+
assertNull(
425+
"PIT indexFilter should be null when runtime_mappings are present, "
426+
+ "because OpenPointInTimeRequest does not support runtime_mappings",
427+
capturedPitRequest.indexFilter()
428+
);
429+
}
430+
}
431+
432+
public void testPitIndexFilterSetWhenNoRuntimeMappings() throws InterruptedException {
433+
// Arrange: create a config WITHOUT runtime_mappings and PIT enabled
434+
TransformConfig config = newPitEnabledConfigWithSource(
435+
new SourceConfig(new String[] { "source_index" }, QueryConfig.matchAll(), Collections.emptyMap(), null)
436+
);
437+
438+
try (var threadPool = createThreadPool()) {
439+
final var client = new PitMockClient(threadPool, true);
440+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
441+
442+
// Act: trigger a search which opens a PIT
443+
this.<SearchResponse>assertAsync(
444+
listener -> indexer.doNextSearch(0, listener),
445+
response -> { assertNotNull(response.pointInTimeId()); }
446+
);
447+
448+
// Assert: the PIT request SHOULD have an indexFilter when no runtime_mappings are present
449+
OpenPointInTimeRequest capturedPitRequest = client.getLastOpenPitRequest();
450+
assertNotNull("Expected a PIT open request to have been captured", capturedPitRequest);
451+
assertNotNull(
452+
"PIT indexFilter should be set when no runtime_mappings are present for performance optimization",
453+
capturedPitRequest.indexFilter()
454+
);
455+
}
456+
}
457+
472458
public void testHandlePitIndexNotFound() throws InterruptedException {
473459
// simulate a deleted index due to ILM
474460
try (var threadPool = createThreadPool()) {
@@ -554,6 +540,41 @@ private IndexNameExpressionResolver resolver() {
554540
return resolver;
555541
}
556542

543+
private static TransformConfig newPitEnabledConfigWithSource(SourceConfig sourceConfig) {
544+
return new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSource(sourceConfig)
545+
.setSettings(new SettingsConfig.Builder().setUsePit(true).build())
546+
.build();
547+
}
548+
549+
private MockClientTransformIndexer createMockIndexerForPitTest(PitMockClient client, TransformConfig config) {
550+
return new MockClientTransformIndexer(
551+
mock(ThreadPool.class),
552+
mock(ClusterService.class),
553+
mock(IndexNameExpressionResolver.class),
554+
mock(TransformExtension.class),
555+
new TransformServices(
556+
mock(IndexBasedTransformConfigManager.class),
557+
mock(TransformCheckpointService.class),
558+
mock(TransformAuditor.class),
559+
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
560+
mock(TransformNode.class),
561+
mock(CrossProjectModeDecider.class)
562+
),
563+
mock(CheckpointProvider.class),
564+
new AtomicReference<>(IndexerState.STOPPED),
565+
null,
566+
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
567+
mock(TransformIndexerStats.class),
568+
config,
569+
null,
570+
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
571+
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
572+
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
573+
mock(TransformContext.class),
574+
false
575+
);
576+
}
577+
557578
private static class MockClientTransformIndexer extends ClientTransformIndexer {
558579

559580
MockClientTransformIndexer(
@@ -606,12 +627,17 @@ private static class PitMockClient extends NoOpClient {
606627
private final boolean pitSupported;
607628
private AtomicLong pitContextCounter = new AtomicLong();
608629
private List<Runnable> beforeCloseListeners = new ArrayList<>();
630+
private final AtomicReference<OpenPointInTimeRequest> lastOpenPitRequest = new AtomicReference<>();
609631

610632
PitMockClient(ThreadPool threadPool, boolean pitSupported) {
611633
super(threadPool);
612634
this.pitSupported = pitSupported;
613635
}
614636

637+
public OpenPointInTimeRequest getLastOpenPitRequest() {
638+
return lastOpenPitRequest.get();
639+
}
640+
615641
public void addBeforeCloseListener(Runnable listener) {
616642
this.beforeCloseListeners.add(listener);
617643
}
@@ -627,7 +653,8 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
627653
Request request,
628654
ActionListener<Response> listener
629655
) {
630-
if (request instanceof OpenPointInTimeRequest) {
656+
if (request instanceof OpenPointInTimeRequest openPitRequest) {
657+
lastOpenPitRequest.set(openPitRequest);
631658
if (pitSupported) {
632659
pitContextCounter.incrementAndGet();
633660
OpenPointInTimeResponse response = new OpenPointInTimeResponse(new BytesArray("the_pit_id"), 1, 1, 0, 0);

0 commit comments

Comments
 (0)