From 91793b574d4613efaf48693aefa60578ccb5fcbd Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Fri, 23 May 2025 16:32:46 -0500 Subject: [PATCH] Use new source loader when lower docId is accessed (#128320) When using synthetic source, runtime fields data may come from doc values. Doc values iterators can only be read once, and in increasing docId order. But if a runtime field is referenced multiple times in a query, currently the same doc value iterator will be used. This causes an error, as the second field reference will attempt to read the same iterator from a lower docId than was previously used. The fix is to create a new source loader, and thus a new doc value iterator, if the requested docId is lower than the last seen docId. --- docs/changelog/128320.yaml | 5 + .../lookup/SyntheticSourceProvider.java | 10 ++ .../xpack/logsdb/LogsdbRestIT.java | 112 ++++++++++++++++++ 3 files changed, 127 insertions(+) create mode 100644 docs/changelog/128320.yaml diff --git a/docs/changelog/128320.yaml b/docs/changelog/128320.yaml new file mode 100644 index 0000000000000..ecd575d1ce93f --- /dev/null +++ b/docs/changelog/128320.yaml @@ -0,0 +1,5 @@ +pr: 128320 +summary: Use new source loader when lower `docId` is accessed +area: Codec +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java b/server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java index 8078f4cb9cb8e..763c97635a75e 100644 --- a/server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java +++ b/server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java @@ -38,6 +38,14 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException { provider = new SyntheticSourceLeafLoader(ctx); var existing = leaves.put(id, provider); assert existing == null : "unexpected source provider [" + existing + "]"; + } else if (doc < provider.lastSeenDocId) { + // When queries reference 
the same runtime field in multiple clauses, each clause re-reads the values from the source in + // increasing docId order. So the last docId accessed by the first clause is higher than the first docId read by the second + // clause. This is okay for stored source, as stored fields do not restrict the order in which docIds can be accessed. + // But with synthetic source, field values may come from doc values, which require that docIds only be read in increasing order. + // To handle this, we detect lower docIds and create a new doc value reader for each clause. + provider = new SyntheticSourceLeafLoader(ctx); + leaves.put(id, provider); } return provider.getSource(doc); } @@ -45,6 +53,7 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException { private class SyntheticSourceLeafLoader { private final LeafStoredFieldLoader leafLoader; private final SourceLoader.Leaf leaf; + int lastSeenDocId = -1; SyntheticSourceLeafLoader(LeafReaderContext ctx) throws IOException { this.leafLoader = (sourceLoader.requiredStoredFields().isEmpty()) @@ -54,6 +63,7 @@ private class SyntheticSourceLeafLoader { } Source getSource(int doc) throws IOException { + this.lastSeenDocId = doc; leafLoader.advanceTo(doc); return leaf.source(leafLoader, doc); } diff --git a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java index d42c1aa240a64..ad4904130d9f4 100644 --- a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java +++ b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import 
org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; @@ -221,6 +222,117 @@ public void testEsqlRuntimeFields() throws IOException { assertThat(sumLength, equalTo(20 * numDocs)); } + public void testSyntheticSourceRuntimeFieldQueries() throws IOException { + String mappings = """ + { + "runtime": { + "message_length": { + "type": "long" + } + }, + "dynamic": false, + "properties": { + "@timestamp": { + "type": "date" + }, + "log" : { + "properties": { + "level": { + "type": "keyword" + } + } + } + } + } + """; + String indexName = "test-foo"; + createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings); + + int numDocs = 1000; + var sb = new StringBuilder(); + var now = Instant.now(); + for (int i = 0; i < numDocs; i++) { + String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal"; + String msg = randomAlphaOfLength(20); + String messageLength = Integer.toString(msg.length()); + sb.append("{ \"create\": {} }").append('\n'); + if (randomBoolean()) { + sb.append(""" + {"@timestamp":"$now","message":"$msg","message_length":$l,"log":{"level":"$level"}} + """.replace("$now", formatInstant(now)).replace("$level", level).replace("$msg", msg).replace("$l", messageLength)); + } else { + sb.append(""" + {"@timestamp": "$now", "message": "$msg", "message_length": $l} + """.replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength)); + } + sb.append('\n'); + if (i != numDocs - 1) { + now = now.plusSeconds(1); + } + } + + var bulkRequest = new Request("POST", "/" + indexName + "/_bulk"); + bulkRequest.setJsonEntity(sb.toString()); + bulkRequest.addParameter("refresh", "true"); + var bulkResponse = client().performRequest(bulkRequest); + var bulkResponseBody = responseAsMap(bulkResponse); + assertThat(bulkResponseBody, Matchers.hasEntry("errors", 
false)); + + var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge"); + var forceMergeResponse = client().performRequest(forceMergeRequest); + assertOK(forceMergeResponse); + + var searchRequest = new Request("POST", "/" + indexName + "/_search"); + + searchRequest.setJsonEntity(""" + { + "size": 1, + "query": { + "bool": { + "should": [ + { + "range": { + "message_length": { + "gte": 1, + "lt": 900000 + } + } + }, + { + "range": { + "message_length": { + "gte": 900000, + "lt": 1000000 + } + } + } + ], + "minimum_should_match": "1", + "must_not": [ + { + "range": { + "message_length": { + "lt": 0 + } + } + } + ] + } + } + } + """); + var searchResponse = client().performRequest(searchRequest); + assertOK(searchResponse); + var searchResponseBody = responseAsMap(searchResponse); + int totalHits = (int) XContentMapValues.extractValue("hits.total.value", searchResponseBody); + assertThat(totalHits, equalTo(numDocs)); + + var shardsHeader = (Map) searchResponseBody.get("_shards"); + assertThat(shardsHeader.get("failed"), equalTo(0)); + assertThat(shardsHeader.get("successful"), equalTo(1)); + assertThat(shardsHeader.get("skipped"), equalTo(0)); + } + static String formatInstant(Instant instant) { return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); }