Skip to content

Commit 6f5d032

Browse files
Use new source loader when lower docId is accessed (elastic#128320) (elastic#128410)
When using synthetic source, runtime fields data may come from doc values. Doc values iterators can only be read once, and in increasing docId order. But if a runtime field is referenced multiple times in a query, currently the same doc value iterator will be used. This causes an error, as the second field reference will attempt to read the same iterator from a lower docId than was previously used. The fix is to create a new source loader, and thus a new doc value iterator, if the requested docId is lower than the last seen docId. (cherry picked from commit 51e87cb) # Conflicts: # server/src/main/java/org/elasticsearch/search/lookup/ConcurrentSegmentSourceProvider.java
1 parent 0de462a commit 6f5d032

File tree

3 files changed

+127
-0
lines changed

3 files changed

+127
-0
lines changed

docs/changelog/128320.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128320
2+
summary: Use new source loader when lower `docId` is accessed
3+
area: Codec
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,22 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
3838
provider = new SyntheticSourceLeafLoader(ctx);
3939
var existing = leaves.put(id, provider);
4040
assert existing == null : "unexpected source provider [" + existing + "]";
41+
} else if (doc < provider.lastSeenDocId) {
42+
// When queries reference the same runtime field in multiple clauses, each clause re-reads the values from the source in
43+
// increasing docId order. So the last docId accessed by the first clause is higher than the first docId read by the second
44+
// clause. This is okay for stored source, as stored fields do not restrict the order in which docIds can be accessed.
45+
// But with synthetic source, field values may come from doc values, which require that docIds only be read in increasing order.
46+
// To handle this, we detect lower docIds and create a new doc value reader for each clause.
47+
provider = new SyntheticSourceLeafLoader(ctx);
48+
leaves.put(id, provider);
4149
}
4250
return provider.getSource(doc);
4351
}
4452

