Skip to content

Commit 6770b79

Browse files
committed
Add block loader from stored field and source for ip field
1 parent 5ad383f commit 6770b79

File tree

13 files changed

+280
-7
lines changed

13 files changed

+280
-7
lines changed

server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.index.mapper;
1111

12+
import org.apache.lucene.document.InetAddressPoint;
1213
import org.apache.lucene.index.LeafReaderContext;
1314
import org.apache.lucene.index.PostingsEnum;
1415
import org.apache.lucene.index.SortedSetDocValues;
@@ -20,6 +21,7 @@
2021
import org.elasticsearch.search.fetch.StoredFieldsSpec;
2122

2223
import java.io.IOException;
24+
import java.net.InetAddress;
2325
import java.util.ArrayList;
2426
import java.util.List;
2527
import java.util.Objects;
@@ -381,6 +383,46 @@ public String toString() {
381383
}
382384
}
383385

386+
/**
387+
* Load {@code ip}s from {@code _source}.
388+
*/
389+
public static class IpsBlockLoader extends SourceBlockLoader {
390+
public IpsBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
391+
super(fetcher, lookup);
392+
}
393+
394+
@Override
395+
public Builder builder(BlockFactory factory, int expectedCount) {
396+
return factory.bytesRefs(expectedCount);
397+
}
398+
399+
@Override
400+
public RowStrideReader rowStrideReader(LeafReaderContext context, DocIdSetIterator iter) {
401+
return new Ips(fetcher, iter);
402+
}
403+
404+
@Override
405+
protected String name() {
406+
return "Ips";
407+
}
408+
}
409+
410+
private static class Ips extends BlockSourceReader {
411+
Ips(ValueFetcher fetcher, DocIdSetIterator iter) {
412+
super(fetcher, iter);
413+
}
414+
415+
@Override
416+
protected void append(BlockLoader.Builder builder, Object v) {
417+
((BlockLoader.BytesRefBuilder) builder).appendBytesRef(new BytesRef(InetAddressPoint.encode((InetAddress) v)));
418+
}
419+
420+
@Override
421+
public String toString() {
422+
return "BlockSourceReader.Ips";
423+
}
424+
}
425+
384426
/**
385427
* Convert a {@link String} into a utf-8 {@link BytesRef}.
386428
*/

server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
4444
import org.elasticsearch.search.lookup.FieldValues;
4545
import org.elasticsearch.search.lookup.SearchLookup;
46+
import org.elasticsearch.xcontent.XContentParser;
4647

4748
import java.io.IOException;
4849
import java.net.InetAddress;
@@ -51,8 +52,10 @@
5152
import java.util.Arrays;
5253
import java.util.Collection;
5354
import java.util.Collections;
55+
import java.util.List;
5456
import java.util.Map;
5557
import java.util.Objects;
58+
import java.util.Set;
5659
import java.util.function.BiFunction;
5760

