Skip to content

Commit b8afe64

Browse files
authored
[8.x] Address mapping and compute engine runtime field issues (#117792) (#118049)
* Address mapping and compute engine runtime field issues (#117792) This change addresses the following issues: Fields mapped as runtime fields not getting stored if source mode is synthetic. Address java.io.EOFException when an es|ql query uses multiple runtime fields that fallback to source when source mode is synthetic. (1) Address concurrency issue when runtime fields get pushed down to Lucene. (2) 1: ValueSourceOperator can read values in row striding or columnar fashion. When values are read in columnar fashion and multiple runtime fields synthetize source then this can cause the same SourceProvider evaluation the same range of docs ids multiple times. This can then result in unexpected io errors at the codec level. This is because the same doc value instances are used by SourceProvider. Re-evaluating the same docids is in violation of the contract of the DocIdSetIterator#advance(...) / DocIdSetIterator#advanceExact(...) methods, which documents that unexpected behaviour can occur if target docid is lower than current docid position. Note that this is only an issue for synthetic source loader and not for stored source loader. And not when executing in row stride fashion which sometimes happen in compute engine and always happen in _search api. 2: The concurrency issue that arrises with source provider if source operator executes in parallel with data portioning set to DOC. The same SourceProvider instance then gets access by multiple threads concurrently. SourceProviders implementations are not designed to handle concurrent access. Closes #117644
1 parent aecb48c commit b8afe64

File tree

9 files changed

+250
-10
lines changed

9 files changed

+250
-10
lines changed

docs/changelog/117792.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 117792
2+
summary: Address mapping and compute engine runtime field issues
3+
area: Mapping
4+
type: bug
5+
issues:
6+
- 117644

server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,9 @@ public Query termQuery(Object value, SearchExecutionContext context) {
946946
protected void parseCreateField(DocumentParserContext context) {
947947
// Run-time fields are mapped to this mapper, so it needs to handle storing values for use in synthetic source.
948948
// #parseValue calls this method once the run-time field is created.
949-
if (context.dynamic() == ObjectMapper.Dynamic.RUNTIME && context.canAddIgnoredField()) {
949+
var fieldType = context.mappingLookup().getFieldType(path);
950+
boolean isRuntimeField = fieldType instanceof AbstractScriptFieldType;
951+
if ((context.dynamic() == ObjectMapper.Dynamic.RUNTIME || isRuntimeField) && context.canAddIgnoredField()) {
950952
try {
951953
context.addIgnoredField(
952954
IgnoredSourceFieldMapper.NameValue.fromContext(context, path, context.encodeFlattenedToken())

server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -493,14 +493,18 @@ public boolean containsBrokenAnalysis(String field) {
493493
*/
494494
public SearchLookup lookup() {
495495
if (this.lookup == null) {
496-
SourceProvider sourceProvider = isSourceSynthetic()
497-
? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics())
498-
: SourceProvider.fromStoredFields();
496+
var sourceProvider = createSourceProvider();
499497
setLookupProviders(sourceProvider, LeafFieldLookupProvider.fromStoredFields());
500498
}
501499
return this.lookup;
502500
}
503501

502+
public SourceProvider createSourceProvider() {
503+
return isSourceSynthetic()
504+
? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics())
505+
: SourceProvider.fromStoredFields();
506+
}
507+
504508
/**
505509
* Replace the standard source provider and field lookup provider on the SearchLookup
506510
*

server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ private SearchLookup(SearchLookup searchLookup, Set<String> fieldChain) {
102102
this.fieldLookupProvider = searchLookup.fieldLookupProvider;
103103
}
104104

105+
private SearchLookup(SearchLookup searchLookup, SourceProvider sourceProvider, Set<String> fieldChain) {
106+
this.fieldChain = Collections.unmodifiableSet(fieldChain);
107+
this.sourceProvider = sourceProvider;
108+
this.fieldTypeLookup = searchLookup.fieldTypeLookup;
109+
this.fieldDataLookup = searchLookup.fieldDataLookup;
110+
this.fieldLookupProvider = searchLookup.fieldLookupProvider;
111+
}
112+
105113
/**
106114
* Creates a copy of the current {@link SearchLookup} that looks fields up in the same way, but also tracks field references
107115
* in order to detect cycles and prevent resolving fields that depend on more than {@link #MAX_FIELD_CHAIN_DEPTH} other fields.
@@ -144,4 +152,8 @@ public IndexFieldData<?> getForField(MappedFieldType fieldType, MappedFieldType.
144152
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
145153
return sourceProvider.getSource(ctx, doc);
146154
}
155+
156+
public SearchLookup swapSourceProvider(SourceProvider sourceProvider) {
157+
return new SearchLookup(this, sourceProvider, fieldChain);
158+
}
147159
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.client.internal.ClusterAdminClient;
1919
import org.elasticsearch.cluster.metadata.IndexMetadata;
2020
import org.elasticsearch.cluster.node.DiscoveryNode;
21+
import org.elasticsearch.common.collect.Iterators;
2122
import org.elasticsearch.common.settings.Setting;
2223
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.index.Index;
@@ -1648,6 +1649,44 @@ public void testMaxTruncationSizeSetting() {
16481649
}
16491650
}
16501651

1652+
public void testScriptField() throws Exception {
1653+
XContentBuilder mapping = JsonXContent.contentBuilder();
1654+
mapping.startObject();
1655+
{
1656+
mapping.startObject("runtime");
1657+
{
1658+
mapping.startObject("k1");
1659+
mapping.field("type", "long");
1660+
mapping.endObject();
1661+
mapping.startObject("k2");
1662+
mapping.field("type", "long");
1663+
mapping.endObject();
1664+
}
1665+
mapping.endObject();
1666+
{
1667+
mapping.startObject("properties");
1668+
mapping.startObject("meter").field("type", "double").endObject();
1669+
mapping.endObject();
1670+
}
1671+
}
1672+
mapping.endObject();
1673+
String sourceMode = randomBoolean() ? "stored" : "synthetic";
1674+
Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
1675+
client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get();
1676+
for (int i = 0; i < 10; i++) {
1677+
index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i));
1678+
}
1679+
refresh("test-script");
1680+
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) {
1681+
List<Object> k1Column = Iterators.toList(resp.column(0));
1682+
assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
1683+
List<Object> k2Column = Iterators.toList(resp.column(1));
1684+
assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null));
1685+
List<Object> meterColumn = Iterators.toList(resp.column(2));
1686+
assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
1687+
}
1688+
}
1689+
16511690
private void clearPersistentSettings(Setting<?>... settings) {
16521691
Settings.Builder clearedSettings = Settings.builder();
16531692

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
4040
import org.elasticsearch.index.mapper.MappedFieldType;
4141
import org.elasticsearch.index.mapper.NestedLookup;
42+
import org.elasticsearch.index.mapper.SourceFieldMapper;
4243
import org.elasticsearch.index.mapper.SourceLoader;
4344
import org.elasticsearch.index.query.QueryBuilder;
4445
import org.elasticsearch.index.query.QueryBuilders;
@@ -348,7 +349,16 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() {
348349

349350
@Override
350351
public SearchLookup lookup() {
351-
return ctx.lookup();
352+
boolean syntheticSource = SourceFieldMapper.isSynthetic(indexSettings());
353+
var searchLookup = ctx.lookup();
354+
if (syntheticSource) {
355+
// in the context of scripts and when synthetic source is used the search lookup can't always be reused between
356+
// users of SearchLookup. This is only an issue when scripts fallback to _source, but since we can't always
357+
// accurately determine whether a script uses _source, we should do this for all script usages.
358+
// This lookup() method is only invoked for scripts / runtime fields, so it is ok to do here.
359+
searchLookup = searchLookup.swapSourceProvider(ctx.createSourceProvider());
360+
}
361+
return searchLookup;
352362
}
353363

354364
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.search.internal.AliasFilter;
4646
import org.elasticsearch.search.internal.SearchContext;
4747
import org.elasticsearch.search.internal.ShardSearchRequest;
48+
import org.elasticsearch.search.lookup.SourceProvider;
4849
import org.elasticsearch.tasks.CancellableTask;
4950
import org.elasticsearch.tasks.Task;
5051
import org.elasticsearch.tasks.TaskCancelledException;
@@ -82,6 +83,7 @@
8283
import java.util.concurrent.Executor;
8384
import java.util.concurrent.atomic.AtomicBoolean;
8485
import java.util.concurrent.atomic.AtomicLong;
86+
import java.util.function.Supplier;
8587

8688
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
8789

@@ -428,12 +430,17 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
428430
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts.size());
429431
for (int i = 0; i < context.searchContexts.size(); i++) {
430432
SearchContext searchContext = context.searchContexts.get(i);
433+
var searchExecutionContext = new SearchExecutionContext(searchContext.getSearchExecutionContext()) {
434+
435+
@Override
436+
public SourceProvider createSourceProvider() {
437+
final Supplier<SourceProvider> supplier = () -> super.createSourceProvider();
438+
return new ReinitializingSourceProvider(supplier);
439+
440+
}
441+
};
431442
contexts.add(
432-
new EsPhysicalOperationProviders.DefaultShardContext(
433-
i,
434-
searchContext.getSearchExecutionContext(),
435-
searchContext.request().getAliasFilter()
436-
)
443+
new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter())
437444
);
438445
}
439446
final List<Driver> drivers;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
import org.apache.lucene.index.LeafReaderContext;
11+
import org.elasticsearch.search.lookup.Source;
12+
import org.elasticsearch.search.lookup.SourceProvider;
13+
14+
import java.io.IOException;
15+
import java.util.function.Supplier;
16+
17+
/**
18+
* This is a workaround for when compute engine executes concurrently with data partitioning by docid.
19+
*/
20+
final class ReinitializingSourceProvider implements SourceProvider {
21+
22+
private PerThreadSourceProvider perThreadProvider;
23+
private final Supplier<SourceProvider> sourceProviderFactory;
24+
25+
ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
26+
this.sourceProviderFactory = sourceProviderFactory;
27+
}
28+
29+
@Override
30+
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
31+
var currentThread = Thread.currentThread();
32+
PerThreadSourceProvider provider = perThreadProvider;
33+
if (provider == null || provider.creatingThread != currentThread) {
34+
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
35+
this.perThreadProvider = provider;
36+
}
37+
return perThreadProvider.source.getSource(ctx, doc);
38+
}
39+
40+
private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {
41+
42+
}
43+
}

x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@
1010
import org.elasticsearch.client.Request;
1111
import org.elasticsearch.cluster.metadata.DataStream;
1212
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.time.DateFormatter;
14+
import org.elasticsearch.common.time.FormatNames;
1315
import org.elasticsearch.test.cluster.ElasticsearchCluster;
1416
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
1517
import org.elasticsearch.test.rest.ESRestTestCase;
1618
import org.hamcrest.Matchers;
1719
import org.junit.ClassRule;
1820

1921
import java.io.IOException;
22+
import java.time.Instant;
2023
import java.util.List;
2124
import java.util.Map;
2225

@@ -108,4 +111,118 @@ public void testLogsdbSourceModeForLogsIndex() throws IOException {
108111
assertNull(settings.get("index.mapping.source.mode"));
109112
}
110113

114+
public void testEsqlRuntimeFields() throws IOException {
115+
String mappings = """
116+
{
117+
"runtime": {
118+
"message_length": {
119+
"type": "long"
120+
},
121+
"log.offset": {
122+
"type": "long"
123+
}
124+
},
125+
"dynamic": false,
126+
"properties": {
127+
"@timestamp": {
128+
"type": "date"
129+
},
130+
"log" : {
131+
"properties": {
132+
"level": {
133+
"type": "keyword"
134+
},
135+
"file": {
136+
"type": "keyword"
137+
}
138+
}
139+
}
140+
}
141+
}
142+
""";
143+
String indexName = "test-foo";
144+
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);
145+
146+
int numDocs = 500;
147+
var sb = new StringBuilder();
148+
var now = Instant.now();
149+
150+
var expectedMinTimestamp = now;
151+
for (int i = 0; i < numDocs; i++) {
152+
String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal";
153+
String msg = randomAlphaOfLength(20);
154+
String path = randomAlphaOfLength(8);
155+
String messageLength = Integer.toString(msg.length());
156+
String offset = Integer.toString(randomNonNegativeInt());
157+
sb.append("{ \"create\": {} }").append('\n');
158+
if (randomBoolean()) {
159+
sb.append(
160+
"""
161+
{"@timestamp":"$now","message":"$msg","message_length":$l,"file":{"level":"$level","offset":5,"file":"$path"}}
162+
""".replace("$now", formatInstant(now))
163+
.replace("$level", level)
164+
.replace("$msg", msg)
165+
.replace("$path", path)
166+
.replace("$l", messageLength)
167+
.replace("$o", offset)
168+
);
169+
} else {
170+
sb.append("""
171+
{"@timestamp": "$now", "message": "$msg", "message_length": $l}
172+
""".replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength));
173+
}
174+
sb.append('\n');
175+
if (i != numDocs - 1) {
176+
now = now.plusSeconds(1);
177+
}
178+
}
179+
var expectedMaxTimestamp = now;
180+
181+
var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
182+
bulkRequest.setJsonEntity(sb.toString());
183+
bulkRequest.addParameter("refresh", "true");
184+
var bulkResponse = client().performRequest(bulkRequest);
185+
var bulkResponseBody = responseAsMap(bulkResponse);
186+
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));
187+
188+
var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
189+
forceMergeRequest.addParameter("max_num_segments", "1");
190+
var forceMergeResponse = client().performRequest(forceMergeRequest);
191+
assertOK(forceMergeResponse);
192+
193+
String query = "FROM test-foo | STATS count(*), min(@timestamp), max(@timestamp), min(message_length), max(message_length)"
194+
+ " ,sum(message_length), avg(message_length), min(log.offset), max(log.offset) | LIMIT 1";
195+
final Request esqlRequest = new Request("POST", "/_query");
196+
esqlRequest.setJsonEntity("""
197+
{
198+
"query": "$query"
199+
}
200+
""".replace("$query", query));
201+
var esqlResponse = client().performRequest(esqlRequest);
202+
assertOK(esqlResponse);
203+
Map<String, Object> esqlResponseBody = responseAsMap(esqlResponse);
204+
205+
List<?> values = (List<?>) esqlResponseBody.get("values");
206+
assertThat(values, Matchers.not(Matchers.empty()));
207+
var count = ((List<?>) values.get(0)).get(0);
208+
assertThat(count, equalTo(numDocs));
209+
logger.warn("VALUES: {}", values);
210+
211+
var minTimestamp = ((List<?>) values.get(0)).get(1);
212+
assertThat(minTimestamp, equalTo(formatInstant(expectedMinTimestamp)));
213+
var maxTimestamp = ((List<?>) values.get(0)).get(2);
214+
assertThat(maxTimestamp, equalTo(formatInstant(expectedMaxTimestamp)));
215+
216+
var minLength = ((List<?>) values.get(0)).get(3);
217+
assertThat(minLength, equalTo(20));
218+
var maxLength = ((List<?>) values.get(0)).get(4);
219+
assertThat(maxLength, equalTo(20));
220+
var sumLength = ((List<?>) values.get(0)).get(5);
221+
assertThat(sumLength, equalTo(20 * numDocs));
222+
}
223+
224+
static String formatInstant(Instant instant) {
225+
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
226+
}
227+
111228
}

0 commit comments

Comments
 (0)