Skip to content

Commit 12e21f6

Browse files
[9.3] [Transform] Fix transform producing empty dest index when source query references runtime fields (elastic#142450) (elastic#142829)
* [Transform] Fix transform producing empty dest index when source query references runtime fields (elastic#142450) When a transform's source query references a runtime field (e.g., a range filter on a runtime-mapped field), the destination index is produced empty even though _transform/_preview returns correct results. The root cause is that ClientTransformIndexer.injectPointInTimeIfNeeded() passes the source query as the PIT indexFilter, but OpenPointInTimeRequest does not support runtime_mappings. During the can_match phase, the query on the unknown runtime field is rewritten to match_none, all shards are filtered out, and the PIT opens with zero search contexts — causing every subsequent search to return empty results. Preview is unaffected because it uses a normal SearchSourceBuilder with both the query and runtime_mappings and does not use PIT. This change skips the PIT index filter optimization when the source config has non-empty runtime_mappings, since the filter cannot resolve runtime fields. The PIT is still opened for snapshot consistency, just without shard pre-filtering. Additionally, two other code paths that use the source query without propagating runtime_mappings are fixed: the initial progress search in TransformIndexer and the sourceHasChanged check in TimeBasedCheckpointProvider. All three fixes follow the same principle — wherever the source query is used, runtime_mappings from the source config must be propagated alongside it. Fixes elastic#113156 (cherry picked from commit cf28432) # Conflicts: # x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java * Fix compilation errors in transform tests Update SourceConfig constructor calls to match new signature (removed 4th parameter) and remove CrossProjectModeDecider from TransformServices constructor. Co-authored-by: Cursor <cursoragent@cursor.com> --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 1ff2427 commit 12e21f6

File tree

7 files changed

+217
-78
lines changed

7 files changed

+217
-78
lines changed

docs/changelog/142450.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
area: Transform
2+
issues:
3+
- 113156
4+
pr: 142450
5+
summary: Fix transform producing empty dest index when source query references runtime
6+
fields
7+
type: bug

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public void sourceHasChanged(TransformCheckpoint lastCheckpoint, ActionListener<
7171
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0)
7272
// we only want to know if there is at least 1 new document
7373
.trackTotalHitsUpTo(1)
74+
.runtimeMappings(transformConfig.getSource().getRuntimeMappings())
7475
.query(queryBuilder);
7576
SearchRequest searchRequest = new SearchRequest(transformConfig.getSource().getIndex()).allowPartialSearchResults(false)
7677
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,8 +550,11 @@ private void injectPointInTimeIfNeeded(
550550

551551
// no pit, create a new one
552552
OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(searchRequest.indices()).keepAlive(PIT_KEEP_ALIVE);
553-
// use index filter for better performance
554-
pitRequest.indexFilter(transformConfig.getSource().getQueryConfig().getQuery());
553+
// Only use index filter when there are no runtime mappings, because OpenPointInTimeRequest
554+
// does not support runtime_mappings and the query may reference runtime fields
555+
if (transformConfig.getSource().getRuntimeMappings().isEmpty()) {
556+
pitRequest.indexFilter(transformConfig.getSource().getQueryConfig().getQuery());
557+
}
555558

556559
ClientHelper.executeWithHeadersAsync(
557560
transformConfig.getHeaders(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,9 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
323323

324324
// get progress information
325325
SearchRequest request = new SearchRequest(transformConfig.getSource().getIndex());
326-
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
326+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().runtimeMappings(
327+
transformConfig.getSource().getRuntimeMappings()
328+
);
327329

328330
function.buildSearchQueryForInitialProgress(searchSourceBuilder);
329331
searchSourceBuilder.query(QueryBuilders.boolQuery().filter(buildFilterQuery()).filter(searchSourceBuilder.query()));

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import org.elasticsearch.threadpool.ThreadPool;
3232
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
3333
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
34+
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
3435
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
36+
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
3537
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
3638
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
3739
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@@ -50,6 +52,7 @@
5052

5153
import java.time.Clock;
5254
import java.util.Collections;
55+
import java.util.Map;
5356
import java.util.concurrent.CountDownLatch;
5457
import java.util.concurrent.TimeUnit;
5558
import java.util.function.Supplier;
@@ -283,6 +286,41 @@ private void testCreateNextCheckpoint(
283286
assertThat(exceptionHolder.get(), is(nullValue()));
284287
}
285288

289+
public void testSourceHasChangedIncludesRuntimeMappings() throws InterruptedException {
290+
// Arrange: create a config with explicit runtime_mappings
291+
Map<String, Object> runtimeMappings = Map.of(
292+
"total_price_with_tax",
293+
Map.of("type", "double", "script", Map.of("source", "emit(1.0)"))
294+
);
295+
SourceConfig sourceWithRuntimeMappings = new SourceConfig(new String[] { "source_index" }, QueryConfig.matchAll(), runtimeMappings);
296+
TransformConfig transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSource(
297+
sourceWithRuntimeMappings
298+
).setSyncConfig(new TimeSyncConfig(TIMESTAMP_FIELD, TimeValue.ZERO)).build();
299+
300+
final SearchResponse searchResponse = newSearchResponse(0);
301+
try {
302+
doAnswer(withResponse(searchResponse)).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
303+
TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);
304+
305+
// Act: call sourceHasChanged
306+
CountDownLatch latch = new CountDownLatch(1);
307+
provider.sourceHasChanged(TransformCheckpoint.EMPTY, new LatchedActionListener<>(ActionListener.wrap(r -> {}, e -> {}), latch));
308+
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
309+
310+
// Assert: the search request should include runtime_mappings
311+
ArgumentCaptor<SearchRequest> searchRequestCaptor = ArgumentCaptor.forClass(SearchRequest.class);
312+
verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestCaptor.capture(), any());
313+
SearchRequest capturedRequest = searchRequestCaptor.getValue();
314+
assertThat(
315+
"sourceHasChanged search should include runtime_mappings from the source config",
316+
capturedRequest.source().runtimeMappings(),
317+
is(equalTo(runtimeMappings))
318+
);
319+
} finally {
320+
searchResponse.decRef();
321+
}
322+
}
323+
286324
private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transformConfig) {
287325
return new TimeBasedCheckpointProvider(
288326
clock,

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

Lines changed: 103 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.transport.ActionNotFoundTransportException;
5252
import org.elasticsearch.xpack.core.indexing.IndexerState;
5353
import org.elasticsearch.xpack.core.transform.TransformMetadata;
54+
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
5455
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
5556
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
5657
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@@ -77,6 +78,7 @@
7778
import java.util.ArrayList;
7879
import java.util.Collections;
7980
import java.util.List;
81+
import java.util.Map;
8082
import java.util.concurrent.CountDownLatch;
8183
import java.util.concurrent.ExecutorService;
8284
import java.util.concurrent.TimeUnit;
@@ -151,43 +153,7 @@ public void testPitInjection() throws InterruptedException {
151153

152154
try (var threadPool = createThreadPool()) {
153155
final var client = new PitMockClient(threadPool, true);
154-
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
155-
mock(ThreadPool.class),
156-
mock(ClusterService.class),
157-
mock(IndexNameExpressionResolver.class),
158-
mock(TransformExtension.class),
159-
new TransformServices(
160-
mock(IndexBasedTransformConfigManager.class),
161-
mock(TransformCheckpointService.class),
162-
mock(TransformAuditor.class),
163-
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
164-
mock(TransformNode.class)
165-
),
166-
mock(CheckpointProvider.class),
167-
new AtomicReference<>(IndexerState.STOPPED),
168-
null,
169-
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
170-
mock(TransformIndexerStats.class),
171-
config,
172-
null,
173-
new TransformCheckpoint(
174-
"transform",
175-
Instant.now().toEpochMilli(),
176-
0L,
177-
Collections.emptyMap(),
178-
Instant.now().toEpochMilli()
179-
),
180-
new TransformCheckpoint(
181-
"transform",
182-
Instant.now().toEpochMilli(),
183-
2L,
184-
Collections.emptyMap(),
185-
Instant.now().toEpochMilli()
186-
),
187-
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
188-
mock(TransformContext.class),
189-
false
190-
);
156+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
191157

192158
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
193159
assertThat(response.pointInTimeId(), equalBytes(new BytesArray("the_pit_id+")));
@@ -259,43 +225,7 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException {
259225

260226
try (var threadPool = createThreadPool()) {
261227
final var client = new PitMockClient(threadPool, false);
262-
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
263-
mock(ThreadPool.class),
264-
mock(ClusterService.class),
265-
mock(IndexNameExpressionResolver.class),
266-
mock(TransformExtension.class),
267-
new TransformServices(
268-
mock(IndexBasedTransformConfigManager.class),
269-
mock(TransformCheckpointService.class),
270-
mock(TransformAuditor.class),
271-
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
272-
mock(TransformNode.class)
273-
),
274-
mock(CheckpointProvider.class),
275-
new AtomicReference<>(IndexerState.STOPPED),
276-
null,
277-
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
278-
mock(TransformIndexerStats.class),
279-
config,
280-
null,
281-
new TransformCheckpoint(
282-
"transform",
283-
Instant.now().toEpochMilli(),
284-
0L,
285-
Collections.emptyMap(),
286-
Instant.now().toEpochMilli()
287-
),
288-
new TransformCheckpoint(
289-
"transform",
290-
Instant.now().toEpochMilli(),
291-
2L,
292-
Collections.emptyMap(),
293-
Instant.now().toEpochMilli()
294-
),
295-
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
296-
mock(TransformContext.class),
297-
false
298-
);
228+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
299229

300230
this.<SearchResponse>assertAsync(
301231
listener -> indexer.doNextSearch(0, listener),
@@ -464,6 +394,64 @@ public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedExc
464394
}
465395
}
466396

397+
public void testPitIndexFilterOmittedWhenRuntimeMappingsPresent() throws InterruptedException {
398+
// Arrange: create a config with runtime_mappings and PIT enabled
399+
Map<String, Object> runtimeMappings = Map.of(
400+
"total_price_with_tax",
401+
Map.of("type", "double", "script", Map.of("source", "emit(1.0)"))
402+
);
403+
TransformConfig config = newPitEnabledConfigWithSource(
404+
new SourceConfig(new String[] { "source_index" }, QueryConfig.matchAll(), runtimeMappings)
405+
);
406+
407+
try (var threadPool = createThreadPool()) {
408+
final var client = new PitMockClient(threadPool, true);
409+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
410+
411+
// Act: trigger a search which opens a PIT
412+
this.<SearchResponse>assertAsync(
413+
listener -> indexer.doNextSearch(0, listener),
414+
response -> { assertNotNull(response.pointInTimeId()); }
415+
);
416+
417+
// Assert: the PIT request should NOT have an indexFilter when runtime_mappings are present,
418+
// because OpenPointInTimeRequest does not support runtime_mappings and the query may reference runtime fields
419+
OpenPointInTimeRequest capturedPitRequest = client.getLastOpenPitRequest();
420+
assertNotNull("Expected a PIT open request to have been captured", capturedPitRequest);
421+
assertNull(
422+
"PIT indexFilter should be null when runtime_mappings are present, "
423+
+ "because OpenPointInTimeRequest does not support runtime_mappings",
424+
capturedPitRequest.indexFilter()
425+
);
426+
}
427+
}
428+
429+
public void testPitIndexFilterSetWhenNoRuntimeMappings() throws InterruptedException {
430+
// Arrange: create a config WITHOUT runtime_mappings and PIT enabled
431+
TransformConfig config = newPitEnabledConfigWithSource(
432+
new SourceConfig(new String[] { "source_index" }, QueryConfig.matchAll(), Collections.emptyMap())
433+
);
434+
435+
try (var threadPool = createThreadPool()) {
436+
final var client = new PitMockClient(threadPool, true);
437+
MockClientTransformIndexer indexer = createMockIndexerForPitTest(client, config);
438+
439+
// Act: trigger a search which opens a PIT
440+
this.<SearchResponse>assertAsync(
441+
listener -> indexer.doNextSearch(0, listener),
442+
response -> { assertNotNull(response.pointInTimeId()); }
443+
);
444+
445+
// Assert: the PIT request SHOULD have an indexFilter when no runtime_mappings are present
446+
OpenPointInTimeRequest capturedPitRequest = client.getLastOpenPitRequest();
447+
assertNotNull("Expected a PIT open request to have been captured", capturedPitRequest);
448+
assertNotNull(
449+
"PIT indexFilter should be set when no runtime_mappings are present for performance optimization",
450+
capturedPitRequest.indexFilter()
451+
);
452+
}
453+
}
454+
467455
public void testHandlePitIndexNotFound() throws InterruptedException {
468456
// simulate a deleted index due to ILM
469457
try (var threadPool = createThreadPool()) {
@@ -549,6 +537,40 @@ private IndexNameExpressionResolver resolver() {
549537
return resolver;
550538
}
551539

540+
private static TransformConfig newPitEnabledConfigWithSource(SourceConfig sourceConfig) {
541+
return new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSource(sourceConfig)
542+
.setSettings(new SettingsConfig.Builder().setUsePit(true).build())
543+
.build();
544+
}
545+
546+
private MockClientTransformIndexer createMockIndexerForPitTest(PitMockClient client, TransformConfig config) {
547+
return new MockClientTransformIndexer(
548+
mock(ThreadPool.class),
549+
mock(ClusterService.class),
550+
mock(IndexNameExpressionResolver.class),
551+
mock(TransformExtension.class),
552+
new TransformServices(
553+
mock(IndexBasedTransformConfigManager.class),
554+
mock(TransformCheckpointService.class),
555+
mock(TransformAuditor.class),
556+
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
557+
mock(TransformNode.class)
558+
),
559+
mock(CheckpointProvider.class),
560+
new AtomicReference<>(IndexerState.STOPPED),
561+
null,
562+
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
563+
mock(TransformIndexerStats.class),
564+
config,
565+
null,
566+
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
567+
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
568+
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
569+
mock(TransformContext.class),
570+
false
571+
);
572+
}
573+
552574
private static class MockClientTransformIndexer extends ClientTransformIndexer {
553575

554576
MockClientTransformIndexer(
@@ -601,12 +623,17 @@ private static class PitMockClient extends NoOpClient {
601623
private final boolean pitSupported;
602624
private AtomicLong pitContextCounter = new AtomicLong();
603625
private List<Runnable> beforeCloseListeners = new ArrayList<>();
626+
private final AtomicReference<OpenPointInTimeRequest> lastOpenPitRequest = new AtomicReference<>();
604627

605628
PitMockClient(ThreadPool threadPool, boolean pitSupported) {
606629
super(threadPool);
607630
this.pitSupported = pitSupported;
608631
}
609632

633+
public OpenPointInTimeRequest getLastOpenPitRequest() {
634+
return lastOpenPitRequest.get();
635+
}
636+
610637
public void addBeforeCloseListener(Runnable listener) {
611638
this.beforeCloseListeners.add(listener);
612639
}
@@ -622,7 +649,8 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
622649
Request request,
623650
ActionListener<Response> listener
624651
) {
625-
if (request instanceof OpenPointInTimeRequest) {
652+
if (request instanceof OpenPointInTimeRequest openPitRequest) {
653+
lastOpenPitRequest.set(openPitRequest);
626654
if (pitSupported) {
627655
pitContextCounter.incrementAndGet();
628656
OpenPointInTimeResponse response = new OpenPointInTimeResponse(new BytesArray("the_pit_id"), 1, 1, 0, 0);

0 commit comments

Comments
 (0)