diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/NestedDataFormatsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/NestedDataFormatsTest.java new file mode 100644 index 000000000000..2f892e86d02f --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/NestedDataFormatsTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.indexing.common.task.TaskBuilder; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.nested.NestedCommonFormatColumnFormatSpec; +import org.apache.druid.segment.nested.ObjectStorageEncoding; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Embedded tests for nested data, ingested in different {@link NestedCommonFormatColumnFormatSpec}. + */ +public class NestedDataFormatsTest extends EmbeddedClusterTestBase +{ + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + + private final String datasourceWithDefaultFormat = EmbeddedClusterApis.createTestDatasourceName(); + + @Override + protected EmbeddedDruidCluster createCluster() + { + return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .useDefaultTimeoutForLatchableEmitter(120) + .addServer(overlord) + .addServer(coordinator) + .addServer(new EmbeddedIndexer()) + .addServer(new EmbeddedHistorical()) + .addServer(broker) + .addServer(new EmbeddedRouter()); + } + + @BeforeAll + protected void ingestWithDefaultFormat() + { + final TaskBuilder.IndexParallel indexTask = + TaskBuilder.ofTypeIndexParallel() + .dataSource(datasourceWithDefaultFormat) + .timestampColumn("timestamp") + .jsonInputFormat() + .inputSource(Resources.HttpData.kttmNested1Day()) + .schemaDiscovery(); + + final String taskId = EmbeddedClusterApis.newTaskId(datasourceWithDefaultFormat); + cluster.callApi().runTask(indexTask.withId(taskId), overlord); + cluster.callApi().waitForAllSegmentsToBeAvailable(datasourceWithDefaultFormat, coordinator, broker); + } + + @Test + public void test_objectStorageEncoding() + { + // Ingest kttm data with skipping smile raw json format, comparing diff with defaultFormat + NestedCommonFormatColumnFormatSpec spec = + NestedCommonFormatColumnFormatSpec.builder().setObjectStorageEncoding(ObjectStorageEncoding.NONE).build(); + final TaskBuilder.IndexParallel indexTask = + TaskBuilder.ofTypeIndexParallel() + .dataSource(dataSource) + .timestampColumn("timestamp") + .jsonInputFormat() + .inputSource(Resources.HttpData.kttmNested1Day()) + .schemaDiscovery() + .tuningConfig(t -> t.withIndexSpec(IndexSpec.builder().withAutoColumnFormatSpec(spec).build())); + final String taskId = EmbeddedClusterApis.newTaskId(dataSource); + cluster.callApi().runTask(indexTask.withId(taskId), overlord); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + + // Test ingesting with skipping raw json smile format works, same row count, with ~20% storage saving + final String metadataSql = "select sum(num_rows), sum(size) from sys.segments where datasource = '%s'"; + final String defaultFormatResult = cluster.runSql(metadataSql, datasourceWithDefaultFormat); + final String noneObjectStorageFormatResult = cluster.runSql(metadataSql, dataSource); + Assertions.assertEquals(StringUtils.format("%d,%d", 465_346, 53_000_804), defaultFormatResult); + Assertions.assertEquals(StringUtils.format("%d,%d", 465_346, 41_938_750), noneObjectStorageFormatResult); + + // Test querying on a nested field works + final String groupByQuery = + """ + select json_value(event, '$.type') as event_type, count(*) as total from %s + group by 1 order by 2 desc, 1 asc limit 10 + """; + final String queryResultDefaultFormat = cluster.runSql(groupByQuery, datasourceWithDefaultFormat); + final String queryResultNoneObjectStorage = cluster.runSql(groupByQuery, dataSource); + Assertions.assertEquals(queryResultDefaultFormat, queryResultNoneObjectStorage); + + // Test reconstruct json column works, the ordering of the fields has changed, but all values are perserved. + final String scanQuery = + """ + select event, to_json_string(agent) as agent from %s + where json_value(event, '$.type') = 'PercentClear' and json_value(agent, '$.os') = 'Android' + order by __time asc limit 1 + """; + final String scanQueryResultDefaultFormat = cluster.runSql(scanQuery, datasourceWithDefaultFormat); + final String scanQueryResultNoneObjectStorage = cluster.runSql(scanQuery, dataSource); + // CHECKSTYLE: text blocks not supported in current Checkstyle version + Assertions.assertEquals( + """ + "{""type"":""PercentClear"",""percentage"":85}","{""type"":""Mobile Browser"",""category"":""Smartphone"",""browser"":""Chrome Mobile"",""browser_version"":""50.0.2661.89"",""os"":""Android"",""platform"":""Android""}" + """.trim(), + scanQueryResultDefaultFormat + ); + Assertions.assertEquals( + """ + "{""percentage"":85,""type"":""PercentClear""}","{""browser"":""Chrome Mobile"",""browser_version"":""50.0.2661.89"",""category"":""Smartphone"",""os"":""Android"",""platform"":""Android"",""type"":""Mobile Browser""}" + """.trim(), + scanQueryResultNoneObjectStorage + ); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java index 15f0d62ac45d..31a8d783603e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java @@ -62,9 +62,9 @@ * explicitly. * * @param Type of this builder itself - * @param Type of tuning config used by this builder. - * @param Type of task created by this builder. - * @param Type of tuning config builder + * @param Type of tuning config used by this builder. + * @param Type of task created by this builder. + * @param Type of tuning config builder * @see #ofTypeIndex() * @see #tuningConfig(Consumer) to specify the {@code tuningConfig}. */ @@ -286,6 +286,12 @@ public Self dimensions(String... dimensions) return (Self) this; } + public Self schemaDiscovery() + { + dataSchema.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()); + return (Self) this; + } + public Self metricAggregates(AggregatorFactory... aggregators) { dataSchema.withAggregators(aggregators); diff --git a/pom.xml b/pom.xml index 13ce80b8f2ab..55c65a262d41 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ - 11 + 17 ${java.version} UTF-8 0.9.0.M2 @@ -1572,7 +1572,7 @@ true true - *com/fasterxml/jackson/databind/* + *com/fasterxml/jackson/databind/*,**/NestedDataFormatsTest.java diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java index 11b538530bdd..f727bc7c97b5 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java @@ -41,7 +41,6 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.rpc.indexing.OverlordClient; -import org.apache.druid.segment.TestDataSource; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.metrics.LatchableEmitter; import org.apache.druid.sql.http.ResultFormat; @@ -412,11 +411,11 @@ public SupervisorStatus getSupervisorStatus(String supervisorId) // STATIC UTILITY METHODS /** - * Creates a random datasource name prefixed with {@link TestDataSource#WIKI}. + * Creates a random datasource name prefixed with {@code datasource_}. */ public static String createTestDatasourceName() { - return TestDataSource.WIKI + "_" + IdUtils.getRandomId(); + return "datasource_" + IdUtils.getRandomId(); } /** diff --git a/services/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java b/services/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java index 1e707aa88354..9070a90ac9d4 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java @@ -131,6 +131,23 @@ public static HttpInputSource wikipedia1Day() throw new RuntimeException(e); } } + + public static HttpInputSource kttmNested1Day() + { + try { + return new HttpInputSource( + List.of(new URIBuilder("https://static.imply.io/example-data/kttm-nested-v2/kttm-nested-v2-2019-08-25.json.gz").build()), + null, + null, + null, + null, + new HttpInputSourceConfig(null, null) + ); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } } /**