Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cron-job-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [ query, query-retry, query-error, security ]
testing_group: [ query, security ]
uses: ./.github/workflows/reusable-standard-its.yml
needs: build
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/standard-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [query, query-retry, query-error, security, centralized-datasource-schema]
testing_group: [query, security, centralized-datasource-schema]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,15 @@
* under the License.
*/

package org.apache.druid.tests.query;
package org.apache.druid.testing.embedded.query;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.calcite.avatica.AvaticaSqlException;
import org.apache.druid.https.SSLClientConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.tools.IntegrationTestingConfig;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.tests.TestNGGroup;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


import java.sql.Connection;
import java.sql.DatabaseMetaData;
Expand All @@ -47,69 +40,55 @@
import java.util.Properties;
import java.util.Set;

@Test(groups = {TestNGGroup.QUERY, TestNGGroup.CENTRALIZED_DATASOURCE_SCHEMA})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITJdbcQueryTest
public class JdbcQueryTest extends QueryTestBase
{
private static final Logger LOG = new Logger(ITJdbcQueryTest.class);
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
private static final Logger LOG = new Logger(JdbcQueryTest.class);
private static final String CONNECTION_TEMPLATE = "jdbc:avatica:remote:url=%s/druid/v2/sql/avatica/";
private static final String TLS_CONNECTION_TEMPLATE =
"jdbc:avatica:remote:url=%s/druid/v2/sql/avatica/;truststore=%s;truststore_password=%s;keystore=%s;keystore_password=%s;key_password=%s";

private static final String QUERY_TEMPLATE =
"SELECT \"user\", SUM(\"added\"), COUNT(*)" +
"FROM \"wikipedia\" " +
"WHERE \"__time\" >= CURRENT_TIMESTAMP - INTERVAL '99' YEAR AND \"language\" = %s" +
"GROUP BY 1 ORDER BY 3 DESC LIMIT 10";
private static final String QUERY = StringUtils.format(QUERY_TEMPLATE, "'en'");

private static final String QUERY_PARAMETERIZED = StringUtils.format(QUERY_TEMPLATE, "?");
"SELECT \"item\", SUM(\"value\"), COUNT(*) "
+ "FROM \"%s\" "
+ "WHERE \"__time\" >= CURRENT_TIMESTAMP - INTERVAL '99' YEAR AND \"value\" < %s \n"
+ "GROUP BY 1 ORDER BY 3 DESC LIMIT 10";

private String[] connections;
private Properties connectionProperties;

@Inject
private IntegrationTestingConfig config;

@Inject
SSLClientConfig sslConfig;

@Inject
private DataLoaderHelper dataLoaderHelper;
private String tableName;

@BeforeMethod
public void before()
@Override
protected void beforeAll()
{
connectionProperties = new Properties();
connectionProperties.setProperty("user", "admin");
connectionProperties.setProperty("password", "priest");
connections = new String[]{
StringUtils.format(CONNECTION_TEMPLATE, config.getRouterUrl()),
StringUtils.format(CONNECTION_TEMPLATE, config.getBrokerUrl()),
StringUtils.format(
TLS_CONNECTION_TEMPLATE,
config.getRouterTLSUrl(),
sslConfig.getTrustStorePath(),
sslConfig.getTrustStorePasswordProvider().getPassword(),
sslConfig.getKeyStorePath(),
sslConfig.getKeyStorePasswordProvider().getPassword(),
sslConfig.getKeyManagerPasswordProvider().getPassword()
),
StringUtils.format(
TLS_CONNECTION_TEMPLATE,
config.getBrokerTLSUrl(),
sslConfig.getTrustStorePath(),
sslConfig.getTrustStorePasswordProvider().getPassword(),
sslConfig.getKeyStorePath(),
sslConfig.getKeyStorePasswordProvider().getPassword(),
sslConfig.getKeyManagerPasswordProvider().getPassword()
)
StringUtils.format(CONNECTION_TEMPLATE, getServerUrl(router)),
StringUtils.format(CONNECTION_TEMPLATE, getServerUrl(broker)),
// Add in the consecutive patch
// StringUtils.format(
// TLS_CONNECTION_TEMPLATE,
// config.getRouterTLSUrl(),
// sslConfig.getTrustStorePath(),
// sslConfig.getTrustStorePasswordProvider().getPassword(),
// sslConfig.getKeyStorePath(),
// sslConfig.getKeyStorePasswordProvider().getPassword(),
// sslConfig.getKeyManagerPasswordProvider().getPassword()
//),
// StringUtils.format(
// TLS_CONNECTION_TEMPLATE,
// config.getBrokerTLSUrl(),
// sslConfig.getTrustStorePath(),
// sslConfig.getTrustStorePasswordProvider().getPassword(),
// sslConfig.getKeyStorePath(),
// sslConfig.getKeyStorePasswordProvider().getPassword(),
// sslConfig.getKeyManagerPasswordProvider().getPassword()
// )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the commented out code. We can just add one line to the javadoc of this class that mentions that the TLS flavor for this test will be added later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

};
// ensure that wikipedia segments are loaded completely
dataLoaderHelper.waitUntilDatasourceIsReady(WIKIPEDIA_DATA_SOURCE);
dataLoaderHelper.waitUntilDatasourceIsReady("wikipedia");
dataLoaderHelper.waitUntilDatasourceIsReady("twitterstream");

tableName = ingestBasicData();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: In Druid, we generally use the term datasource. Maybe rename this field to testDataSource or similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename pending?

}

@Test
Expand All @@ -126,7 +105,7 @@ public void testJdbcMetadata()
catalogs.add(catalog);
}
LOG.info("catalogs %s", catalogs);
Assert.assertEquals(catalogs, ImmutableList.of("druid"));
Assertions.assertEquals(catalogs, ImmutableList.of("druid"));

