Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
630b673
ESQL: Ignore multivalued key columns in lookup index
ivancea Jan 23, 2025
4692f62
Fixed test to work with Query rewrites
ivancea Jan 24, 2025
172abd8
Added custom warnings
ivancea Jan 24, 2025
91f2684
Merge branch 'main' into esql-join-multivalue-on-index
ivancea Jan 27, 2025
6bd297d
Merge branch 'esql-join-multivalue-on-index' into esql-join-multivalu…
ivancea Jan 27, 2025
a801c8e
Fix merge
ivancea Jan 27, 2025
b82b446
Merge branch 'main' into esql-join-multivalue-on-index
ivancea Jan 28, 2025
1624f46
Merge branch 'esql-join-multivalue-on-index' into esql-join-multivalu…
ivancea Jan 28, 2025
2134d83
First warnings draft, randomly failing on CSV tests with missing warn…
ivancea Jan 29, 2025
0f23fea
Restore warning on isFinished
ivancea Jan 29, 2025
e134e93
[CI] Auto commit changes from spotless
Jan 29, 2025
ce0849b
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Jan 30, 2025
9c37b16
Update capability for warnings
ivancea Jan 30, 2025
b2c11ad
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Jan 31, 2025
b0ea3d4
Send source in Join serialization, and add tests
ivancea Jan 31, 2025
393da75
Update Join serialization test
ivancea Jan 31, 2025
f40de01
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Feb 3, 2025
e627316
Serialize Configuration to get the source in the LOOKUP request, + Tr…
ivancea Feb 4, 2025
e6b1e5d
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Feb 4, 2025
08c8ec0
Use existing ResponseHeadersCollector instead of manually taking warn…
ivancea Feb 4, 2025
e9b0456
Remove unused code
ivancea Feb 4, 2025
2bc5bb5
Move headers collection to AsyncOperator
ivancea Feb 4, 2025
7faf418
Fixed tests
ivancea Feb 5, 2025
12022ad
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Feb 5, 2025
73e163b
Avoid creating exceptions for warnings
ivancea Feb 6, 2025
2ec796d
Javadoc typo fix
ivancea Feb 6, 2025
353f3d9
Only transport the Source text, instead of the full Configuration
ivancea Feb 6, 2025
537c0f1
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Feb 6, 2025
e148592
[CI] Auto commit changes from spotless
Feb 6, 2025
d5dfaf4
Fix compilation
ivancea Feb 6, 2025
af6f021
[CI] Auto commit changes from spotless
Feb 6, 2025
7b67a04
Fixed outdated test
ivancea Feb 6, 2025
98ef65b
Update IT test
ivancea Feb 6, 2025
af97131
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Feb 10, 2025
4355bcd
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Feb 11, 2025
ce19cc6
Added 8.x and 9.0 transport versions
ivancea Feb 11, 2025
1bba291
Add link from TVs to Versioning.md
ivancea Feb 11, 2025
eed7649
Revert "Added 8.x and 9.0 transport versions"
ivancea Feb 12, 2025
511f093
Merge branch 'main' into esql-join-multivalue-on-index-warnings
ivancea Feb 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Objects;
Expand Down Expand Up @@ -408,6 +409,17 @@ public static void addWarning(String message, Object... params) {
addWarning(THREAD_CONTEXT, message, params);
}

/**
* Adds a warning header without any formatting or prefix
*/
public static void addRawWarning(String warningHeader) {
addRawWarning(THREAD_CONTEXT, warningHeader);
}

public static Set<String> getWarnings() {
return getWarnings(THREAD_CONTEXT);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did we manage to do this shift without this before? I feel like we shift threads and merge warnings in other ways without requiring this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess rather than make a change to the core warnings stuff I'd prefer we continue to hack around it until we can replace the thread-local-warnings in ESQL. With, like, something else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to what Enrich does (A custom class that does a similar thing, but with all the headers). And moved the logic instead to AsyncOperator, as to avoid repeating it, as it looks quite a standard thing to do there.
Added it to the javadoc too


// package scope for testing
static void addWarning(Set<ThreadContext> threadContexts, String message, Object... params) {
final Iterator<ThreadContext> iterator = threadContexts.iterator();
Expand All @@ -426,4 +438,25 @@ static void addWarning(Set<ThreadContext> threadContexts, String message, Object
}
}
}

static void addRawWarning(Set<ThreadContext> threadContexts, String warningHeader) {
for (ThreadContext threadContext : threadContexts) {
threadContext.addResponseHeader("Warning", warningHeader);
}
}

static Set<String> getWarnings(Set<ThreadContext> threadContexts) {
final Iterator<ThreadContext> iterator = threadContexts.iterator();
if (iterator.hasNext()) {
final Set<String> warnings = new HashSet<>();
while (iterator.hasNext()) {
final ThreadContext next = iterator.next();
final Set<String> contextWarnings = next.getResponseHeader("Warning");
warnings.addAll(contextWarnings);

}
return warnings;
}
return Set.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,23 @@ public Map<String, List<String>> getResponseHeaders() {
return Collections.unmodifiableMap(map);
}

/**
* Get a copy of the specified <em>response</em> header.
*
* @param header The header to get.
* @return Never {@code null}.
*/
public Set<String> getResponseHeader(String header) {
Map<String, Set<String>> responseHeaders = threadLocal.get().responseHeaders;
Set<String> values = responseHeaders.get(header);

if (values == null) {
return Set.of();
}

return Set.copyOf(values);
}

/**
* Copies all header key, value pairs into the current context
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.TimeValue;
Expand All @@ -26,6 +27,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;

/**
Expand All @@ -41,6 +43,7 @@ public abstract class AsyncOperator<Fetched> implements Operator {
private volatile SubscribableListener<Void> blockedFuture;

private final Map<Long, Fetched> buffers = ConcurrentCollections.newConcurrentMap();
private final Set<String> storedWarningHeaders = ConcurrentCollections.newConcurrentSet();
private final FailureCollector failureCollector = new FailureCollector();
private final DriverContext driverContext;

Expand Down Expand Up @@ -89,9 +92,11 @@ public void addInput(Page input) {
try {
final ActionListener<Fetched> listener = ActionListener.wrap(output -> {
buffers.put(seqNo, output);
storeWarnings();
onSeqNoCompleted(seqNo);
}, e -> {
releasePageOnAnyThread(input);
storeWarnings();
failureCollector.unwrapAndCollect(e);
onSeqNoCompleted(seqNo);
});
Expand Down Expand Up @@ -184,6 +189,7 @@ public void finish() {
public boolean isFinished() {
if (finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo()) {
checkFailure();
restoreWarnings();
return true;
} else {
return false;
Expand All @@ -197,6 +203,7 @@ public boolean isFinished() {
public final Fetched fetchFromBuffer() {
checkFailure();
long persistedCheckpoint = checkpoint.getPersistedCheckpoint();
restoreWarnings();
if (persistedCheckpoint < checkpoint.getProcessedCheckpoint()) {
persistedCheckpoint++;
Fetched result = buffers.remove(persistedCheckpoint);
Expand All @@ -207,6 +214,18 @@ public final Fetched fetchFromBuffer() {
}
}

private void storeWarnings() {
Set<String> warnings = HeaderWarning.getWarnings();
storedWarningHeaders.addAll(warnings);
}

private void restoreWarnings() {
for (String warning : storedWarningHeaders) {
HeaderWarning.addRawWarning(warning);
}
// TODO: Remove warnings?
}

@Override
public IsBlockedResult isBlocked() {
// TODO: Add an exchange service between async operation instead?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.index.query.SearchExecutionContext;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
Expand All @@ -47,13 +48,19 @@ public abstract class QueryList {
protected final SearchExecutionContext searchExecutionContext;
protected final MappedFieldType field;
protected final Block block;
protected final boolean onlySingleValues;
@Nullable
protected final OnlySingleValueParams onlySingleValueParams;

protected QueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block, boolean onlySingleValues) {
protected QueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
Block block,
OnlySingleValueParams onlySingleValueParams
) {
this.searchExecutionContext = searchExecutionContext;
this.field = field;
this.block = block;
this.onlySingleValues = onlySingleValues;
this.onlySingleValueParams = onlySingleValueParams;
}

/**
Expand All @@ -66,19 +73,27 @@ int getPositionCount() {
/**
* Returns a copy of this query list that only returns queries for single-valued positions.
* That is, it returns `null` queries for either multivalued or null positions.
* <p>
* Whenever a multi-value position is encountered, whether in the input block or in the queried index, a warning is emitted.
* </p>
*/
public abstract QueryList onlySingleValues();
public abstract QueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage);

