Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cron-job-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [ query, query-retry, query-error, security ]
testing_group: [ query, security ]
uses: ./.github/workflows/reusable-standard-its.yml
needs: build
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/standard-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [query, query-retry, query-error, security, centralized-datasource-schema]
testing_group: [query, security, centralized-datasource-schema]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,15 @@
* under the License.
*/

package org.apache.druid.tests.query;
package org.apache.druid.testing.embedded.query;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.calcite.avatica.AvaticaSqlException;
import org.apache.druid.https.SSLClientConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.tools.IntegrationTestingConfig;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.tests.TestNGGroup;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


import java.sql.Connection;
import java.sql.DatabaseMetaData;
Expand All @@ -47,69 +40,40 @@
import java.util.Properties;
import java.util.Set;

@Test(groups = {TestNGGroup.QUERY, TestNGGroup.CENTRALIZED_DATASOURCE_SCHEMA})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITJdbcQueryTest
/**
* JDBC query integration tests.
* Note: we need to correspond queries with TLS support to fullfill the conversion
*/
public class JdbcQueryTest extends QueryTestBase
{
private static final Logger LOG = new Logger(ITJdbcQueryTest.class);
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
private static final Logger LOG = new Logger(JdbcQueryTest.class);
private static final String CONNECTION_TEMPLATE = "jdbc:avatica:remote:url=%s/druid/v2/sql/avatica/";
private static final String TLS_CONNECTION_TEMPLATE =
"jdbc:avatica:remote:url=%s/druid/v2/sql/avatica/;truststore=%s;truststore_password=%s;keystore=%s;keystore_password=%s;key_password=%s";

private static final String QUERY_TEMPLATE =
"SELECT \"user\", SUM(\"added\"), COUNT(*)" +
"FROM \"wikipedia\" " +
"WHERE \"__time\" >= CURRENT_TIMESTAMP - INTERVAL '99' YEAR AND \"language\" = %s" +
"GROUP BY 1 ORDER BY 3 DESC LIMIT 10";
private static final String QUERY = StringUtils.format(QUERY_TEMPLATE, "'en'");

private static final String QUERY_PARAMETERIZED = StringUtils.format(QUERY_TEMPLATE, "?");
"SELECT \"item\", SUM(\"value\"), COUNT(*) "
+ "FROM \"%s\" "
+ "WHERE \"__time\" >= CURRENT_TIMESTAMP - INTERVAL '99' YEAR AND \"value\" < %s \n"
+ "GROUP BY 1 ORDER BY 3 DESC LIMIT 10";

private String[] connections;
private Properties connectionProperties;

@Inject
private IntegrationTestingConfig config;

@Inject
SSLClientConfig sslConfig;
private String dataSourceName;

@Inject
private DataLoaderHelper dataLoaderHelper;

@BeforeMethod
public void before()
@Override
protected void beforeAll()
{
connectionProperties = new Properties();
connectionProperties.setProperty("user", "admin");
connectionProperties.setProperty("password", "priest");
connections = new String[]{
StringUtils.format(CONNECTION_TEMPLATE, config.getRouterUrl()),
StringUtils.format(CONNECTION_TEMPLATE, config.getBrokerUrl()),
StringUtils.format(
TLS_CONNECTION_TEMPLATE,
config.getRouterTLSUrl(),
sslConfig.getTrustStorePath(),
sslConfig.getTrustStorePasswordProvider().getPassword(),
sslConfig.getKeyStorePath(),
sslConfig.getKeyStorePasswordProvider().getPassword(),
sslConfig.getKeyManagerPasswordProvider().getPassword()
),
StringUtils.format(
TLS_CONNECTION_TEMPLATE,
config.getBrokerTLSUrl(),
sslConfig.getTrustStorePath(),
sslConfig.getTrustStorePasswordProvider().getPassword(),
sslConfig.getKeyStorePath(),
sslConfig.getKeyStorePasswordProvider().getPassword(),
sslConfig.getKeyManagerPasswordProvider().getPassword()
)
};
// ensure that wikipedia segments are loaded completely
dataLoaderHelper.waitUntilDatasourceIsReady(WIKIPEDIA_DATA_SOURCE);
dataLoaderHelper.waitUntilDatasourceIsReady("wikipedia");
dataLoaderHelper.waitUntilDatasourceIsReady("twitterstream");
StringUtils.format(CONNECTION_TEMPLATE, getServerUrl(router)),
StringUtils.format(CONNECTION_TEMPLATE, getServerUrl(broker)),
};

dataSourceName = ingestBasicData();
}