Set<String> schemas = new HashSet<>();
ResultSet schemasMetadata = metadata.getSchemas("druid", null);
Expand All @@ -136,7 +115,7 @@ public void testJdbcMetadata()
}
LOG.info("'druid' catalog schemas %s", schemas);
// maybe more schemas than this, but at least should have these
Assert.assertTrue(schemas.containsAll(ImmutableList.of("INFORMATION_SCHEMA", "druid", "lookup", "sys")));
Assertions.assertTrue(schemas.containsAll(ImmutableList.of("INFORMATION_SCHEMA", "druid", "lookup", "sys")));

Set<String> druidTables = new HashSet<>();
ResultSet tablesMetadata = metadata.getTables("druid", "druid", null, null);
Expand All @@ -145,84 +124,91 @@ public void testJdbcMetadata()
druidTables.add(table);
}
LOG.info("'druid' schema tables %s", druidTables);
// maybe more tables than this, but at least should have these
Assert.assertTrue(
druidTables.containsAll(ImmutableList.of("twitterstream", "wikipedia", WIKIPEDIA_DATA_SOURCE))
// maybe more tables than this, but at least should have @tableName
Assertions.assertTrue(
druidTables.containsAll(ImmutableList.of(tableName))
);

Set<String> wikiColumns = new HashSet<>();
ResultSet columnsMetadata = metadata.getColumns("druid", "druid", WIKIPEDIA_DATA_SOURCE, null);
ResultSet columnsMetadata = metadata.getColumns("druid", "druid", tableName, null);
while (columnsMetadata.next()) {
final String column = columnsMetadata.getString(4);
wikiColumns.add(column);
}
LOG.info("'%s' columns %s", WIKIPEDIA_DATA_SOURCE, wikiColumns);
LOG.info("'%s' columns %s", tableName, wikiColumns);
// a lot more columns than this, but at least should have these
Assert.assertTrue(
wikiColumns.containsAll(ImmutableList.of("added", "city", "delta", "language"))
Assertions.assertTrue(
wikiColumns.containsAll(ImmutableList.of("__time", "item", "value"))
);
}
catch (SQLException throwables) {
Assert.fail(throwables.getMessage());
Assertions.fail(throwables.getMessage());
}
}
}

/**
 * Runs the aggregation query through a plain JDBC {@link Statement} against
 * every configured connection URL and verifies the expected row count.
 */