4553
private class SyntheticSourceLeafLoader {
4654
private final LeafStoredFieldLoader leafLoader;
4755
private final SourceLoader.Leaf leaf;
56+
int lastSeenDocId = -1;
4857

4958
SyntheticSourceLeafLoader(LeafReaderContext ctx) throws IOException {
5059
this.leafLoader = (sourceLoader.requiredStoredFields().isEmpty())
@@ -54,6 +63,7 @@ private class SyntheticSourceLeafLoader {
5463
}
5564

5665
/**
 * Loads the source of a single document through this leaf's loaders and records the docId that was accessed.
 *
 * @param doc the docId whose source is requested
 * @return the result of {@code leaf.source(leafLoader, doc)}
 * @throws IOException declared by this method; presumably raised by the underlying loaders (confirm against their contracts)
 */
Source getSource(int doc) throws IOException {
66+
// Remember the docId of this access. The enclosing provider compares the next requested docId against this value
// and builds a fresh SyntheticSourceLeafLoader when a lower docId is requested.
this.lastSeenDocId = doc;
5767
// Position the stored field loader on the requested document before asking the leaf for its source.
leafLoader.advanceTo(doc);
5868
return leaf.source(leafLoader, doc);
5969
}

x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.common.time.DateFormatter;
1515
import org.elasticsearch.common.time.FormatNames;
16+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1617
import org.elasticsearch.index.IndexSettings;
1718
import org.elasticsearch.test.cluster.ElasticsearchCluster;
1819
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
@@ -341,4 +342,115 @@ public void testLogsdbDefaultWithRecoveryUseSyntheticSource() throws IOException
341342
assertNull(settings.get("index.mapping.source.mode"));
342343
assertEquals("true", settings.get(IndexSettings.LOGSDB_SORT_ON_HOST_NAME.getKey()));
343344
}
345+
346+
/**
 * Verifies that a search referencing the same runtime field in several query clauses succeeds on a logsdb index.
 * <p>
 * The test creates an index in {@code logsdb} mode with a {@code long} runtime field {@code message_length},
 * bulk-indexes {@code numDocs} documents, force-merges, and then runs a {@code bool} query that uses
 * {@code message_length} in three separate {@code range} clauses (two {@code should}, one {@code must_not}).
 * It asserts that every document is counted as a hit and that the single shard reports no failures.
 *
 * @throws IOException if a REST request or response parsing fails
 */
public void testSyntheticSourceRuntimeFieldQueries() throws IOException {
347+
// "dynamic": false with only @timestamp and log.level mapped: "message" and "message_length" are not mapped as
// regular fields; "message_length" is declared solely as a runtime field.
String mappings = """
348+
{
349+
"runtime": {
350+
"message_length": {
351+
"type": "long"
352+
}
353+
},
354+
"dynamic": false,
355+
"properties": {
356+
"@timestamp": {
357+
"type": "date"
358+
},
359+
"log" : {
360+
"properties": {
361+
"level": {
362+
"type": "keyword"
363+
}
364+
}
365+
}
366+
}
367+
}
368+
""";
369+
String indexName = "test-foo";
370+
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);
371+
372+
int numDocs = 1000;
373+
// Accumulates the newline-delimited bulk request body.
var sb = new StringBuilder();
374+
var now = Instant.now();
375+
for (int i = 0; i < numDocs; i++) {
376+
String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal";
377+
String msg = randomAlphaOfLength(20);
378+
String messageLength = Integer.toString(msg.length());
379+
sb.append("{ \"create\": {} }").append('\n');
380+
// Randomly include or omit the "log.level" field so that documents differ in shape.
if (randomBoolean()) {
381+
sb.append("""
382+
{"@timestamp":"$now","message":"$msg","message_length":$l,"log":{"level":"$level"}}
383+
""".replace("$now", formatInstant(now)).replace("$level", level).replace("$msg", msg).replace("$l", messageLength));
384+
} else {
385+
sb.append("""
386+
{"@timestamp": "$now", "message": "$msg", "message_length": $l}
387+
""".replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength));
388+
}
389+
sb.append('\n');
390+
// Each document gets a timestamp one second after the previous one.
if (i != numDocs - 1) {
391+
now = now.plusSeconds(1);
392+
}
393+
}
394+
395+
// Index all documents in one bulk request; refresh=true is set so they are searchable afterwards.
var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
396+
bulkRequest.setJsonEntity(sb.toString());
397+
bulkRequest.addParameter("refresh", "true");
398+
var bulkResponse = client().performRequest(bulkRequest);
399+
var bulkResponseBody = responseAsMap(bulkResponse);
400+
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));
401+
402+
// NOTE(review): the force merge presumably consolidates segments so the clauses revisit docIds within one
// segment; no max_num_segments parameter is passed here - confirm the intended effect.
var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
403+
var forceMergeResponse = client().performRequest(forceMergeRequest);
404+
assertOK(forceMergeResponse);
405+
406+
var searchRequest = new Request("POST", "/" + indexName + "/_search");
407+
408+
// The query references the runtime field "message_length" in three range clauses: two "should" clauses and one
// "must_not" clause.
searchRequest.setJsonEntity("""
409+
{
410+
"size": 1,
411+
"query": {
412+
"bool": {
413+
"should": [
414+
{
415+
"range": {
416+
"message_length": {
417+
"gte": 1,
418+
"lt": 900000
419+
}
420+
}
421+
},
422+
{
423+
"range": {
424+
"message_length": {
425+
"gte": 900000,
426+
"lt": 1000000
427+
}
428+
}
429+
}
430+
],
431+
"minimum_should_match": "1",
432+
"must_not": [
433+
{
434+
"range": {
435+
"message_length": {
436+
"lt": 0
437+
}
438+
}
439+
}
440+
]
441+
}
442+
}
443+
}
444+
""");
445+
var searchResponse = client().performRequest(searchRequest);
446+
assertOK(searchResponse);
447+
var searchResponseBody = responseAsMap(searchResponse);
448+
// Every indexed document is expected to be counted as a hit.
int totalHits = (int) XContentMapValues.extractValue("hits.total.value", searchResponseBody);
449+
assertThat(totalHits, equalTo(numDocs));
450+
451+
// The response must report exactly one successful shard and no failed or skipped shards.
var shardsHeader = (Map<?, ?>) searchResponseBody.get("_shards");
452+
assertThat(shardsHeader.get("failed"), equalTo(0));
453+
assertThat(shardsHeader.get("successful"), equalTo(1));
454+
assertThat(shardsHeader.get("skipped"), equalTo(0));
455+
}
344456
}

0 commit comments

Comments
 (0)