Skip to content

Commit b9fd27c

Browse files
authored
[ESQL] Cleanup and bug fixes in NDJSON datasource (#143969)
* Cleanup constructors, add settings for the number of lines used for schema inference * Move implicit schema inference to the top-level `NdJsonFormatReader` * Fix schema inference (priorities were wrong) * Fix unexpected value parsing leading to an imbalance in block sizes. A `null` is added. * Fix nested arrays and structures in arrays parsing. All values are flattened. * Fix null column handling. * Use the "strict_date_optional_time" to parse date, to accept both date+time and date only. * Fall back to double for big integers.
1 parent 9846b8e commit b9fd27c

File tree

8 files changed

+576
-172
lines changed

8 files changed

+576
-172
lines changed

docs/changelog/143969.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: ES|QL
2+
issues: []
3+
pr: 143969
4+
summary: Cleanup and bug fixes in NDJSON datasource
5+
type: bug

x-pack/plugin/esql-datasource-ndjson/src/main/java/org/elasticsearch/xpack/esql/datasource/ndjson/NdJsonDataSourcePlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ public Set<FormatSpec> formatSpecs() {
3232

3333
@Override
3434
public Map<String, FormatReaderFactory> formatReaders(Settings settings) {
35-
return Map.of("ndjson", (s, blockFactory) -> new NdJsonFormatReader(blockFactory));
35+
return Map.of("ndjson", NdJsonFormatReader::new);
3636
}
3737
}

x-pack/plugin/esql-datasource-ndjson/src/main/java/org/elasticsearch/xpack/esql/datasource/ndjson/NdJsonFormatReader.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.datasource.ndjson;
99

10+
import org.elasticsearch.common.settings.Settings;
1011
import org.elasticsearch.compute.data.BlockFactory;
1112
import org.elasticsearch.compute.data.Page;
1213
import org.elasticsearch.xpack.esql.core.expression.Attribute;
@@ -27,28 +28,47 @@
2728
*/
2829
public class NdJsonFormatReader implements SegmentableFormatReader {
2930

31+
public static final String SCHEMA_SAMPLE_SIZE_SETTING = "esql.datasource.ndjson.schema_sample_size";
32+
public static final int DEFAULT_SCHEMA_SAMPLE_SIZE = 100;
33+
3034
private final BlockFactory blockFactory;
35+
private final Settings settings;
3136
private final List<Attribute> resolvedSchema;
3237

33-
public NdJsonFormatReader(BlockFactory blockFactory) {
34-
this(blockFactory, null);
35-
}
36-
37-
private NdJsonFormatReader(BlockFactory blockFactory, List<Attribute> resolvedSchema) {
38+
public NdJsonFormatReader(Settings settings, BlockFactory blockFactory, List<Attribute> resolvedSchema) {
3839
this.blockFactory = blockFactory;
40+
this.settings = settings == null ? Settings.EMPTY : settings;
3941
this.resolvedSchema = resolvedSchema;
4042
}
4143

44+
NdJsonFormatReader(Settings settings, BlockFactory blockFactory) {
45+
this(settings, blockFactory, null);
46+
}
47+
4248
@Override
4349
public NdJsonFormatReader withSchema(List<Attribute> schema) {
44-
return new NdJsonFormatReader(blockFactory, schema);
50+
return new NdJsonFormatReader(settings, blockFactory, schema);
51+
}
52+
53+
private List<Attribute> inferSchemaIfNeeded(List<Attribute> attributes, StorageObject object) throws IOException {
54+
if (attributes != null && attributes.isEmpty() == false) {
55+
return attributes;
56+
}
57+
58+
try (var stream = object.newStream()) {
59+
return NdJsonSchemaInferrer.inferSchema(stream, schemaSampleSize(settings));
60+
}
61+
}
62+
63+
private static int schemaSampleSize(Settings settings) {
64+
return settings.getAsInt(SCHEMA_SAMPLE_SIZE_SETTING, DEFAULT_SCHEMA_SAMPLE_SIZE);
4565
}
4666

4767
@Override
4868
public SourceMetadata metadata(StorageObject object) throws IOException {
4969
List<Attribute> schema;
5070
try (var stream = object.newStream()) {
51-
schema = NdJsonSchemaInferrer.inferSchema(stream);
71+
schema = NdJsonSchemaInferrer.inferSchema(stream, schemaSampleSize(settings));
5272
}
5373
return new SimpleSourceMetadata(schema, formatName(), object.path().toString());
5474
}
@@ -64,7 +84,7 @@ public CloseableIterator<Page> read(StorageObject object, FormatReadContext cont
6484
blockFactory,
6585
skipFirstLine,
6686
trimLastPartialLine,
67-
resolvedSchema
87+
inferSchemaIfNeeded(resolvedSchema, object)
6888
);
6989
}
7090

x-pack/plugin/esql-datasource-ndjson/src/main/java/org/elasticsearch/xpack/esql/datasource/ndjson/NdJsonPageDecoder.java

Lines changed: 101 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.fasterxml.jackson.core.JsonParseException;
1111
import com.fasterxml.jackson.core.JsonParser;
1212
import com.fasterxml.jackson.core.JsonToken;
13+
import com.fasterxml.jackson.core.exc.InputCoercionException;
1314
import com.fasterxml.jackson.core.io.JsonEOFException;
1415

1516
import org.apache.lucene.util.BytesRef;
@@ -33,15 +34,14 @@
3334
import java.io.Closeable;
3435
import java.io.IOException;
3536
import java.io.InputStream;
36-
import java.time.Instant;
3737
import java.util.BitSet;
3838
import java.util.HashMap;
3939
import java.util.List;
4040
import java.util.Map;
4141

4242
public class NdJsonPageDecoder implements Closeable {
4343

44-
private static final Logger LOGGER = LogManager.getLogger(NdJsonPageDecoder.class);
44+
private static final Logger logger = LogManager.getLogger(NdJsonPageDecoder.class);
4545

4646
private InputStream input;
4747
private final BlockDecoder decoder;
@@ -64,7 +64,7 @@ public class NdJsonPageDecoder implements Closeable {
6464
this.input = input;
6565

6666
var projectedAttributes = attributes;
67-
if (projectedColumns.isEmpty() == false) {
67+
if (projectedColumns != null && projectedColumns.isEmpty() == false) {
6868
// Keep projected columns in order, adding NULL for missing columns
6969
projectedAttributes = projectedColumns.stream()
7070
.map(
@@ -99,9 +99,9 @@ Page decodePage() throws IOException {
9999
}
100100
} catch (JsonParseException e) {
101101
if (e instanceof JsonEOFException) {
102-
LOGGER.debug("Truncated NDJSON at line {} (expected at split boundaries): {}", lineCount, e.getOriginalMessage());
102+
logger.debug("Truncated NDJSON at line {} (expected at split boundaries): {}", lineCount, e.getOriginalMessage());
103103
} else {
104-
LOGGER.warn("Malformed NDJSON at line {}: {}", lineCount, e.getOriginalMessage());
104+
logger.debug("Malformed NDJSON at line {}: {}", lineCount, e.getOriginalMessage());
105105
}
106106
this.input = NdJsonUtils.moveToNextLine(parser, this.input);
107107
parser = NdJsonUtils.JSON_FACTORY.createParser(this.input);
@@ -112,12 +112,12 @@ Page decodePage() throws IOException {
112112
this.blockTracker.clear();
113113

114114
try {
115-
decoder.decodeObject(parser);
115+
decoder.decodeObject(parser, false);
116116
} catch (JsonParseException e) {
117117
if (e instanceof JsonEOFException) {
118-
LOGGER.debug("Truncated NDJSON at line {} (expected at split boundaries): {}", lineCount, e.getOriginalMessage());
118+
logger.debug("Truncated NDJSON at line {} (expected at split boundaries): {}", lineCount, e.getOriginalMessage());
119119
} else {
120-
LOGGER.warn("Malformed NDJSON at line {}: {}", lineCount, e.getOriginalMessage());
120+
logger.debug("Malformed NDJSON at line {}: {}", lineCount, e.getOriginalMessage());
121121
}
122122
this.input = NdJsonUtils.moveToNextLine(parser, this.input);
123123
parser = NdJsonUtils.JSON_FACTORY.createParser(this.input);
@@ -185,20 +185,22 @@ public void close() throws IOException {
185185
// A tree of decoders. Avoids path reconstruction when traversing nested objects.
186186
private class BlockDecoder {
187187
@Nullable
188-
Attribute attribute;
189-
Block.Builder blockBuilder;
188+
DataType dataType;
189+
String name;
190190
int blockIdx;
191+
Block.Builder blockBuilder;
191192
Map<String, BlockDecoder> children;
192193

193194
void setAttribute(Attribute attribute, int blockIdx) {
194-
this.attribute = attribute;
195+
this.dataType = attribute.dataType();
196+
this.name = attribute.name();
195197
this.blockIdx = blockIdx;
196198
}
197199

198200
// Builders setup independently as we need to create new ones for each page.
199201
void setupBuilders(Block.Builder[] blockBuilders) {
200-
if (attribute != null) {
201-
blockBuilder = switch (attribute.dataType()) {
202+
if (dataType != null) {
203+
blockBuilder = switch (dataType) {
202204
// Keep in sync with NdJsonSchemaInferrer.inferValueSchema
203205
case BOOLEAN -> blockFactory.newBooleanBlockBuilder(batchSize);
204206
case NULL -> new ConstantNullBlock.Builder(blockFactory);
@@ -207,7 +209,7 @@ void setupBuilders(Block.Builder[] blockBuilders) {
207209
case DOUBLE -> blockFactory.newDoubleBlockBuilder(batchSize);
208210
case KEYWORD -> blockFactory.newBytesRefBlockBuilder(batchSize);
209211
case DATETIME -> blockFactory.newLongBlockBuilder(batchSize); // milliseconds since epoch
210-
default -> throw new IllegalArgumentException("Unsupported data type: " + attribute.dataType());
212+
default -> throw new IllegalArgumentException("Unsupported data type: " + dataType);
211213
};
212214
blockBuilders[blockIdx] = blockBuilder;
213215
}
@@ -219,7 +221,7 @@ void setupBuilders(Block.Builder[] blockBuilders) {
219221
}
220222
}
221223

222-
private void decodeObject(JsonParser parser) throws IOException {
224+
private void decodeObject(JsonParser parser, boolean inArray) throws IOException {
223225
JsonToken token = parser.currentToken();
224226
if (token != JsonToken.START_OBJECT) {
225227
throw new NdJsonParseException(parser, "Expected JSON object");
@@ -234,69 +236,125 @@ private void decodeObject(JsonParser parser) throws IOException {
234236
// Unknown field, skip it
235237
parser.skipChildren();
236238
} else {
237-
childDecoder.decodeValue(parser);
239+
childDecoder.decodeValue(parser, inArray);
238240
}
239241
}
240242
}
241243

242-
private void decodeValue(JsonParser parser) throws IOException {
244+
private void beginPositionEntry() {
245+
// We may have DataType.NULL for unknown columns. And NullBlock.Builder throws on beginPositionEntry()
246+
if (blockBuilder != null && dataType != DataType.NULL) {
247+
blockBuilder.beginPositionEntry();
248+
}
249+
if (children != null) {
250+
for (var child : children.values()) {
251+
child.beginPositionEntry();
252+
}
253+
}
254+
}
255+
256+
private void endPositionEntry() {
257+
if (blockBuilder != null && dataType != DataType.NULL) {
258+
blockBuilder.endPositionEntry();
259+
}
260+
if (children != null) {
261+
for (var child : children.values()) {
262+
child.endPositionEntry();
263+
}
264+
}
265+
}
266+
267+
private void decodeValue(JsonParser parser, boolean inArray) throws IOException {
243268
JsonToken token = parser.currentToken();
244-
blockTracker.set(blockIdx);
269+
270+
if (dataType == DataType.NULL) {
271+
// Don't do anything. We must do a single appendNull() on null blocks, this will be done
272+
// at the end of decodePage() when we check that all blocks have moved forward.
273+
parser.skipChildren();
274+
return;
275+
}
276+
245277
if (token == JsonToken.START_ARRAY) {
246-
this.blockBuilder.beginPositionEntry();
278+
// Start a multi-value entry on this decoder and all its children (nested arrays are flattened).
279+
// Note: the `inArray` flag is needed because blockBuilder.beginPositionEntry() is not idempotent.
280+
// Calling it twice implicitly calls endPositionEntry().
281+
if (!inArray) {
282+
beginPositionEntry();
283+
}
247284
while (parser.nextToken() != JsonToken.END_ARRAY) {
248-
decodeValue(parser);
285+
decodeValue(parser, true);
249286
}
250-
this.blockBuilder.endPositionEntry();
287+
if (!inArray) {
288+
endPositionEntry();
289+
}
290+
return;
291+
}
292+
293+
if (token == JsonToken.START_OBJECT) {
294+
decodeObject(parser, inArray);
251295
return;
252296
}
253297

254-
if (token == JsonToken.VALUE_NULL) {
298+
blockTracker.set(blockIdx);
299+
if (token == JsonToken.VALUE_NULL && inArray == false) {
300+
// Nulls in arrays aren't supported. Furthermore, appendNull will implicitly call endPositionEntry()
255301
blockBuilder.appendNull();
256302
return;
257303
}
258304

259-
switch (attribute.dataType()) {
305+
switch (dataType) {
260306
case BOOLEAN -> {
261307
if (token == JsonToken.VALUE_TRUE) {
262308
((BooleanBlock.Builder) blockBuilder).appendBoolean(true);
263309
} else if (token == JsonToken.VALUE_FALSE) {
264310
((BooleanBlock.Builder) blockBuilder).appendBoolean(false);
265311
} else {
266-
unexpectedValue(parser);
312+
unexpectedValue(blockBuilder, parser, inArray);
267313
}
268314
}
269315
case NULL -> {
270316
// NULL handled above
271-
unexpectedValue(parser);
317+
unexpectedValue(blockBuilder, parser, inArray);
272318
}
273319
case INTEGER -> {
274320
if (token == JsonToken.VALUE_NUMBER_INT || token == JsonToken.VALUE_NUMBER_FLOAT) {
275-
((IntBlock.Builder) blockBuilder).appendInt(parser.getIntValue());
321+
try {
322+
((IntBlock.Builder) blockBuilder).appendInt(parser.getIntValue());
323+
} catch (InputCoercionException e) {
324+
unexpectedValue(blockBuilder, parser, inArray);
325+
}
276326
} else {
277-
unexpectedValue(parser);
327+
unexpectedValue(blockBuilder, parser, inArray);
278328
}
279329
}
280330
case LONG -> {
281331
if (token == JsonToken.VALUE_NUMBER_INT || token == JsonToken.VALUE_NUMBER_FLOAT) {
282-
((LongBlock.Builder) blockBuilder).appendLong(parser.getLongValue());
332+
try {
333+
((LongBlock.Builder) blockBuilder).appendLong(parser.getLongValue());
334+
} catch (InputCoercionException e) {
335+
unexpectedValue(blockBuilder, parser, inArray);
336+
}
283337
} else {
284-
unexpectedValue(parser);
338+
unexpectedValue(blockBuilder, parser, inArray);
285339
}
286340
}
287341
case DOUBLE -> {
288342
if (token == JsonToken.VALUE_NUMBER_INT || token == JsonToken.VALUE_NUMBER_FLOAT) {
289-
((DoubleBlock.Builder) blockBuilder).appendDouble(parser.getDoubleValue());
343+
try {
344+
((DoubleBlock.Builder) blockBuilder).appendDouble(parser.getDoubleValue());
345+
} catch (InputCoercionException e) {
346+
unexpectedValue(blockBuilder, parser, inArray);
347+
}
290348
} else {
291-
unexpectedValue(parser);
349+
unexpectedValue(blockBuilder, parser, inArray);
292350
}
293351
}
294352
case DATETIME -> {
295353
try {
296-
var millis = Instant.parse(parser.getValueAsString()).toEpochMilli();
354+
var millis = NdJsonSchemaInferrer.DATE_FORMATTER.parseMillis(parser.getValueAsString());
297355
((LongBlock.Builder) blockBuilder).appendLong(millis);
298356
} catch (Exception e) {
299-
unexpectedValue(parser);
357+
unexpectedValue(blockBuilder, parser, inArray);
300358
}
301359
}
302360
case KEYWORD -> {
@@ -305,20 +363,21 @@ private void decodeValue(JsonParser parser) throws IOException {
305363
if (str != null) {
306364
((BytesRefBlock.Builder) blockBuilder).appendBytesRef(new BytesRef(str));
307365
} else {
308-
unexpectedValue(parser);
366+
unexpectedValue(blockBuilder, parser, inArray);
309367
}
310368
}
311-
default -> throw new IllegalArgumentException("Unsupported data type: " + attribute.dataType());
369+
default -> throw new IllegalArgumentException("Unsupported data type: " + dataType);
312370
}
313371
}
314372

315-
private void unexpectedValue(JsonParser parser) throws IOException {
316-
LOGGER.warn(
317-
"Unexpected token type: {} for attribute: {} at {}",
318-
parser.currentToken(),
319-
attribute.name(),
320-
parser.getTokenLocation()
321-
);
373+
private void unexpectedValue(Block.Builder builder, JsonParser parser, boolean inArray) throws IOException {
374+
// Append a null and log the problem
375+
if (inArray == false) {
376+
// See previous comment about nulls and arrays
377+
builder.appendNull();
378+
}
379+
380+
logger.debug("Unexpected token type: {} for attribute: {} at {}", parser.currentToken(), name, parser.getTokenLocation());
322381
// Ignore any children to keep reading other values
323382
parser.skipChildren();
324383
}

0 commit comments

Comments
 (0)