Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
import org.apache.druid.msq.querykit.scan.ExternalColumnSelectorFactory;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
Expand Down Expand Up @@ -316,9 +314,9 @@ public void testIngestWithSanitizedNullByteUsingContextParameter() throws IOExce
}

@Test
public void testMultiValueStringWithIncorrectType() throws IOException
public void testCannotParseJson() throws IOException
{
final File toRead = getResourceAsTemporaryFile("/unparseable-mv-string-array.json");
final File toRead = getResourceAsTemporaryFile("/not-json.txt");
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());

RowSignature rowSignature = RowSignature.builder()
Expand All @@ -337,25 +335,20 @@ public void testMultiValueStringWithIncorrectType() throws IOException

testSelectQuery()
.setSql("WITH\n"
+ "kttm_data AS (\n"
+ "ext AS (\n"
+ "SELECT * FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"agent_category\",\"type\":\"string\"},{\"name\":\"agent_type\",\"type\":\"string\"},{\"name\":\"browser\",\"type\":\"string\"},{\"name\":\"browser_version\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"continent\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"event_subtype\",\"type\":\"string\"},{\"name\":\"loaded_image\",\"type\":\"string\"},{\"name\":\"adblock_list\",\"type\":\"string\"},{\"name\":\"forwarded_for\",\"type\":\"string\"},{\"name\":\"language\",\"type\":\"string\"},{\"name\":\"number\",\"type\":\"long\"},{\"name\":\"os\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"},{\"name\":\"platform\",\"type\":\"string\"},{\"name\":\"referrer\",\"type\":\"string\"},{\"name\":\"referrer_host\",\"type\":\"string\"},{\"name\":\"region\",\"type\":\"string\"},{\"name\":\"remote_address\",\"type\":\"string\"},{\"name\":\"screen\",\"type\":\"string\"},{\"name\":\"session\",\"type\":\"string\"},{\"name\":\"session_length\",\"type\":\"long\"},{\"name\":\"timezone\",\"type\":\"string\"},{\"name\":\"timezone_offset\",\"type\":\"long\"},{\"name\":\"window\",\"type\":\"string\"}]'\n"
+ " '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"thisRow\",\"type\":\"string\"}]'\n"
+ " )\n"
+ "))\n"
+ "\n"
+ "SELECT\n"
+ " FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+ " MV_TO_ARRAY(\"language\") AS \"language\"\n"
+ "FROM kttm_data")
+ " TIME_PARSE(\"timestamp\") AS __time,\n"
+ " thisRow\n"
+ "FROM ext")
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1566691200000L, ImmutableList.of("en")},
new Object[]{1566691200000L, ImmutableList.of("en", "es", "es-419", "es-MX")},
new Object[]{1566691200000L, ImmutableList.of("en", "es", "es-419", "es-US")}
))
.setExpectedMSQSpec(
LegacyMSQSpec
.builder()
Expand All @@ -370,14 +363,8 @@ public void testMultiValueStringWithIncorrectType() throws IOException
.build())
.setExpectedMSQFault(
new CannotParseExternalDataFault(
ExternalColumnSelectorFactory
.createException(
new Exception("dummy"),
"v1",
new LocalInputSource(null, null, ImmutableList.of(toRead), SystemFields.none()),
new SimpleAscendingOffset(Integer.MAX_VALUE)
)
.getMessage()
"Unable to parse row [this row is not json] "
+ "(Path: file:" + toRead.getAbsolutePath() + ", Record: 3, Line: 3)"
)
)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
Expand Down
3 changes: 3 additions & 0 deletions multi-stage-query/src/test/resources/not-json.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"timestamp":"2000-01-01","thisRow":"isJson"}
{"timestamp":"2000-01-01","thisRow":"isAlsoJson"}
this row is not json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,15 @@
package org.apache.druid.segment;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Doubles;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.data.input.Rows;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate;
Expand Down Expand Up @@ -75,8 +83,7 @@ public RowBasedColumnSelectorFactory(
this.rowSupplier = rowSupplier;
this.rowIdSupplier = rowIdSupplier;
this.adapter = adapter;
this.columnInspector =
Preconditions.checkNotNull(columnInspector, "columnInspector must be nonnull");
this.columnInspector = Preconditions.checkNotNull(columnInspector, "columnInspector must be nonnull");
this.throwParseExceptions = throwParseExceptions;
this.useStringValueOfNullInLists = useStringValueOfNullInLists;
}
Expand Down Expand Up @@ -336,6 +343,45 @@ public Class classOfObject()
return Object.class;
}

@Override
public boolean isNull()
{
updateCurrentValues();
return DimensionHandlerUtils.isNumericNull(getObject());
}

@Override
public float getFloat()
{
updateCurrentValues();
return (float) getDouble();
}

