Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
package com.scalar.db.storage.jdbc;

import static org.assertj.core.api.Assertions.assertThat;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedStorageIntegrationTestBase;
import com.scalar.db.api.Get;
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Scanner;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.junit.jupiter.api.Test;

public class JdbcDatabaseIntegrationTest extends DistributedStorageIntegrationTestBase {

Expand All @@ -25,4 +40,113 @@ protected int getLargeDataSizeInBytes() {
return super.getLargeDataSizeInBytes();
}
}

@Test
public void get_InStreamingMode_ShouldRetrieveSingleResult() throws ExecutionException {
if (!JdbcTestUtils.isMysql(rdbEngine) || JdbcTestUtils.isMariaDB(rdbEngine)) {
// MySQL is the only RDB engine that supports streaming mode
return;
}

try (DistributedStorage storage = getStorageInStreamingMode()) {
// Arrange
int pKey = 0;
int cKey = 1;
int value = 2;

storage.put(
Put.newBuilder()
.namespace(namespace)
.table(TABLE)
.partitionKey(Key.ofInt(COL_NAME1, pKey))
.clusteringKey(Key.ofInt(COL_NAME4, cKey))
.intValue(COL_NAME3, value)
.build());

// Act
Optional<Result> result =
storage.get(
Get.newBuilder()
.namespace(namespace)
.table(TABLE)
.partitionKey(Key.ofInt(COL_NAME1, pKey))
.clusteringKey(Key.ofInt(COL_NAME4, cKey))
.build());

// Assert
assertThat(result.isPresent()).isTrue();
assertThat(result.get().getInt(COL_NAME1)).isEqualTo(pKey);
assertThat(result.get().getInt(COL_NAME4)).isEqualTo(cKey);
assertThat(result.get().getInt(COL_NAME3)).isEqualTo(value);
}
}

@Test
public void scan_InStreamingMode_ShouldRetrieveResults() throws IOException, ExecutionException {
if (!JdbcTestUtils.isMysql(rdbEngine) || JdbcTestUtils.isMariaDB(rdbEngine)) {
// MySQL is the only RDB engine that supports streaming mode
return;
}

try (DistributedStorage storage = getStorageInStreamingMode()) {
// Arrange
int pKey = 0;

storage.put(
Put.newBuilder()
.namespace(namespace)
.table(TABLE)
.partitionKey(Key.ofInt(COL_NAME1, pKey))
.clusteringKey(Key.ofInt(COL_NAME4, 0))
.intValue(COL_NAME3, 1)
.build());
storage.put(
Put.newBuilder()
.namespace(namespace)
.table(TABLE)
.partitionKey(Key.ofInt(COL_NAME1, pKey))
.clusteringKey(Key.ofInt(COL_NAME4, 1))
.intValue(COL_NAME3, 2)
.build());
storage.put(
Put.newBuilder()
.namespace(namespace)
.table(TABLE)
.partitionKey(Key.ofInt(COL_NAME1, pKey))
.clusteringKey(Key.ofInt(COL_NAME4, 2))
.intValue(COL_NAME3, 3)
.build());

// Act
Scanner scanner =
storage.scan(
Scan.newBuilder()
.namespace(namespace)
.table(TABLE)
.partitionKey(Key.ofInt(COL_NAME1, pKey))
.build());
List<Result> results = scanner.all();
scanner.close();

// Assert
assertThat(results).hasSize(3);
assertThat(results.get(0).getInt(COL_NAME1)).isEqualTo(pKey);
assertThat(results.get(0).getInt(COL_NAME4)).isEqualTo(0);
assertThat(results.get(0).getInt(COL_NAME3)).isEqualTo(1);

assertThat(results.get(1).getInt(COL_NAME1)).isEqualTo(pKey);
assertThat(results.get(1).getInt(COL_NAME4)).isEqualTo(1);
assertThat(results.get(1).getInt(COL_NAME3)).isEqualTo(2);

assertThat(results.get(2).getInt(COL_NAME1)).isEqualTo(pKey);
assertThat(results.get(2).getInt(COL_NAME4)).isEqualTo(2);
assertThat(results.get(2).getInt(COL_NAME3)).isEqualTo(3);
}
}

