Skip to content
Merged
5 changes: 5 additions & 0 deletions docs/changelog/128320.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128320
summary: Use new source loader when lower `docId` is accessed
area: Codec
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ class ConcurrentSegmentSourceProvider implements SourceProvider {
private final SourceLoader sourceLoader;
private final StoredFieldLoader storedFieldLoader;
private final Map<Object, Leaf> leaves = ConcurrentCollections.newConcurrentMap();
private final boolean isStoredSource;

ConcurrentSegmentSourceProvider(SourceLoader loader, boolean loadSource) {
ConcurrentSegmentSourceProvider(SourceLoader loader, boolean isStoredSource) {
this.sourceLoader = loader;
// we force a sequential reader here since it is used during query execution where documents are scanned sequentially
this.storedFieldLoader = StoredFieldLoader.create(loadSource, sourceLoader.requiredStoredFields(), true);
this.storedFieldLoader = StoredFieldLoader.create(isStoredSource, sourceLoader.requiredStoredFields(), true);
this.isStoredSource = isStoredSource;
}

@Override
Expand All @@ -44,6 +46,11 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
leaf = new Leaf(sourceLoader.leaf(ctx.reader(), null), storedFieldLoader.getLoader(ctx, null));
var existing = leaves.put(id, leaf);
assert existing == null : "unexpected source provider [" + existing + "]";
} else if (isStoredSource == false && doc < leaf.doc) {
// For synthetic source, if a runtime field is used more than once, a new source loader must be used
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also incorporate the following into this comment: with both synthetic and stored source the docid can go backwards. This isn't a problem for stored fields (somehow it validate when docid goes backwards and doesn't run into EOF like errors), but it is for doc values (which gets used with synthetic source). ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here seems to be when multiple clauses in a query reference runtime fields on the same request, no? So it's not only about using the same runtime field twice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here seems to be when multiple clauses in a query reference runtime fields on the same request

Yes I think that is a more accurate way to describe the issue. I confirmed that, unsurprisingly, aggregating twice on the same runtime field does indeed work fine as is. So this is specific to there being 2+ query clauses referencing the same runtime field.

// for each use of the field, as doc value iterators may only be read once in increasing docId order.
leaf = new Leaf(sourceLoader.leaf(ctx.reader(), null), storedFieldLoader.getLoader(ctx, null));
leaves.put(id, leaf);
}
return leaf.getSource(ctx, doc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,108 @@ public void testLogsdbDefaultWithRecoveryUseSyntheticSource() throws IOException
assertNull(settings.get("index.mapping.source.mode"));
assertEquals("true", settings.get(IndexSettings.LOGSDB_SORT_ON_HOST_NAME.getKey()));
}

public void testSyntheticSourceRuntimeFieldQueries() throws IOException {
String mappings = """
{
"runtime": {
"message_length": {
"type": "long"
}
},
"dynamic": false,
"properties": {
"@timestamp": {
"type": "date"
}
}
}
""";
String indexName = "test-foo";
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);

int numDocs = 100_000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The amount of data that this test generates / indexes, makes this is an expensive test. Does the problem re-occur if we lower the number of documents here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh yep, 100k is a lot. Your original test example actually triggers the issue with 10 docs. Reverting the test to be closer to your original version, and decreasing to 10 docs.

var sb = new StringBuilder();
var now = Instant.now();

for (int i = 0; i < numDocs; i++) {
String msg = randomAlphaOfLength(20);
String messageLength = Integer.toString(msg.length());
sb.append("{ \"create\": {} }").append('\n');
sb.append("""
{"@timestamp": "$now", "message_length": $l}
""".replace("$now", formatInstant(now)).replace("$l", messageLength));
sb.append('\n');
if (i != numDocs - 1) {
now = now.plusSeconds(1);
}

if (i % 1000 == 0) {
var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
bulkRequest.setJsonEntity(sb.toString());
var bulkResponse = client().performRequest(bulkRequest);
var bulkResponseBody = responseAsMap(bulkResponse);
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));
sb = new StringBuilder();
}
}

var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
bulkRequest.setJsonEntity(sb.toString());
bulkRequest.addParameter("refresh", "true");
var bulkResponse = client().performRequest(bulkRequest);
var bulkResponseBody = responseAsMap(bulkResponse);
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));

var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
var forceMergeResponse = client().performRequest(forceMergeRequest);
assertOK(forceMergeResponse);

var searchRequest = new Request("POST", "/" + indexName + "/_search");
searchRequest.setJsonEntity("""
{
"size": 1,
"query": {
"bool": {
"should": [
{
"range": {
"message_length": {
"gte": 1,
"lt": 900000
}
}
},
{
"range": {
"message_length": {
"gte": 900000,
"lt": 1000000
}
}
}
],
"minimum_should_match": "1",
"must_not": [
{
"range": {
"message_length": {
"lt": 0
}
}
}
]
}
}
}
""");
var searchResponse = client().performRequest(searchRequest);
assertOK(searchResponse);
var searchResponseBody = responseAsMap(searchResponse);
var shardsHeader = (Map<?, ?>) searchResponseBody.get("_shards");
assertThat(shardsHeader.get("failed"), equalTo(0));
assertThat(shardsHeader.get("successful"), equalTo(1));
assertThat(shardsHeader.get("skipped"), equalTo(0));
logger.info("searchResponse: {}", searchResponseBody);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also assert total hits here?

}
}
Loading