@Test
Expand All @@ -126,7 +90,7 @@ public void testJdbcMetadata()
catalogs.add(catalog);
}
LOG.info("catalogs %s", catalogs);
Assert.assertEquals(catalogs, ImmutableList.of("druid"));
Assertions.assertEquals(catalogs, ImmutableList.of("druid"));

Set<String> schemas = new HashSet<>();
ResultSet schemasMetadata = metadata.getSchemas("druid", null);
Expand All @@ -136,7 +100,7 @@ public void testJdbcMetadata()
}
LOG.info("'druid' catalog schemas %s", schemas);
// maybe more schemas than this, but at least should have these
Assert.assertTrue(schemas.containsAll(ImmutableList.of("INFORMATION_SCHEMA", "druid", "lookup", "sys")));
Assertions.assertTrue(schemas.containsAll(ImmutableList.of("INFORMATION_SCHEMA", "druid", "lookup", "sys")));

Set<String> druidTables = new HashSet<>();
ResultSet tablesMetadata = metadata.getTables("druid", "druid", null, null);
Expand All @@ -145,83 +109,90 @@ public void testJdbcMetadata()
druidTables.add(table);
}
LOG.info("'druid' schema tables %s", druidTables);
// maybe more tables than this, but at least should have these
Assert.assertTrue(
druidTables.containsAll(ImmutableList.of("twitterstream", "wikipedia", WIKIPEDIA_DATA_SOURCE))
// There may be more tables than this, but at least should have @tableName
Assertions.assertTrue(
druidTables.containsAll(ImmutableList.of(dataSourceName))
);

Set<String> wikiColumns = new HashSet<>();
ResultSet columnsMetadata = metadata.getColumns("druid", "druid", WIKIPEDIA_DATA_SOURCE, null);
ResultSet columnsMetadata = metadata.getColumns("druid", "druid", dataSourceName, null);
while (columnsMetadata.next()) {
final String column = columnsMetadata.getString(4);
wikiColumns.add(column);
}
LOG.info("'%s' columns %s", WIKIPEDIA_DATA_SOURCE, wikiColumns);
LOG.info("'%s' columns %s", dataSourceName, wikiColumns);
// a lot more columns than this, but at least should have these
Assert.assertTrue(
wikiColumns.containsAll(ImmutableList.of("added", "city", "delta", "language"))
Assertions.assertTrue(
wikiColumns.containsAll(ImmutableList.of("__time", "item", "value"))
);
}
catch (SQLException throwables) {
Assert.fail(throwables.getMessage());
Assertions.fail(throwables.getMessage());
}
}
}

@Test
public void testJdbcStatementQuery()
{
String query = StringUtils.format(QUERY_TEMPLATE, dataSourceName, "1000");
for (String url : connections) {
try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
try (Statement statement = connection.createStatement()) {
final ResultSet resultSet = statement.executeQuery(QUERY);
final ResultSet resultSet = statement.executeQuery(query);
int resultRowCount = 0;
while (resultSet.next()) {
resultRowCount++;
LOG.info("%s,%s,%s", resultSet.getString(1), resultSet.getLong(2), resultSet.getLong(3));
}
Assert.assertEquals(resultRowCount, 10);
Assertions.assertEquals(7, resultRowCount);
resultSet.close();
}
}
catch (SQLException throwables) {
Assert.fail(throwables.getMessage());
Assertions.fail(throwables.getMessage());
}
}
}

@Test
public void testJdbcPrepareStatementQuery()
{
String query = StringUtils.format(QUERY_TEMPLATE, dataSourceName, "?");
for (String url : connections) {
try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
try (PreparedStatement statement = connection.prepareStatement(QUERY_PARAMETERIZED)) {
statement.setString(1, "en");
try (PreparedStatement statement = connection.prepareStatement(query)) {
statement.setLong(1, 1000);
final ResultSet resultSet = statement.executeQuery();
int resultRowCount = 0;
while (resultSet.next()) {
resultRowCount++;
LOG.info("%s,%s,%s", resultSet.getString(1), resultSet.getLong(2), resultSet.getLong(3));
}
Assert.assertEquals(resultRowCount, 10);
Assertions.assertEquals(7, resultRowCount);
resultSet.close();
}
}
catch (SQLException throwables) {
Assert.fail(throwables.getMessage());
Assertions.fail(throwables.getMessage());
}
}
}

