|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, |
| 13 | + * software distributed under the License is distributed on an |
| 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + * KIND, either express or implied. See the License for the |
| 16 | + * specific language governing permissions and limitations |
| 17 | + * under the License. |
| 18 | + */ |
| 19 | + |
| 20 | +package org.apache.druid.testing.embedded.query; |
| 21 | + |
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.io.ByteStreams;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Objects;
| 39 | + |
| 40 | +/** |
| 41 | + * Embedded test to verify nested group-by queries using the {@code forcePushDownNestedQuery} context. |
| 42 | + */ |
| 43 | +public class ForcePushDownNestedQueryTest extends QueryTestBase |
| 44 | +{ |
| 45 | + @Override |
| 46 | + public void beforeAll() throws IOException |
| 47 | + { |
| 48 | + dataSource = EmbeddedClusterApis.createTestDatasourceName(); |
| 49 | + loadWikipediaTable(); |
| 50 | + } |
| 51 | + |
| 52 | + @Override |
| 53 | + protected void refreshDatasourceName() |
| 54 | + { |
| 55 | + // don't change the datasource name for each run because we set things up before all tests |
| 56 | + } |
| 57 | + |
| 58 | + @Test |
| 59 | + public void test_forcePushDownNestedQueries() |
| 60 | + { |
| 61 | + // Nested group by double agg query with force push down |
| 62 | + cluster.callApi().verifySqlQuery( |
| 63 | + "SET forcePushDownNestedQuery = TRUE;\n" |
| 64 | + + "SELECT SUM(sumAdded) AS \"groupedSumAdded\" FROM (\n" |
| 65 | + + " SELECT channel, \"user\", SUM(added) AS sumAdded\n" |
| 66 | + + " FROM %s\n" |
| 67 | + + " GROUP BY channel, \"user\"\n" |
| 68 | + + ")", |
| 69 | + dataSource, |
| 70 | + "9385573" |
| 71 | + ); |
| 72 | + |
| 73 | + // Nested group by query with force push down and renamed dimensions |
| 74 | + cluster.callApi().verifySqlQuery( |
| 75 | + "SET forcePushDownNestedQuery = TRUE;\n" |
| 76 | + + "SELECT SUM(sumAdded) AS groupedSumAdded FROM (\n" |
| 77 | + + " SELECT channel AS renamedChannel, \"user\" AS renamedUser, SUM(added) AS sumAdded\n" |
| 78 | + + " FROM %s\n" |
| 79 | + + " GROUP BY channel, \"user\"\n" |
| 80 | + + ") inner_q", |
| 81 | + dataSource, |
| 82 | + "9385573" |
| 83 | + ); |
| 84 | + |
| 85 | + // Nested group-by query with force pushdown disabled and filters on both outer and inner queries. |
| 86 | + // forcePushDownNestedQuery is intentionally set to false here; enabling it causes the test to fail due to a SQL bug. |
| 87 | + // See test_forcePushDownNestedQuery_doesNotReturnAdditionalResults() |
| 88 | + cluster.callApi().verifySqlQuery( |
| 89 | + "SET forcePushDownNestedQuery = FALSE;\n" |
| 90 | + + "SELECT renamedChannel, SUM(sumAdded) AS groupedSumAdded\n" |
| 91 | + + "FROM (\n" |
| 92 | + + " SELECT channel AS renamedChannel, \"user\", SUM(added) AS sumAdded\n" |
| 93 | + + " FROM %s\n" |
| 94 | + + " WHERE channel IN ('#zh.wikipedia', '#es.wikipedia')\n" |
| 95 | + + " GROUP BY channel, \"user\"\n" |
| 96 | + + ") inner_q\n" |
| 97 | + + "WHERE renamedChannel = '#zh.wikipedia'\n" |
| 98 | + + "GROUP BY renamedChannel", |
| 99 | + dataSource, |
| 100 | + "#zh.wikipedia,191033" |
| 101 | + ); |
| 102 | + |
| 103 | + // Nested group by query with force push down and having clause |
| 104 | + cluster.callApi().verifySqlQuery( |
| 105 | + "SET forcePushDownNestedQuery = FALSE;\n" |
| 106 | + + "SELECT renamedChannel, SUM(sumAdded) AS \"groupedSumAdded\"\n" |
| 107 | + + "FROM (\n" |
| 108 | + + " SELECT channel AS renamedChannel, \"user\", SUM(added) AS sumAdded\n" |
| 109 | + + " FROM %s\n" |
| 110 | + + " WHERE channel IN ('#zh.wikipedia', '#es.wikipedia')\n" |
| 111 | + + " GROUP BY channel, \"user\"\n" |
| 112 | + + ") inner_q\n" |
| 113 | + + "WHERE renamedChannel = '#zh.wikipedia'\n" |
| 114 | + + "GROUP BY renamedChannel", |
| 115 | + dataSource, |
| 116 | + "#zh.wikipedia,191033" |
| 117 | + ); |
| 118 | + |
| 119 | + // Nested group by query with force push down and having clause. This asserts that the post-processing is invoked |
| 120 | + cluster.callApi().verifySqlQuery( |
| 121 | + "SET forcePushDownNestedQuery = TRUE;\n" |
| 122 | + + "SELECT SUM(sumAdded) FROM (\n" |
| 123 | + + " SELECT channel, \"user\", SUM(added) AS sumAdded\n" |
| 124 | + + " FROM %s GROUP BY channel, \"user\"\n" |
| 125 | + + ") inner_q" |
| 126 | + + " HAVING SUM(sumAdded) > 100000000", |
| 127 | + dataSource, |
| 128 | + "" |
| 129 | + ); |
| 130 | + } |
| 131 | + |
| 132 | + @Disabled("Setting forcePushDownNestedQuery = TRUE with filters returns additional results, which appears to be a bug" |
| 133 | + + " in the SQL layer. The same query with forcePushDownNestedQuery = FALSE works as expected; see test above.") |
| 134 | + @Test |
| 135 | + public void test_forcePushDownNestedQuery_doesNotReturnAdditionalResults() |
| 136 | + { |
| 137 | + // When forcePushDownNestedQuery is set to TRUE, this test will fail as there's an extra row: |
| 138 | + // #es.wikipedia,634670\n#zh.wikipedia,191033 |
| 139 | + cluster.callApi().verifySqlQuery( |
| 140 | + "SET forcePushDownNestedQuery = TRUE;\n" |
| 141 | + + "SELECT renamedChannel, SUM(sumAdded) AS groupedSumAdded\n" |
| 142 | + + "FROM (\n" |
| 143 | + + " SELECT channel AS renamedChannel, \"user\", SUM(added) AS sumAdded\n" |
| 144 | + + " FROM %s\n" |
| 145 | + + " WHERE channel IN ('#zh.wikipedia', '#es.wikipedia')\n" |
| 146 | + + " GROUP BY channel, \"user\"\n" |
| 147 | + + ") inner_q\n" |
| 148 | + + "WHERE renamedChannel = '#zh.wikipedia'\n" |
| 149 | + + "GROUP BY renamedChannel", |
| 150 | + dataSource, |
| 151 | + "#zh.wikipedia,191033" |
| 152 | + ); |
| 153 | + } |
| 154 | + |
| 155 | + private void loadWikipediaTable() throws IOException |
| 156 | + { |
| 157 | + final File tmpDir = cluster.getTestFolder().newFolder(); |
| 158 | + final File wikiFile = new File(tmpDir, "wiki.gz"); |
| 159 | + |
| 160 | + ByteStreams.copy( |
| 161 | + DruidProcessingConfigTest.class.getResourceAsStream("/wikipedia/wikiticker-2015-09-12-sampled.json.gz"), |
| 162 | + Files.newOutputStream(wikiFile.toPath()) |
| 163 | + ); |
| 164 | + |
| 165 | + final String sql = StringUtils.format( |
| 166 | + "SET waitUntilSegmentsLoad = TRUE;\n" |
| 167 | + + "REPLACE INTO \"%s\" OVERWRITE ALL\n" |
| 168 | + + "SELECT\n" |
| 169 | + + " TIME_PARSE(\"time\") AS __time,\n" |
| 170 | + + " channel,\n" |
| 171 | + + " countryName,\n" |
| 172 | + + " page,\n" |
| 173 | + + " \"user\",\n" |
| 174 | + + " added,\n" |
| 175 | + + " deleted,\n" |
| 176 | + + " delta\n" |
| 177 | + + "FROM TABLE(\n" |
| 178 | + + " EXTERN(\n" |
| 179 | + + " %s,\n" |
| 180 | + + " '{\"type\":\"json\"}',\n" |
| 181 | + + " '[{\"name\":\"isRobot\",\"type\":\"string\"},{\"name\":\"channel\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"},{\"name\":\"flags\",\"type\":\"string\"},{\"name\":\"isUnpatrolled\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"string\"},{\"name\":\"diffUrl\",\"type\":\"string\"},{\"name\":\"added\",\"type\":\"long\"},{\"name\":\"comment\",\"type\":\"string\"},{\"name\":\"commentLength\",\"type\":\"long\"},{\"name\":\"isNew\",\"type\":\"string\"},{\"name\":\"isMinor\",\"type\":\"string\"},{\"name\":\"delta\",\"type\":\"long\"},{\"name\":\"isAnonymous\",\"type\":\"string\"},{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"deltaBucket\",\"type\":\"long\"},{\"name\":\"deleted\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"},{\"name\":\"cityName\",\"type\":\"string\"},{\"name\":\"countryName\",\"type\":\"string\"},{\"name\":\"regionIsoCode\",\"type\":\"string\"},{\"name\":\"metroCode\",\"type\":\"long\"},{\"name\":\"countryIsoCode\",\"type\":\"string\"},{\"name\":\"regionName\",\"type\":\"string\"}]'\n" |
| 182 | + + " )\n" |
| 183 | + + " )\n" |
| 184 | + + "PARTITIONED BY DAY\n" |
| 185 | + + "CLUSTERED BY channel", |
| 186 | + dataSource, |
| 187 | + Calcites.escapeStringLiteral( |
| 188 | + broker.bindings() |
| 189 | + .jsonMapper() |
| 190 | + .writeValueAsString(new LocalInputSource(null, null, Collections.singletonList(wikiFile), null)) |
| 191 | + ) |
| 192 | + ); |
| 193 | + |
| 194 | + final MSQTaskReportPayload payload = new EmbeddedMSQApis(cluster, overlord).runTaskSqlAndGetReport(sql); |
| 195 | + Assertions.assertEquals(TaskState.SUCCESS, payload.getStatus().getStatus()); |
| 196 | + Assertions.assertEquals(1, payload.getStatus().getSegmentLoadWaiterStatus().getTotalSegments()); |
| 197 | + Assertions.assertNull(payload.getStatus().getErrorReport()); |
| 198 | + } |
| 199 | +} |
0 commit comments