-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ESQL: Initial support for unmapped fields #119886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
fa3dd20
c2b0d9c
7b33f15
a105f0a
0ca5e79
bf309fb
0b72c89
c046112
aca3796
dacc8f6
f71a586
2ba5935
c22b9d4
f5c77fa
d6afb24
25f0c53
165ad16
9d85915
d0959ea
d68a0ca
a12aa82
f02fd43
a8d9372
fcd4663
be4bbfd
b84ff2c
a0e1840
0b4132e
b05b327
3f9faef
903f6fa
85b84f5
400f473
fb09120
0d9b280
d7ce623
0f01ebe
5c2c7d7
af26d5f
faef67a
1204791
22569b8
a9f4a0b
8f3c235
7d07d1a
697989c
c0fe85e
ab9bfdf
5c1c2be
9805f12
d5dc662
ef56e8a
13b3dff
088a622
ea8567b
9c466e4
1fb5cb4
8017c78
4f6f6a1
971b869
ef94c52
c9fdc92
b46cb9d
b5da964
7a48277
dcc518c
770ea6f
97399fc
e32a446
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 119886 | ||
summary: Initial support for unmapped fields | ||
area: ES|QL | ||
type: feature | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.core.type; | ||
|
||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.core.Strings; | ||
import org.elasticsearch.xpack.esql.core.expression.Expression; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
import java.util.TreeMap; | ||
import java.util.TreeSet; | ||
|
||
import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD; | ||
import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck; | ||
import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck; | ||
|
||
/// A special marker for fields explicitly marked as unmapped in the query. These need to be explicitly marked, so we know to search for | ||
|
||
/// during data loading. By default, the [DataType] of these fields is [DataType#KEYWORD], since we can always convert a [DataType#KEYWORD] | ||
/// to any other type. However, this can cause type conflicts with partially mapped types, i.e, types which appear in some indices but not | ||
/// all. In that sense, this field is very similar to the [MultiTypeEsField] feature, except we allow for unmapped fields. | ||
|
||
/// | ||
/// Check out the [State] algebraic data type for the different kinds of unmapped fields. | ||
public class UnmappedEsField extends EsField { | ||
private UnmappedEsField(State state, String name, DataType dataType, Map<String, EsField> properties) { | ||
super(name, dataType, properties, true /* aggregatable */); | ||
this.state = state; | ||
} | ||
|
||
public sealed interface State {} | ||
|
||
/// A field which is either unmapped in all indices, or mapped to [DataType#KEYWORD] in all indices where it is mapped. | ||
public enum NoConflicts implements State { | ||
INSTANCE; | ||
} | ||
|
||
/// A field which is mapped to a single type, which is not a [DataType#KEYWORD]. This can be resolved using a cast, similar to union | ||
/// types. This is treated differently from [InvalidMappedField], since resolving this conflict doesn't use [MultiTypeEsField]. | ||
public record SimpleConflict(DataType otherType) implements State { | ||
public SimpleConflict { | ||
if (otherType == KEYWORD) { | ||
throw new IllegalArgumentException("Use NoConflicts.INSTANCE for KEYWORD"); | ||
} | ||
} | ||
} | ||
|
||
/// A field which is mapped to a single type, but that type is not [DataType#KEYWORD]. This is resolved using the specified conversions. | ||
public record SimpleResolution(Expression unmappedConversion, Expression mappedConversion) implements State {} | ||
|
||
/// A field which is mapped to more than one type in multiple indices, *in addition* to the unmapped case which is always treated as | ||
/// [DataType#KEYWORD]. This can be resolved using a cast, similar to union types. | ||
public record Invalid(InvalidMappedField invalidMappedField) implements State {} | ||
|
||
/// A field which is mapped to different types in different indices, but resolved using union types. In mapped indices, we treat this | ||
/// as a union type, and use the specified conversion for unmapped indices. | ||
public record MultiType(Expression conversionFromKeyword, MultiTypeEsField multiTypeEsField) implements State {} | ||
|
||
private final State state; | ||
|
||
public State getState() { | ||
return state; | ||
} | ||
|
||
public static UnmappedEsField fromField(EsField f) { | ||
State state = switch (f) { | ||
case InvalidMappedField imf -> { | ||
var newTypesToIndices = new TreeMap<>(imf.getTypesToIndices()); | ||
newTypesToIndices.compute(KEYWORD.typeName(), (k, v) -> v == null ? new TreeSet<>() : new TreeSet<>(v)) | ||
.add("unmapped field"); | ||
yield new Invalid(imf.withTypesToIndices(newTypesToIndices)); | ||
} | ||
case MultiTypeEsField mf -> throw new IllegalArgumentException("Use fromMultiType for MultiTypeEsField"); | ||
|
||
default -> f.getDataType() == KEYWORD ? UnmappedEsField.NoConflicts.INSTANCE : new SimpleConflict(f.getDataType()); | ||
}; | ||
return new UnmappedEsField(state, f.getName(), f.getDataType(), f.getProperties()); | ||
} | ||
|
||
public static UnmappedEsField fromStandalone(String name) { | ||
return new UnmappedEsField(NoConflicts.INSTANCE, name, KEYWORD, Map.of()); | ||
} | ||
|
||
public static UnmappedEsField fromMultiType(Expression expression, MultiTypeEsField multiTypeEsField) { | ||
return new UnmappedEsField( | ||
new MultiType(expression, multiTypeEsField), | ||
multiTypeEsField.getName(), | ||
multiTypeEsField.getDataType(), | ||
multiTypeEsField.getProperties() | ||
); | ||
} | ||
|
||
public static UnmappedEsField simpleResolution(Expression unmappedConv, Expression mappedConv, String name) { | ||
assert unmappedConv.dataType() == mappedConv.dataType() | ||
: Strings.format( | ||
"Both conversions must have the same target type, but got [%s, %s]", | ||
unmappedConv.dataType(), | ||
mappedConv.dataType() | ||
); | ||
assert unmappedConv.children().get(0).dataType() == KEYWORD | ||
: Strings.format("Unmapped conversion must be from keyword, but got [%s]", unmappedConv.children().get(0).dataType()); | ||
assert mappedConv.children().get(0).dataType() != KEYWORD : Strings.format("Unmapped conversion must be from non-keyword"); | ||
return new UnmappedEsField(new SimpleResolution(unmappedConv, mappedConv), name, unmappedConv.dataType(), Map.of()); | ||
} | ||
|
||
@Override | ||
public void writeContent(StreamOutput out) throws IOException { | ||
switch (state) { | ||
case NoConflicts unused -> { | ||
out.writeInt(0); | ||
} | ||
case SimpleConflict sf -> { | ||
out.writeInt(1); | ||
sf.otherType().writeTo(out); | ||
} | ||
case SimpleResolution sr -> { | ||
out.writeInt(2); | ||
out.writeNamedWriteable(sr.unmappedConversion()); | ||
out.writeNamedWriteable(sr.mappedConversion()); | ||
} | ||
case Invalid invalid -> { | ||
out.writeInt(3); | ||
invalid.invalidMappedField().writeTo(out); | ||
} | ||
case MultiType mt -> { | ||
out.writeInt(4); | ||
out.writeNamedWriteable(mt.conversionFromKeyword()); | ||
mt.multiTypeEsField.writeTo(out); | ||
} | ||
} | ||
writeCachedStringWithVersionCheck(out, getName()); | ||
getDataType().writeTo(out); | ||
out.writeMap(getProperties(), (o, x) -> x.writeTo(out)); | ||
} | ||
|
||
UnmappedEsField(StreamInput in) throws IOException { | ||
this(readState(in), readCachedStringWithVersionCheck(in), DataType.readFrom(in), in.readImmutableMap(EsField::readFrom)); | ||
} | ||
|
||
private static State readState(StreamInput in) throws IOException { | ||
var ordinal = in.readInt(); | ||
return switch (ordinal) { | ||
case 0 -> NoConflicts.INSTANCE; | ||
case 1 -> new SimpleConflict(DataType.readFrom(in)); | ||
case 2 -> new SimpleResolution(in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class)); | ||
case 3 -> new Invalid(InvalidMappedField.readFrom(in)); | ||
case 4 -> new MultiType(in.readNamedWriteable(Expression.class), MultiTypeEsField.readFrom(in)); | ||
default -> throw new AssertionError("Unexpected ordinal: " + ordinal); | ||
}; | ||
} | ||
|
||
@Override | ||
public String getWriteableName() { | ||
return "UnmappedEsField"; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return Strings.format("UnmappedEsField{state=%s}", state); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,9 @@ | |
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.OptionalInt; | ||
import java.util.Set; | ||
import java.util.function.Predicate; | ||
|
||
import static java.util.Collections.emptyList; | ||
|
||
|
@@ -83,4 +85,21 @@ public static <T> List<T> nullSafeList(T... entries) { | |
} | ||
return list; | ||
} | ||
|
||
public static <T, S extends T> List<S> collectType(Collection<T> list, Class<S> clazz) { | ||
GalLalouche marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return list.stream().<S>mapMulti((e, c) -> { | ||
if (clazz.isInstance(e)) { | ||
c.accept(clazz.cast(e)); | ||
} | ||
}).toList(); | ||
} | ||
|
||
public static <T> OptionalInt findIndex(List<T> list, Predicate<T> predicate) { | ||
|
||
for (int i = 0; i < list.size(); i++) { | ||
if (predicate.test(list.get(i))) { | ||
return OptionalInt.of(i); | ||
} | ||
} | ||
return OptionalInt.empty(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,7 +120,9 @@ public static Tuple<Version, Version> skipVersionRange(String testName, String i | |
return null; | ||
} | ||
|
||
public static Tuple<Page, List<String>> loadPageFromCsv(URL source, Map<String, String> typeMapping) throws Exception { | ||
public record PageColumn(String name, DataType dataType) {} | ||
|
||
|
||
public static Tuple<Page, List<PageColumn>> loadPageFromCsv(URL source, Map<String, String> typeMapping) throws Exception { | ||
|
||
record CsvColumn(String name, Type type, BuilderWrapper builderWrapper) implements Releasable { | ||
void append(String stringValue) { | ||
|
@@ -230,13 +232,13 @@ public void close() { | |
lineNumber++; | ||
} | ||
} | ||
var columnNames = new ArrayList<String>(columns.length); | ||
var pageColumns = new ArrayList<PageColumn>(columns.length); | ||
try { | ||
var blocks = Arrays.stream(columns) | ||
.peek(b -> columnNames.add(b.name)) | ||
.peek(b -> pageColumns.add(new PageColumn(b.name, DataType.fromTypeName(b.type.name())))) | ||
.map(b -> b.builderWrapper.builder().build()) | ||
.toArray(Block[]::new); | ||
return new Tuple<>(new Page(blocks), columnNames); | ||
return new Tuple<>(new Page(blocks), pageColumns); | ||
} finally { | ||
Releasables.closeExpectNoException(columns); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
@timestamp:date,client_ip:ip,event_duration:long,message:keyword,unmapped_message:keyword,unmapped_event_duration:keyword | ||
2024-10-23T13:55:01.543Z,173.21.3.15,1756466,Connected to 10.1.0.1!,Disconnected from 10.1.0.1,1756468 | ||
2024-10-23T13:53:55.832Z,173.21.3.15,5033754,Connection error?,Disconnection error,5033756 | ||
2024-10-23T13:52:55.015Z,173.21.3.15,8268152,Connection error?,Disconnection error,8268154 | ||
2024-10-23T13:51:54.732Z,173.21.3.15,725447,Connection error?,Disconnection error,725449 | ||
2024-10-23T13:33:34.937Z,173.21.0.5,1232381,42,43,1232383 | ||
2024-10-23T12:27:28.948Z,173.21.2.113,2764888,Connected to 10.1.0.2!,Disconnected from 10.1.0.2,2764890 | ||
2024-10-23T12:15:03.360Z,173.21.2.162,3450232,Connected to 10.1.0.3!,Disconnected from 10.1.0.3,3450234 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
{ | ||
"dynamic": "false", | ||
"properties": {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
{ | ||
"dynamic": "false", | ||
"properties": { | ||
"@timestamp": { | ||
"type": "date" | ||
}, | ||
"client_ip": { | ||
"type": "ip" | ||
}, | ||
"event_duration": { | ||
"type": "long" | ||
}, | ||
"message": { | ||
"type": "keyword" | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.