@Test
public void testJdbcStatementQuery()
{
  final String query = StringUtils.format(QUERY_TEMPLATE, tableName, "1000");
  for (String url : connections) {
    try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
      // Keep the ResultSet in try-with-resources so it is closed even when an
      // assertion below throws (the original closed it manually after the assert).
      try (Statement statement = connection.createStatement();
           ResultSet resultSet = statement.executeQuery(query)) {
        int resultRowCount = 0;
        while (resultSet.next()) {
          resultRowCount++;
          LOG.info("%s,%s,%s", resultSet.getString(1), resultSet.getLong(2), resultSet.getLong(3));
        }
        Assertions.assertEquals(7, resultRowCount);
      }
    }
    catch (SQLException throwables) {
      Assertions.fail(throwables.getMessage());
    }
  }
}

@Test
public void testJdbcPrepareStatementQuery()
{
String query = StringUtils.format(QUERY_TEMPLATE, tableName, "?");
for (String url : connections) {
try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
try (PreparedStatement statement = connection.prepareStatement(QUERY_PARAMETERIZED)) {
statement.setString(1, "en");
try (PreparedStatement statement = connection.prepareStatement(query)) {
statement.setLong(1, 1000);
final ResultSet resultSet = statement.executeQuery();
int resultRowCount = 0;
while (resultSet.next()) {
resultRowCount++;
LOG.info("%s,%s,%s", resultSet.getString(1), resultSet.getLong(2), resultSet.getLong(3));
}
Assert.assertEquals(resultRowCount, 10);
Assertions.assertEquals(7, resultRowCount);
resultSet.close();
}
}
catch (SQLException throwables) {
Assert.fail(throwables.getMessage());
Assertions.fail(throwables.getMessage());
}
}
}

@Test(expectedExceptions = AvaticaSqlException.class, expectedExceptionsMessageRegExp = ".* No value bound for parameter \\(position \\[1]\\)")
public void testJdbcPrepareStatementQueryMissingParameters() throws SQLException
@Test
public void testJdbcPrepareStatementQueryMissingParameters()
{
String query = StringUtils.format(QUERY_TEMPLATE, tableName, "?");
for (String url : connections) {
try (Connection connection = DriverManager.getConnection(url, connectionProperties);
PreparedStatement statement = connection.prepareStatement(QUERY_PARAMETERIZED);
PreparedStatement statement = connection.prepareStatement(query);
ResultSet resultSet = statement.executeQuery()) {
// This won't actually run as we expect the exception to be thrown before it gets here
throw new IllegalStateException(resultSet.toString());
}
catch (SQLException e) {
Assertions.assertInstanceOf(AvaticaSqlException.class, e);
Assertions.assertTrue(e.getMessage().contains("No value bound for parameter (position [1])"));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.druid.query.http.SqlTaskStatus;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -58,26 +57,6 @@ public class QueryErrorTest extends QueryTestBase
// Introduce onAnyRouter(...) and use it; add TLS tests in the follow-up patches
protected String tableName;

@Override
protected EmbeddedDruidCluster createCluster()
{
overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
indexer.setServerMemory(600_000_000)
.addProperty("druid.worker.capacity", "4")
.addProperty("druid.processing.numThreads", "2")
.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");

return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
.addServer(overlord)
.addServer(coordinator)
.addServer(broker)
.addServer(router)
.addServer(indexer)
.addServer(historical)
.addExtension(ServerManagerForQueryErrorTestModule.class);
}

@Override
protected void beforeAll()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,11 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -72,38 +67,11 @@ private enum Expectation
private ObjectMapper jsonMapper;
private String tableName;

@Override
protected EmbeddedDruidCluster createCluster()
{
overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always");
indexer.setServerMemory(400_000_000)
.addProperty("druid.worker.capacity", "4")
.addProperty("druid.processing.numThreads", "2")
.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");

return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
.addServer(overlord)
.addServer(coordinator)
.addServer(broker)
.addServer(router)
.addServer(indexer)
.addServer(historical)
.addExtension(ServerManagerForQueryErrorTestModule.class);
}

/**
 * Captures the Overlord's JSON mapper and ingests the test datasource used by
 * the SQL statement tests in this class.
 */
@Override
public void beforeAll()
{
  jsonMapper = overlord.bindings().jsonMapper();
  // Ingest via the shared base-class helper, consistent with the other query
  // tests; it returns the name of the freshly created datasource.
  tableName = ingestBasicData();
}

@Test
Expand Down
Loading
Loading