@Test(expectedExceptions = AvaticaSqlException.class, expectedExceptionsMessageRegExp = ".* No value bound for parameter \\(position \\[1]\\)")
public void testJdbcPrepareStatementQueryMissingParameters() throws SQLException
@Test
public void testJdbcPrepareStatementQueryMissingParameters()
{
String query = StringUtils.format(QUERY_TEMPLATE, dataSourceName, "?");
for (String url : connections) {
try (Connection connection = DriverManager.getConnection(url, connectionProperties);
PreparedStatement statement = connection.prepareStatement(QUERY_PARAMETERIZED);
PreparedStatement statement = connection.prepareStatement(query);
ResultSet resultSet = statement.executeQuery()) {
// This won't actually run as we expect the exception to be thrown before it gets here
throw new IllegalStateException(resultSet.toString());
Assertions.fail(resultSet.toString());
}
catch (SQLException e) {
Assertions.assertInstanceOf(AvaticaSqlException.class, e);
Assertions.assertTrue(e.getMessage().contains("No value bound for parameter (position [1])"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.druid.query.http.SqlTaskStatus;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
Expand All @@ -56,42 +55,22 @@
public class QueryErrorTest extends QueryTestBase
{
// Introduce onAnyRouter(...) and use it; add TLS tests in the follow-up patches
protected String tableName;

@Override
protected EmbeddedDruidCluster createCluster()
{
overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
indexer.setServerMemory(600_000_000)
.addProperty("druid.worker.capacity", "4")
.addProperty("druid.processing.numThreads", "2")
.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");

return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
.addServer(overlord)
.addServer(coordinator)
.addServer(broker)
.addServer(router)
.addServer(indexer)
.addServer(historical)
.addExtension(ServerManagerForQueryErrorTestModule.class);
}
protected String testDataSourceName;

@Override
protected void beforeAll()
{
tableName = EmbeddedClusterApis.createTestDatasourceName();
testDataSourceName = EmbeddedClusterApis.createTestDatasourceName();
EmbeddedMSQApis msqApi = new EmbeddedMSQApis(cluster, overlord);
SqlTaskStatus ingestionStatus = msqApi.submitTaskSql(StringUtils.format(
"REPLACE INTO %s\n"
+ "OVERWRITE ALL\n"
+ "SELECT CURRENT_TIMESTAMP AS __time, 1 AS d PARTITIONED BY ALL",
tableName
testDataSourceName
));

cluster.callApi().waitForTaskToSucceed(ingestionStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(tableName, coordinator, broker);
cluster.callApi().waitForAllSegmentsToBeAvailable(testDataSourceName, coordinator, broker);
}

@Test
Expand Down Expand Up @@ -267,7 +246,7 @@ private static Map<String, Object> buildTestContext(String key)
private ListenableFuture<String> sqlQueryFuture(BrokerClient b, String contextKey)
{
return b.submitSqlQuery(new ClientSqlQuery(
StringUtils.format("SELECT * FROM %s LIMIT 1", tableName),
StringUtils.format("SELECT * FROM %s LIMIT 1", testDataSourceName),
null,
false,
false,
Expand All @@ -283,7 +262,7 @@ private ListenableFuture<String> sqlQueryFuture(BrokerClient b, String contextKe
private ListenableFuture<String> nativeQueryFuture(BrokerClient b, String contextKey)
{
return b.submitNativeQuery(new Druids.ScanQueryBuilder()
.dataSource(tableName)
.dataSource(testDataSourceName)
.eternityInterval()
.limit(1)
.context(buildTestContext(contextKey))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,11 @@
package org.apache.druid.testing.embedded.query;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -69,41 +63,13 @@ private enum Expectation
QUERY_FAILURE
}

private ObjectMapper jsonMapper;
private String tableName;

@Override
protected EmbeddedDruidCluster createCluster()
{
overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always");
indexer.setServerMemory(400_000_000)
.addProperty("druid.worker.capacity", "4")
.addProperty("druid.processing.numThreads", "2")
.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");

return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
.addServer(overlord)
.addServer(coordinator)
.addServer(broker)
.addServer(router)
.addServer(indexer)
.addServer(historical)
.addExtension(ServerManagerForQueryErrorTestModule.class);
}

@Override
public void beforeAll()
{
jsonMapper = overlord.bindings().jsonMapper();
tableName = EmbeddedClusterApis.createTestDatasourceName();

final String taskId = IdUtils.getRandomId();
final IndexTask task = MoreResources.Task.BASIC_INDEX.get().dataSource(tableName).withId(taskId);
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
cluster.callApi().waitForTaskToSucceed(taskId, overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(tableName, coordinator, broker);
tableName = ingestBasicData();
}

@Test
Expand Down
Loading
Loading