@Override
public double getDouble()
{
updateCurrentValues();

// Below is safe since isNull() returned true.
final String str = Iterables.getOnlyElement(dimensionValues);
return Doubles.tryParse(str);
}

@Override
public long getLong()
{
updateCurrentValues();

// Below is safe since isNull() returned true.
final String str = Iterables.getOnlyElement(dimensionValues);
final Long n = GuavaUtils.tryParseLong(str);
if (n != null) {
return n;
} else {
return (long) getDouble();
}
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
Expand Down Expand Up @@ -412,6 +458,8 @@ private void updateCurrentValues()
@Override
public ColumnValueSelector<?> makeColumnValueSelector(String columnName)
{
final ExpressionType expressionType = columnInspector.getType(columnName);

if (columnName.equals(ColumnHolder.TIME_COLUMN_NAME)) {
final ToLongFunction<T> timestampFunction = adapter.timestampFunction();

Expand All @@ -437,6 +485,8 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
}
}
return new TimeLongColumnSelector();
} else if (expressionType != null && expressionType.is(ExprType.STRING)) {
return makeDimensionSelector(DefaultDimensionSpec.of(columnName));
} else {
final Function<T, Object> columnFunction = adapter.columnFunction(columnName);
final ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(columnName);
Expand Down Expand Up @@ -488,7 +538,33 @@ public long getLong()
public Object getObject()
{
updateCurrentValue();
return currentValue;

if (expressionType != null && !expressionType.is(ExprType.COMPLEX)) {
try {
final Object val = ExprEval.bestEffortOf(currentValue).castTo(expressionType).value();
if (val != null && expressionType.is(ExprType.DOUBLE) && numberType == ValueType.FLOAT) {
// Adjustment for FLOAT. Expressions don't speak float, so we need to cast it ourselves.
return ((Number) val).floatValue();
} else {
return val;
}
}
catch (Exception e) {
if (throwParseExceptions) {
throw new ParseException(
String.valueOf(currentValue),
"Error reading column[%s] as type[%s]",
columnName,
expressionType
);
} else {
// if !throwParseExceptions, return the original uncasted value and hope for the best.
return currentValue;
}
}
} else {
return currentValue;
}
}

@Override
Expand Down Expand Up @@ -557,4 +633,14 @@ public ColumnCapabilities getColumnCapabilities(String columnName)
{
return getColumnCapabilities(columnInspector, columnName);
}

/**
* Determines whether the provided object should be coerced using the provided type. Generally this is true,
* except for STRING type with List objects. This allows multi-value strings to be passed through without being
* coereced by the expression engine, which would turn arrays into nulls.
*/
private static boolean shouldCoerce(@Nullable final Object obj, @Nullable final ExpressionType expressionType)
{
return obj != null && expressionType != null && !(expressionType.is(ExprType.STRING) && obj instanceof List);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class IterableRowsCursorHelperTest
{

List<Object[]> rows = ImmutableList.of(
new Object[]{1, "a"},
new Object[]{3, "b"},
new Object[]{2, "b"}
new Object[]{1L, "a"},
new Object[]{3L, "b"},
new Object[]{2L, "b"}
);

RowSignature rowSignature = RowSignature.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,6 @@ private List<Integer> runQuery(final ScanQuery query, final QueryRunner<ScanResu
brokerRunner.run(QueryPlus.wrap(query))
).toList();

return results.stream().mapToInt(row -> (int) row[1]).boxed().collect(Collectors.toList());
return results.stream().mapToInt(row -> ((Number) row[1]).intValue()).boxed().collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -717,10 +717,10 @@ public void testGroupByOnScanMultiValue()
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{ImmutableList.of("a", "b"), 1},
new Object[]{ImmutableList.of("a", "c"), 2},
new Object[]{ImmutableList.of("b"), 3},
new Object[]{ImmutableList.of("c"), 4}
new Object[]{List.of("a", "b"), 1L},
new Object[]{List.of("a", "c"), 2L},
new Object[]{"b", 3L},
new Object[]{"c", 4L}
),
RowSignature.builder().add("s", null).add("n", null).build()
)
Expand Down Expand Up @@ -767,10 +767,10 @@ public void testTopNScanMultiValue()
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{ImmutableList.of("a", "b"), 1},
new Object[]{ImmutableList.of("a", "c"), 2},
new Object[]{ImmutableList.of("b"), 3},
new Object[]{ImmutableList.of("c"), 4}
new Object[]{List.of("a", "b"), 1L},
new Object[]{List.of("a", "c"), 2L},
new Object[]{"b", 3L},
new Object[]{"c", 4L}
),
RowSignature.builder().add("s", null).add("n", null).build()
)
Expand Down
Loading