Skip to content

Commit 332b443

Browse files
committed
Use new source loader when lower docId is accessed
When using synthetic source, runtime fields data may come from doc values. Doc values iterators can only be read once, and in increasing docId order. But if a runtime field is referenced multiple times in a query, currently the same doc value iterator will be used. This causes an error, as the second field reference will attempt to read the same iterator from a lower docId than was previously used. The fix is to create a new source loader, and thus a new doc value iterator, if the requested docId is lower than the last seen docId.
1 parent b8e2fce commit 332b443

File tree

2 files changed

+113
-2
lines changed

2 files changed

+113
-2
lines changed

server/src/main/java/org/elasticsearch/search/lookup/ConcurrentSegmentSourceProvider.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ class ConcurrentSegmentSourceProvider implements SourceProvider {
2929
private final SourceLoader sourceLoader;
3030
private final StoredFieldLoader storedFieldLoader;
3131
private final Map<Object, Leaf> leaves = ConcurrentCollections.newConcurrentMap();
32+
private final boolean isStoredSource;
3233

33-
ConcurrentSegmentSourceProvider(SourceLoader loader, boolean loadSource) {
34+
ConcurrentSegmentSourceProvider(SourceLoader loader, boolean isStoredSource) {
3435
this.sourceLoader = loader;
3536
// we force a sequential reader here since it is used during query execution where documents are scanned sequentially
36-
this.storedFieldLoader = StoredFieldLoader.create(loadSource, sourceLoader.requiredStoredFields(), true);
37+
this.storedFieldLoader = StoredFieldLoader.create(isStoredSource, sourceLoader.requiredStoredFields(), true);
38+
this.isStoredSource = isStoredSource;
3739
}
3840

3941
@Override
@@ -44,6 +46,11 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
4446
leaf = new Leaf(sourceLoader.leaf(ctx.reader(), null), storedFieldLoader.getLoader(ctx, null));
4547
var existing = leaves.put(id, leaf);
4648
assert existing == null : "unexpected source provider [" + existing + "]";
49+
} else if (isStoredSource == false && doc < leaf.doc) {
50+
// For synthetic source, if a runtime field is used more than once, a new source loader must be used
51+
// for each use of the field, as doc value iterators may only be read once in increasing docId order.
52+
leaf = new Leaf(sourceLoader.leaf(ctx.reader(), null), storedFieldLoader.getLoader(ctx, null));
53+
leaves.put(id, leaf);
4754
}
4855
return leaf.getSource(ctx, doc);
4956
}

x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,4 +340,108 @@ public void testLogsdbDefaultWithRecoveryUseSyntheticSource() throws IOException
340340
assertNull(settings.get("index.mapping.source.mode"));
341341
assertEquals("true", settings.get(IndexSettings.LOGSDB_SORT_ON_HOST_NAME.getKey()));
342342
}
343+
344+
public void testSyntheticSourceRuntimeFieldQueries() throws IOException {
345+
String mappings = """
346+
{
347+
"runtime": {
348+
"message_length": {
349+
"type": "long"
350+
}
351+
},
352+
"dynamic": false,
353+
"properties": {
354+
"@timestamp": {
355+
"type": "date"
356+
}
357+
}
358+
}
359+
""";
360+
String indexName = "test-foo";
361+
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);
362+
363+
int numDocs = 100_000;
364+
var sb = new StringBuilder();
365+
var now = Instant.now();
366+
367+
for (int i = 0; i < numDocs; i++) {
368+
String msg = randomAlphaOfLength(20);
369+
String messageLength = Integer.toString(msg.length());
370+
sb.append("{ \"create\": {} }").append('\n');
371+
sb.append("""
372+
{"@timestamp": "$now", "message_length": $l}
373+
""".replace("$now", formatInstant(now)).replace("$l", messageLength));
374+
sb.append('\n');
375+
if (i != numDocs - 1) {
376+
now = now.plusSeconds(1);
377+
}
378+
379+
if (i % 1000 == 0) {
380+
var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
381+
bulkRequest.setJsonEntity(sb.toString());
382+
var bulkResponse = client().performRequest(bulkRequest);
383+
var bulkResponseBody = responseAsMap(bulkResponse);
384+
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));
385+
sb = new StringBuilder();
386+
}
387+
}
388+
389+
var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
390+
bulkRequest.setJsonEntity(sb.toString());
391+
bulkRequest.addParameter("refresh", "true");
392+
var bulkResponse = client().performRequest(bulkRequest);
393+
var bulkResponseBody = responseAsMap(bulkResponse);
394+
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));
395+
396+
var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
397+
var forceMergeResponse = client().performRequest(forceMergeRequest);
398+
assertOK(forceMergeResponse);
399+
400+
var searchRequest = new Request("POST", "/" + indexName + "/_search");
401+
searchRequest.setJsonEntity("""
402+
{
403+
"size": 1,
404+
"query": {
405+
"bool": {
406+
"should": [
407+
{
408+
"range": {
409+
"message_length": {
410+
"gte": 1,
411+
"lt": 900000
412+
}
413+
}
414+
},
415+
{
416+
"range": {
417+
"message_length": {
418+
"gte": 900000,
419+
"lt": 1000000
420+
}
421+
}
422+
}
423+
],
424+
"minimum_should_match": "1",
425+
"must_not": [
426+
{
427+
"range": {
428+
"message_length": {
429+
"lt": 0
430+
}
431+
}
432+
}
433+
]
434+
}
435+
}
436+
}
437+
""");
438+
var searchResponse = client().performRequest(searchRequest);
439+
assertOK(searchResponse);
440+
var searchResponseBody = responseAsMap(searchResponse);
441+
var shardsHeader = (Map<?, ?>) searchResponseBody.get("_shards");
442+
assertThat(shardsHeader.get("failed"), equalTo(0));
443+
assertThat(shardsHeader.get("successful"), equalTo(1));
444+
assertThat(shardsHeader.get("skipped"), equalTo(0));
445+
logger.info("searchResponse: {}", searchResponseBody);
446+
}
343447
}

0 commit comments

Comments
 (0)