private DistributedStorage getStorageInStreamingMode() {
Properties properties = JdbcEnv.getProperties(TEST_NAME);
properties.setProperty(DatabaseConfig.SCAN_FETCH_SIZE, Integer.toString(Integer.MIN_VALUE));
return StorageFactory.create(properties).getStorage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public static boolean isDb2(RdbEngineStrategy rdbEngine) {
return rdbEngine instanceof RdbEngineDb2;
}

public static boolean isMariaDB(RdbEngineStrategy rdbEngine) {
return rdbEngine instanceof RdbEngineMariaDB;
}

/**
* Filters the data types based on the RDB engine and the excluded data types.
*
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/com/scalar/db/storage/jdbc/JdbcConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public class JdbcConfig {
// is 1970-01-01.
public static final String DEFAULT_DB2_TIME_COLUMN_DEFAULT_DATE_COMPONENT = "1970-01-01";

private final DatabaseConfig databaseConfig;

private final String jdbcUrl;
@Nullable private final String username;
@Nullable private final String password;
Expand Down Expand Up @@ -130,6 +132,7 @@ public class JdbcConfig {
private final LocalDate db2TimeColumnDefaultDateComponent;

public JdbcConfig(DatabaseConfig databaseConfig) {
this.databaseConfig = databaseConfig;
String storage = databaseConfig.getStorage();
String transactionManager = databaseConfig.getTransactionManager();
if (!storage.equals(STORAGE_NAME) && !transactionManager.equals(TRANSACTION_MANAGER_NAME)) {
Expand Down Expand Up @@ -258,6 +261,10 @@ public JdbcConfig(DatabaseConfig databaseConfig) {
LocalDate.parse(db2TimeColumnDefaultDateComponentString, DateTimeFormatter.ISO_LOCAL_DATE);
}

public DatabaseConfig getDatabaseConfig() {
return databaseConfig;
}

public String getJdbcUrl() {
return jdbcUrl;
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/com/scalar/db/storage/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static BasicDataSource initDataSource(
dataSource.setMaxTotal(config.getConnectionPoolMaxTotal());
dataSource.setPoolPreparedStatements(config.isPreparedStatementsPoolEnabled());
dataSource.setMaxOpenPreparedStatements(config.getPreparedStatementsPoolMaxOpen());
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
}

Expand All @@ -97,7 +97,7 @@ public static BasicDataSource initDataSourceForTableMetadata(
dataSource.setMinIdle(config.getTableMetadataConnectionPoolMinIdle());
dataSource.setMaxIdle(config.getTableMetadataConnectionPoolMaxIdle());
dataSource.setMaxTotal(config.getTableMetadataConnectionPoolMaxTotal());
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
}

Expand All @@ -124,7 +124,7 @@ public static BasicDataSource initDataSourceForAdmin(
dataSource.setMinIdle(config.getAdminConnectionPoolMinIdle());
dataSource.setMaxIdle(config.getAdminConnectionPoolMaxIdle());
dataSource.setMaxTotal(config.getAdminConnectionPoolMaxTotal());
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
}
return dataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public RdbEngineTimeTypeStrategy<String, LocalDateTime, String, String> getTimeT
}

@Override
public Map<String, String> getConnectionProperties() {
public Map<String, String> getConnectionProperties(JdbcConfig config) {
ImmutableMap.Builder<String, String> props = new ImmutableMap.Builder<>();
// With this Db2 will return a textual description of the error when
// calling JDBC `SQLException.getMessage` instead of a short message containing only error codes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package com.scalar.db.storage.jdbc;

import com.scalar.db.io.DataType;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.JDBCType;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;

class RdbEngineMariaDB extends RdbEngineMysql {
Expand All @@ -28,4 +32,14 @@ DataType getDataTypeForScalarDbInternal(
type, typeName, columnSize, digits, columnDescription, overrideDataType);
}
}

@Override
public Map<String, String> getConnectionProperties(JdbcConfig config) {
return Collections.emptyMap();
}

@Override
public void setConnectionToReadOnly(Connection connection, boolean readOnly) throws SQLException {
connection.setReadOnly(readOnly);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.scalar.db.storage.jdbc.query.SelectQuery;
import com.scalar.db.storage.jdbc.query.SelectWithLimitQuery;
import com.scalar.db.storage.jdbc.query.UpsertQuery;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.JDBCType;
import java.sql.ResultSet;
Expand Down Expand Up @@ -440,7 +441,19 @@ public TimestampTZColumn parseTimestampTZColumn(ResultSet resultSet, String colu
}

@Override
public Map<String, String> getConnectionProperties() {
public Map<String, String> getConnectionProperties(JdbcConfig config) {
if (config.getDatabaseConfig().getScanFetchSize() == Integer.MIN_VALUE) {
// If the scan fetch size is set to Integer.MIN_VALUE, use the streaming mode.
return Collections.emptyMap();
}

// Otherwise, use the cursor fetch mode.
return Collections.singletonMap("useCursorFetch", "true");
}

@Override
public void setConnectionToReadOnly(Connection connection, boolean readOnly) throws SQLException {
// Observed performance degradation when using read-only connections in MySQL. So we do not
// set the read-only mode for MySQL connections.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,12 @@ public String getEscape(LikeExpression likeExpression) {
return escape.isEmpty() ? "\\" : escape;
}

public String tryAddIfNotExistsToCreateIndexSql(String createIndexSql) {
return createIndexSql;
}

@Override
public Map<String, String> getConnectionProperties() {
public Map<String, String> getConnectionProperties(JdbcConfig config) {
// Needed to keep the microsecond precision when sending the value of ScalarDB TIME type.
// It is being considered setting to it to false by default in a future driver release.
return ImmutableMap.of("sendTimeAsDatetime", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ default TimestampTZColumn parseTimestampTZColumn(ResultSet resultSet, String col
/**
* Return the connection properties for the underlying database.
*
* @param config the JDBC configuration
* @return a map where key=property_name and value=property_value
*/
default Map<String, String> getConnectionProperties() {
default Map<String, String> getConnectionProperties(JdbcConfig config) {
return Collections.emptyMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private void getTableMetadata_forX_ShouldReturnTableMetadata(
.addSecondaryIndex("c4")
.build();
assertThat(actualMetadata).isEqualTo(expectedMetadata);
if (rdbEngine == RdbEngine.SQLITE) {
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
verify(connection, never()).setReadOnly(anyBoolean());
} else {
verify(connection).setReadOnly(true);
Expand Down Expand Up @@ -1711,7 +1711,7 @@ private void getNamespaceTables_forX_ShouldReturnTableNames(
Set<String> actualTableNames = admin.getNamespaceTableNames(namespace);

// Assert
if (rdbEngine == RdbEngine.SQLITE) {
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
verify(connection, never()).setReadOnly(anyBoolean());
} else {
verify(connection).setReadOnly(true);
Expand Down Expand Up @@ -1785,7 +1785,7 @@ private void namespaceExists_forXWithExistingNamespace_ShouldReturnTrue(
// Assert
assertThat(admin.namespaceExists(namespace)).isTrue();

if (rdbEngine == RdbEngine.SQLITE) {
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
verify(connection, never()).setReadOnly(anyBoolean());
} else {
verify(connection).setReadOnly(true);
Expand Down Expand Up @@ -2866,7 +2866,11 @@ public Boolean answer(InvocationOnMock invocation) {
.execute(expectedCheckTableExistStatement);
assertThat(actual.getPartitionKeyNames()).hasSameElementsAs(ImmutableSet.of("pk1", "pk2"));
assertThat(actual.getColumnDataTypes()).containsExactlyEntriesOf(expectedColumns);
verify(connection).setReadOnly(true);
if (rdbEngine == RdbEngine.MYSQL) {
verify(connection, never()).setReadOnly(anyBoolean());
} else {
verify(connection).setReadOnly(true);
}
verify(rdbEngineStrategy)
.getDataTypeForScalarDb(
any(JDBCType.class),
Expand Down Expand Up @@ -3329,7 +3333,7 @@ private void getNamespaceNames_ForX_WithExistingTables_ShouldWorkProperly(
Set<String> actual = admin.getNamespaceNames();

// Assert
if (rdbEngine.equals(RdbEngine.SQLITE)) {
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
verify(connection, never()).setReadOnly(anyBoolean());
} else {
verify(connection).setReadOnly(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void setUp() throws Exception {
databaseConfig,
dataSource,
tableMetadataDataSource,
RdbEngine.createRdbEngineStrategy(RdbEngine.MYSQL),
RdbEngine.createRdbEngineStrategy(RdbEngine.POSTGRESQL),
jdbcService);
}

Expand Down Expand Up @@ -382,7 +382,7 @@ public void mutate_withConflictError_shouldThrowRetriableExecutionException()
throws SQLException, ExecutionException {
// Arrange
when(jdbcService.mutate(any(), any())).thenThrow(sqlException);
when(sqlException.getErrorCode()).thenReturn(1213);
when(sqlException.getSQLState()).thenReturn("40001");

// Act Assert
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void initDataSource_NonTransactional_ShouldReturnProperDataSource() throw
JdbcConfig config = new JdbcConfig(new DatabaseConfig(properties));
Driver driver = new com.mysql.cj.jdbc.Driver();
when(rdbEngine.getDriver()).thenReturn(driver);
when(rdbEngine.getConnectionProperties()).thenReturn(Collections.emptyMap());
when(rdbEngine.getConnectionProperties(config)).thenReturn(Collections.emptyMap());

// Act
BasicDataSource dataSource = JdbcUtils.initDataSource(config, rdbEngine);
Expand Down Expand Up @@ -95,7 +95,7 @@ public void initDataSource_Transactional_ShouldReturnProperDataSource() throws S
JdbcConfig config = new JdbcConfig(new DatabaseConfig(properties));
Driver driver = new org.postgresql.Driver();
when(rdbEngine.getDriver()).thenReturn(driver);
when(rdbEngine.getConnectionProperties()).thenReturn(Collections.emptyMap());
when(rdbEngine.getConnectionProperties(config)).thenReturn(Collections.emptyMap());

// Act
BasicDataSource dataSource = JdbcUtils.initDataSource(config, rdbEngine, true);
Expand Down Expand Up @@ -135,7 +135,7 @@ public void initDataSource_WithRdbEngineConnectionProperties_ShouldAddProperties
JdbcConfig config = new JdbcConfig(new DatabaseConfig(properties));
Driver driver = new com.microsoft.sqlserver.jdbc.SQLServerDriver();
when(rdbEngine.getDriver()).thenReturn(driver);
when(rdbEngine.getConnectionProperties())
when(rdbEngine.getConnectionProperties(config))
.thenReturn(ImmutableMap.of("prop1", "prop1Value", "prop2", "prop2Value"));

try (MockedStatic<JdbcUtils> jdbcUtils =
Expand Down
Loading