5861
import static org.elasticsearch.index.mapper.FieldArrayContext.getOffsetsFieldName;
@@ -213,7 +216,8 @@ public IpFieldMapper build(MapperBuilderContext context) {
213216
parseNullValue(),
214217
scriptValues(),
215218
meta.getValue(),
216-
dimension.getValue()
219+
dimension.getValue(),
220+
context.isSourceSynthetic()
217221
),
218222
builderParams(this, context),
219223
context.isSourceSynthetic(),
@@ -234,6 +238,7 @@ public static final class IpFieldType extends SimpleMappedFieldType {
234238
private final InetAddress nullValue;
235239
private final FieldValues<InetAddress> scriptValues;
236240
private final boolean isDimension;
241+
private final boolean isSyntheticSource;
237242

238243
public IpFieldType(
239244
String name,
@@ -243,12 +248,14 @@ public IpFieldType(
243248
InetAddress nullValue,
244249
FieldValues<InetAddress> scriptValues,
245250
Map<String, String> meta,
246-
boolean isDimension
251+
boolean isDimension,
252+
boolean isSyntheticSource
247253
) {
248254
super(name, indexed, stored, hasDocValues, TextSearchInfo.SIMPLE_MATCH_WITHOUT_TERMS, meta);
249255
this.nullValue = nullValue;
250256
this.scriptValues = scriptValues;
251257
this.isDimension = isDimension;
258+
this.isSyntheticSource = isSyntheticSource;
252259
}
253260

254261
public IpFieldType(String name) {
@@ -260,7 +267,7 @@ public IpFieldType(String name, boolean isIndexed) {
260267
}
261268

262269
public IpFieldType(String name, boolean isIndexed, boolean hasDocValues) {
263-
this(name, isIndexed, false, hasDocValues, null, null, Collections.emptyMap(), false);
270+
this(name, isIndexed, false, hasDocValues, null, null, Collections.emptyMap(), false, false);
264271
}
265272

266273
@Override
@@ -455,7 +462,75 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
455462
if (hasDocValues()) {
456463
return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name());
457464
}
458-
return null;
465+
466+
if (isStored()) {
467+
return new BlockStoredFieldsReader.BytesFromBytesRefsBlockLoader(name());
468+
}
469+
470+
if (isSyntheticSource) {
471+
return blockLoaderFromFallbackSyntheticSource(blContext);
472+
}
473+
474+
BlockSourceReader.LeafIteratorLookup lookup = isIndexed()
475+
? BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
476+
: BlockSourceReader.lookupMatchingAll();
477+
return new BlockSourceReader.IpsBlockLoader(sourceValueFetcher(blContext.sourcePaths(name())), lookup);
478+
}
479+
480+
private BlockLoader blockLoaderFromFallbackSyntheticSource(BlockLoaderContext blContext) {
481+
var reader = new FallbackSyntheticSourceBlockLoader.SingleValueReader<InetAddress>(nullValue) {
482+
@Override
483+
public void convertValue(Object value, List<InetAddress> accumulator) {
484+
if (value instanceof InetAddress ia) {
485+
accumulator.add(ia);
486+
}
487+
488+
try {
489+
var address = InetAddresses.forString(value.toString());
490+
accumulator.add(address);
491+
} catch (Exception e) {
492+
// Malformed value, skip it
493+
}
494+
}
495+
496+
@Override
497+
protected void parseNonNullValue(XContentParser parser, List<InetAddress> accumulator) throws IOException {
498+
// aligned with #parseCreateField()
499+
String value = parser.text();
500+
501+
try {
502+
var address = InetAddresses.forString(value);
503+
accumulator.add(address);
504+
} catch (Exception e) {
505+
// Malformed value, skip it
506+
}
507+
}
508+
509+
@Override
510+
public void writeToBlock(List<InetAddress> values, BlockLoader.Builder blockBuilder) {
511+
var bytesRefBuilder = (BlockLoader.BytesRefBuilder) blockBuilder;
512+
513+
for (var value : values) {
514+
bytesRefBuilder.appendBytesRef(new BytesRef(InetAddressPoint.encode(value)));
515+
}
516+
}
517+
};
518+
519+
return new FallbackSyntheticSourceBlockLoader(reader, name()) {
520+
@Override
521+
public Builder builder(BlockFactory factory, int expectedCount) {
522+
return factory.bytesRefs(expectedCount);
523+
}
524+
};
525+
}
526+
527+
private SourceValueFetcher sourceValueFetcher(Set<String> sourcePaths) {
528+
return new SourceValueFetcher(sourcePaths, nullValue) {
529+
@Override
530+
public InetAddress parseSourceValue(Object value) {
531+
return parse(value);
532+
}
533+
};
459534
}
460535

461536
@Override

server/src/test/java/org/elasticsearch/index/mapper/IpFieldTypeTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public void testTermQuery() {
105105
null,
106106
null,
107107
Collections.emptyMap(),
108+
false,
108109
false
109110
);
110111
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> unsearchable.termQuery("::1", MOCK_CONTEXT));
@@ -339,6 +340,7 @@ public void testRangeQuery() {
339340
null,
340341
null,
341342
Collections.emptyMap(),
343+
false,
342344
false
343345
);
344346
IllegalArgumentException e = expectThrows(

server/src/test/java/org/elasticsearch/index/mapper/blockloader/IpFieldBlockLoaderTests.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,64 @@
99

1010
package org.elasticsearch.index.mapper.blockloader;
1111

12+
import org.apache.lucene.document.InetAddressPoint;
13+
import org.apache.lucene.util.BytesRef;
14+
import org.elasticsearch.common.network.InetAddresses;
1215
import org.elasticsearch.index.mapper.BlockLoaderTestCase;
16+
import org.elasticsearch.logsdb.datageneration.FieldType;
1317

18+
import java.util.List;
1419
import java.util.Map;
20+
import java.util.Objects;
1521

1622
public class IpFieldBlockLoaderTests extends BlockLoaderTestCase {
17-
protected IpFieldBlockLoaderTests(Params params) {
18-
super(null, params);
23+
public IpFieldBlockLoaderTests(Params params) {
24+
super(FieldType.IP.toString(), params);
1925
}
2026

2127
@Override
28+
@SuppressWarnings("unchecked")
2229
protected Object expected(Map<String, Object> fieldMapping, Object value, TestContext testContext) {
30+
var rawNullValue = (String) fieldMapping.get("null_value");
31+
BytesRef nullValue = convert(rawNullValue, null);
32+
33+
if (value == null) {
34+
return convert(null, nullValue);
35+
}
36+
if (value instanceof String s) {
37+
return convert(s, nullValue);
38+
}
39+
40+
if (hasDocValues(fieldMapping, true)) {
41+
var resultList = ((List<String>) value).stream()
42+
.map(v -> convert(v, nullValue))
43+
.filter(Objects::nonNull)
44+
.distinct()
45+
.sorted()
46+
.toList();
47+
return maybeFoldList(resultList);
48+
}
49+
50+
// field is stored or using source
51+
var resultList = ((List<String>) value).stream().map(v -> convert(v, nullValue)).filter(Objects::nonNull).toList();
52+
return maybeFoldList(resultList);
53+
}
54+
55+
private static BytesRef convert(Object value, BytesRef nullValue) {
56+
if (value == null) {
57+
return nullValue;
58+
}
59+
60+
if (value instanceof String s) {
61+
try {
62+
var address = InetAddresses.forString(s);
63+
return new BytesRef(InetAddressPoint.encode(address));
64+
} catch (Exception ex) {
65+
// malformed
66+
return null;
67+
}
68+
}
69+
2370
return null;
2471
}
2572
}

server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,6 +1328,7 @@ public void testIpField() throws Exception {
13281328
null,
13291329
null,
13301330
Collections.emptyMap(),
1331+
false,
13311332
false
13321333
);
13331334
testCase(iw -> {

test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/FieldType.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.logsdb.datageneration.fields.leaf.GeoPointFieldDataGenerator;
2020
import org.elasticsearch.logsdb.datageneration.fields.leaf.HalfFloatFieldDataGenerator;
2121
import org.elasticsearch.logsdb.datageneration.fields.leaf.IntegerFieldDataGenerator;
22+
import org.elasticsearch.logsdb.datageneration.fields.leaf.IpFieldDataGenerator;
2223
import org.elasticsearch.logsdb.datageneration.fields.leaf.KeywordFieldDataGenerator;
2324
import org.elasticsearch.logsdb.datageneration.fields.leaf.LongFieldDataGenerator;
2425
import org.elasticsearch.logsdb.datageneration.fields.leaf.ScaledFloatFieldDataGenerator;
@@ -44,7 +45,8 @@ public enum FieldType {
4445
BOOLEAN("boolean"),
4546
DATE("date"),
4647
GEO_POINT("geo_point"),
47-
TEXT("text");
48+
TEXT("text"),
49+
IP("ip");
4850

4951
private final String name;
5052

@@ -69,6 +71,7 @@ public FieldDataGenerator generator(String fieldName, DataSource dataSource) {
6971
case DATE -> new DateFieldDataGenerator(dataSource);
7072
case GEO_POINT -> new GeoPointFieldDataGenerator(dataSource);
7173
case TEXT -> new TextFieldDataGenerator(dataSource);
74+
case IP -> new IpFieldDataGenerator(dataSource);
7275
};
7376
}
7477

@@ -89,6 +92,7 @@ public static FieldType tryParse(String name) {
8992
case "date" -> FieldType.DATE;
9093
case "geo_point" -> FieldType.GEO_POINT;
9194
case "text" -> FieldType.TEXT;
95+
case "ip" -> FieldType.IP;
9296
default -> null;
9397
};
9498
}

test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/datasource/DataSourceHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ default DataSourceResponse.PointGenerator handle(DataSourceRequest.PointGenerato
7474
return null;
7575
}
7676

77+
default DataSourceResponse.IpGenerator handle(DataSourceRequest.IpGenerator request) {
78+
return null;
79+
}
80+
7781
default DataSourceResponse.NullWrapper handle(DataSourceRequest.NullWrapper request) {
7882
return null;
7983
}

test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/datasource/DataSourceRequest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ public DataSourceResponse.PointGenerator accept(DataSourceHandler handler) {
120120
}
121121
}
122122

123+
record IpGenerator() implements DataSourceRequest<DataSourceResponse.IpGenerator> {
124+
public DataSourceResponse.IpGenerator accept(DataSourceHandler handler) {
125+
return handler.handle(this);
126+
}
127+
}
128+
123129
record NullWrapper() implements DataSourceRequest<DataSourceResponse.NullWrapper> {
124130
public DataSourceResponse.NullWrapper accept(DataSourceHandler handler) {
125131
return handler.handle(this);

test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/datasource/DataSourceResponse.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.geometry.Geometry;
1313

14+
import java.net.InetAddress;
1415
import java.time.Instant;
1516
import java.util.Map;
1617
import java.util.Optional;
@@ -50,6 +51,8 @@ record PointGenerator(Supplier<Object> generator) implements DataSourceResponse
5051

5152
record GeoPointGenerator(Supplier<Object> generator) implements DataSourceResponse {}
5253

54+
record IpGenerator(Supplier<InetAddress> generator) implements DataSourceResponse {}
55+
5356
record NullWrapper(Function<Supplier<Object>, Supplier<Object>> wrapper) implements DataSourceResponse {}
5457

5558
record ArrayWrapper(Function<Supplier<Object>, Supplier<Object>> wrapper) implements DataSourceResponse {}

0 commit comments

Comments
 (0)