final Query getQuery(int position) {
final int valueCount = block.getValueCount(position);
if (onlySingleValues && valueCount != 1) {
if (onlySingleValueParams != null && valueCount != 1) {
if (valueCount > 1) {
onlySingleValueParams.warnings.registerException(
new IllegalArgumentException(onlySingleValueParams.multiValueWarningMessage)
);
}
return null;
}
final int firstValueIndex = block.getFirstValueIndex(position);

Query query = doGetQuery(position, firstValueIndex, valueCount);

if (onlySingleValues) {
if (onlySingleValueParams != null) {
query = wrapSingleValueQuery(query);
}

Expand All @@ -92,22 +107,24 @@ final Query getQuery(int position) {
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);

private Query wrapSingleValueQuery(Query query) {
assert onlySingleValueParams != null : "Requested to wrap single value query without single value params";

SingleValueMatchQuery singleValueQuery = new SingleValueMatchQuery(
searchExecutionContext.getForField(field, MappedFieldType.FielddataOperation.SEARCH),
// Not emitting warnings for multivalued fields not matching
Warnings.NOOP_WARNINGS
onlySingleValueParams.warnings,
onlySingleValueParams.multiValueWarningMessage
);

Query rewrite = singleValueQuery;
Query rewrite;
try {
rewrite = singleValueQuery.rewrite(searchExecutionContext.searcher());
if (rewrite instanceof MatchAllDocsQuery) {
// nothing to filter
return query;
}
} catch (IOException e) {
// ignore
// TODO: Should we do something with the exception?
throw new UncheckedIOException("Error while rewriting SingleValueQuery", e);
}

BooleanQuery.Builder builder = new BooleanQuery.Builder();
Expand Down Expand Up @@ -152,7 +169,7 @@ public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionC
case COMPOSITE -> throw new IllegalArgumentException("can't read values from [composite] block");
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
return new TermQueryList(field, searchExecutionContext, block, false, blockToJavaObject);
return new TermQueryList(field, searchExecutionContext, block, null, blockToJavaObject);
}

/**
Expand All @@ -162,7 +179,7 @@ public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionC
public static QueryList ipTermQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, BytesRefBlock block) {
BytesRef scratch = new BytesRef();
byte[] ipBytes = new byte[InetAddressPoint.BYTES];
return new TermQueryList(field, searchExecutionContext, block, false, offset -> {
return new TermQueryList(field, searchExecutionContext, block, null, offset -> {
final var bytes = block.getBytesRef(offset, scratch);
if (ipBytes.length != bytes.length) {
// Lucene only support 16-byte IP addresses, even IPv4 is encoded in 16 bytes
Expand All @@ -182,7 +199,7 @@ public static QueryList dateTermQueryList(MappedFieldType field, SearchExecution
field,
searchExecutionContext,
block,
false,
null,
field instanceof RangeFieldMapper.RangeFieldType rangeFieldType
? offset -> rangeFieldType.dateTimeFormatter().formatMillis(block.getLong(offset))
: block::getLong
Expand All @@ -193,7 +210,7 @@ public static QueryList dateTermQueryList(MappedFieldType field, SearchExecution
* Returns a list of geo_shape queries for the given field and the input block.
*/
public static QueryList geoShapeQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block) {
return new GeoShapeQueryList(field, searchExecutionContext, block, false);
return new GeoShapeQueryList(field, searchExecutionContext, block, null);
}

private static class TermQueryList extends QueryList {
Expand All @@ -203,16 +220,22 @@ private TermQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
Block block,
boolean onlySingleValues,
OnlySingleValueParams onlySingleValueParams,
IntFunction<Object> blockValueReader
) {
super(field, searchExecutionContext, block, onlySingleValues);
super(field, searchExecutionContext, block, onlySingleValueParams);
this.blockValueReader = blockValueReader;
}

@Override
public TermQueryList onlySingleValues() {
return new TermQueryList(field, searchExecutionContext, block, true, blockValueReader);
public TermQueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage) {
return new TermQueryList(
field,
searchExecutionContext,
block,
new OnlySingleValueParams(warnings, multiValueWarningMessage),
blockValueReader
);
}

@Override
Expand Down Expand Up @@ -241,17 +264,22 @@ private GeoShapeQueryList(
MappedFieldType field,
SearchExecutionContext searchExecutionContext,
Block block,
boolean onlySingleValues
OnlySingleValueParams onlySingleValueParams
) {
super(field, searchExecutionContext, block, onlySingleValues);
super(field, searchExecutionContext, block, onlySingleValueParams);

this.blockValueReader = blockToGeometry(block);
this.shapeQuery = shapeQuery();
}

@Override
public GeoShapeQueryList onlySingleValues() {
return new GeoShapeQueryList(field, searchExecutionContext, block, true);
public GeoShapeQueryList onlySingleValues(Warnings warnings, String multiValueWarningMessage) {
return new GeoShapeQueryList(
field,
searchExecutionContext,
block,
new OnlySingleValueParams(warnings, multiValueWarningMessage)
);
}

@Override
Expand Down Expand Up @@ -295,4 +323,6 @@ private IntFunction<Query> shapeQuery() {
throw new IllegalArgumentException("Unsupported field type for geo_match ENRICH: " + field.typeName());
}
}

protected record OnlySingleValueParams(Warnings warnings, String multiValueWarningMessage) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ public final class SingleValueMatchQuery extends Query {
* This avoids reporting warnings when queries are not matching multi-values
*/
private static final int MULTI_VALUE_MATCH_COST = 1000;
private static final IllegalArgumentException MULTI_VALUE_EXCEPTION = new IllegalArgumentException(
"single-value function encountered multi-value"
);
private final IndexFieldData<?> fieldData;
private final Warnings warnings;
private final IllegalArgumentException multiValueException;

public SingleValueMatchQuery(IndexFieldData<?> fieldData, Warnings warnings) {
public SingleValueMatchQuery(IndexFieldData<?> fieldData, Warnings warnings, String multiValueExceptionMessage) {
this.fieldData = fieldData;
this.warnings = warnings;
this.multiValueException = new IllegalArgumentException(multiValueExceptionMessage);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we init this one lazily? It's pretty expensive to build it on every query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the exception instantiation, and added a addWarning overload that just receives the class + message, for cases like this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

@Override
Expand Down Expand Up @@ -123,7 +122,7 @@ private ScorerSupplier scorerSupplier(
return false;
}
if (sortedNumerics.docValueCount() != 1) {
warnings.registerException(MULTI_VALUE_EXCEPTION);
warnings.registerException(multiValueException);
return false;
}
return true;
Expand Down Expand Up @@ -158,7 +157,7 @@ private ScorerSupplier scorerSupplier(
return false;
}
if (sortedSetDocValues.docValueCount() != 1) {
warnings.registerException(MULTI_VALUE_EXCEPTION);
warnings.registerException(multiValueException);
return false;
}
return true;
Expand Down Expand Up @@ -187,7 +186,7 @@ private ScorerSupplier scorerSupplier(
return false;
}
if (sortedBinaryDocValues.docValueCount() != 1) {
warnings.registerException(MULTI_VALUE_EXCEPTION);
warnings.registerException(multiValueException);
return false;
}
return true;
Expand Down
Loading