Skip to content

Commit 9a5b0e1

Browse files
committed
Optimize JDBC adaptor for MySQL (#2801)
1 parent 48cefa2 commit 9a5b0e1

File tree

14 files changed

+207
-36
lines changed

14 files changed

+207
-36
lines changed

core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcDatabaseIntegrationTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
11
package com.scalar.db.storage.jdbc;
22

3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.scalar.db.api.DistributedStorage;
36
import com.scalar.db.api.DistributedStorageIntegrationTestBase;
7+
import com.scalar.db.api.Get;
8+
import com.scalar.db.api.Put;
9+
import com.scalar.db.api.Result;
10+
import com.scalar.db.api.Scan;
11+
import com.scalar.db.api.Scanner;
412
import com.scalar.db.config.DatabaseConfig;
13+
import com.scalar.db.exception.storage.ExecutionException;
14+
import com.scalar.db.io.Key;
15+
import com.scalar.db.service.StorageFactory;
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.Optional;
519
import java.util.Properties;
20+
import org.junit.jupiter.api.Test;
621

722
public class JdbcDatabaseIntegrationTest extends DistributedStorageIntegrationTestBase {
823

@@ -25,4 +40,113 @@ protected int getLargeDataSizeInBytes() {
2540
return super.getLargeDataSizeInBytes();
2641
}
2742
}
43+
44+
@Test
45+
public void get_InStreamingMode_ShouldRetrieveSingleResult() throws ExecutionException {
46+
if (!JdbcTestUtils.isMysql(rdbEngine) || JdbcTestUtils.isMariaDB(rdbEngine)) {
47+
// MySQL is the only RDB engine that supports streaming mode
48+
return;
49+
}
50+
51+
try (DistributedStorage storage = getStorageInStreamingMode()) {
52+
// Arrange
53+
int pKey = 0;
54+
int cKey = 1;
55+
int value = 2;
56+
57+
storage.put(
58+
Put.newBuilder()
59+
.namespace(namespace)
60+
.table(TABLE)
61+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
62+
.clusteringKey(Key.ofInt(COL_NAME4, cKey))
63+
.intValue(COL_NAME3, value)
64+
.build());
65+
66+
// Act
67+
Optional<Result> result =
68+
storage.get(
69+
Get.newBuilder()
70+
.namespace(namespace)
71+
.table(TABLE)
72+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
73+
.clusteringKey(Key.ofInt(COL_NAME4, cKey))
74+
.build());
75+
76+
// Assert
77+
assertThat(result.isPresent()).isTrue();
78+
assertThat(result.get().getInt(COL_NAME1)).isEqualTo(pKey);
79+
assertThat(result.get().getInt(COL_NAME4)).isEqualTo(cKey);
80+
assertThat(result.get().getInt(COL_NAME3)).isEqualTo(value);
81+
}
82+
}
83+
84+
@Test
85+
public void scan_InStreamingMode_ShouldRetrieveResults() throws IOException, ExecutionException {
86+
if (!JdbcTestUtils.isMysql(rdbEngine) || JdbcTestUtils.isMariaDB(rdbEngine)) {
87+
// MySQL is the only RDB engine that supports streaming mode
88+
return;
89+
}
90+
91+
try (DistributedStorage storage = getStorageInStreamingMode()) {
92+
// Arrange
93+
int pKey = 0;
94+
95+
storage.put(
96+
Put.newBuilder()
97+
.namespace(namespace)
98+
.table(TABLE)
99+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
100+
.clusteringKey(Key.ofInt(COL_NAME4, 0))
101+
.intValue(COL_NAME3, 1)
102+
.build());
103+
storage.put(
104+
Put.newBuilder()
105+
.namespace(namespace)
106+
.table(TABLE)
107+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
108+
.clusteringKey(Key.ofInt(COL_NAME4, 1))
109+
.intValue(COL_NAME3, 2)
110+
.build());
111+
storage.put(
112+
Put.newBuilder()
113+
.namespace(namespace)
114+
.table(TABLE)
115+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
116+
.clusteringKey(Key.ofInt(COL_NAME4, 2))
117+
.intValue(COL_NAME3, 3)
118+
.build());
119+
120+
// Act
121+
Scanner scanner =
122+
storage.scan(
123+
Scan.newBuilder()
124+
.namespace(namespace)
125+
.table(TABLE)
126+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
127+
.build());
128+
List<Result> results = scanner.all();
129+
scanner.close();
130+
131+
// Assert
132+
assertThat(results).hasSize(3);
133+
assertThat(results.get(0).getInt(COL_NAME1)).isEqualTo(pKey);
134+
assertThat(results.get(0).getInt(COL_NAME4)).isEqualTo(0);
135+
assertThat(results.get(0).getInt(COL_NAME3)).isEqualTo(1);
136+
137+
assertThat(results.get(1).getInt(COL_NAME1)).isEqualTo(pKey);
138+
assertThat(results.get(1).getInt(COL_NAME4)).isEqualTo(1);
139+
assertThat(results.get(1).getInt(COL_NAME3)).isEqualTo(2);
140+
141+
assertThat(results.get(2).getInt(COL_NAME1)).isEqualTo(pKey);
142+
assertThat(results.get(2).getInt(COL_NAME4)).isEqualTo(2);
143+
assertThat(results.get(2).getInt(COL_NAME3)).isEqualTo(3);
144+
}
145+
}
146+
147+
private DistributedStorage getStorageInStreamingMode() {
148+
Properties properties = JdbcEnv.getProperties(TEST_NAME);
149+
properties.setProperty(DatabaseConfig.SCAN_FETCH_SIZE, Integer.toString(Integer.MIN_VALUE));
150+
return StorageFactory.create(properties).getStorage();
151+
}
28152
}

core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcTestUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ public static boolean isDb2(RdbEngineStrategy rdbEngine) {
8686
return rdbEngine instanceof RdbEngineDb2;
8787
}
8888

89+
public static boolean isMariaDB(RdbEngineStrategy rdbEngine) {
90+
return rdbEngine instanceof RdbEngineMariaDB;
91+
}
92+
8993
/**
9094
* Filters the data types based on the RDB engine and the excluded data types.
9195
*

core/src/main/java/com/scalar/db/storage/jdbc/JdbcConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ public class JdbcConfig {
101101
// is 1970-01-01.
102102
public static final String DEFAULT_DB2_TIME_COLUMN_DEFAULT_DATE_COMPONENT = "1970-01-01";
103103

104+
private final DatabaseConfig databaseConfig;
105+
104106
private final String jdbcUrl;
105107
@Nullable private final String username;
106108
@Nullable private final String password;
@@ -130,6 +132,7 @@ public class JdbcConfig {
130132
private final LocalDate db2TimeColumnDefaultDateComponent;
131133

132134
public JdbcConfig(DatabaseConfig databaseConfig) {
135+
this.databaseConfig = databaseConfig;
133136
String storage = databaseConfig.getStorage();
134137
String transactionManager = databaseConfig.getTransactionManager();
135138
if (!storage.equals(STORAGE_NAME) && !transactionManager.equals(TRANSACTION_MANAGER_NAME)) {
@@ -258,6 +261,10 @@ public JdbcConfig(DatabaseConfig databaseConfig) {
258261
LocalDate.parse(db2TimeColumnDefaultDateComponentString, DateTimeFormatter.ISO_LOCAL_DATE);
259262
}
260263

264+
public DatabaseConfig getDatabaseConfig() {
265+
return databaseConfig;
266+
}
267+
261268
public String getJdbcUrl() {
262269
return jdbcUrl;
263270
}

core/src/main/java/com/scalar/db/storage/jdbc/JdbcUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public static BasicDataSource initDataSource(
7070
dataSource.setMaxTotal(config.getConnectionPoolMaxTotal());
7171
dataSource.setPoolPreparedStatements(config.isPreparedStatementsPoolEnabled());
7272
dataSource.setMaxOpenPreparedStatements(config.getPreparedStatementsPoolMaxOpen());
73-
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
73+
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
7474
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
7575
}
7676

@@ -97,7 +97,7 @@ public static BasicDataSource initDataSourceForTableMetadata(
9797
dataSource.setMinIdle(config.getTableMetadataConnectionPoolMinIdle());
9898
dataSource.setMaxIdle(config.getTableMetadataConnectionPoolMaxIdle());
9999
dataSource.setMaxTotal(config.getTableMetadataConnectionPoolMaxTotal());
100-
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
100+
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
101101
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
102102
}
103103

@@ -124,7 +124,7 @@ public static BasicDataSource initDataSourceForAdmin(
124124
dataSource.setMinIdle(config.getAdminConnectionPoolMinIdle());
125125
dataSource.setMaxIdle(config.getAdminConnectionPoolMaxIdle());
126126
dataSource.setMaxTotal(config.getAdminConnectionPoolMaxTotal());
127-
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
127+
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
128128
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
129129
}
130130
return dataSource;

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineDb2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ public RdbEngineTimeTypeStrategy<String, LocalDateTime, String, String> getTimeT
403403
}
404404

405405
@Override
406-
public Map<String, String> getConnectionProperties() {
406+
public Map<String, String> getConnectionProperties(JdbcConfig config) {
407407
ImmutableMap.Builder<String, String> props = new ImmutableMap.Builder<>();
408408
// With this Db2 will return a textual description of the error when
409409
// calling JDBC `SQLException.getMessage` instead of a short message containing only error codes

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineMariaDB.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package com.scalar.db.storage.jdbc;
22

33
import com.scalar.db.io.DataType;
4+
import java.sql.Connection;
45
import java.sql.Driver;
56
import java.sql.JDBCType;
7+
import java.sql.SQLException;
8+
import java.util.Collections;
9+
import java.util.Map;
610
import javax.annotation.Nullable;
711

812
class RdbEngineMariaDB extends RdbEngineMysql {
@@ -28,4 +32,14 @@ DataType getDataTypeForScalarDbInternal(
2832
type, typeName, columnSize, digits, columnDescription, overrideDataType);
2933
}
3034
}
35+
36+
@Override
37+
public Map<String, String> getConnectionProperties(JdbcConfig config) {
38+
return Collections.emptyMap();
39+
}
40+
41+
@Override
42+
public void setConnectionToReadOnly(Connection connection, boolean readOnly) throws SQLException {
43+
connection.setReadOnly(readOnly);
44+
}
3145
}

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineMysql.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.scalar.db.storage.jdbc.query.SelectQuery;
1212
import com.scalar.db.storage.jdbc.query.SelectWithLimitQuery;
1313
import com.scalar.db.storage.jdbc.query.UpsertQuery;
14+
import java.sql.Connection;
1415
import java.sql.Driver;
1516
import java.sql.JDBCType;
1617
import java.sql.ResultSet;
@@ -440,7 +441,19 @@ public TimestampTZColumn parseTimestampTZColumn(ResultSet resultSet, String colu
440441
}
441442

442443
@Override
443-
public Map<String, String> getConnectionProperties() {
444+
public Map<String, String> getConnectionProperties(JdbcConfig config) {
445+
if (config.getDatabaseConfig().getScanFetchSize() == Integer.MIN_VALUE) {
446+
// If the scan fetch size is set to Integer.MIN_VALUE, use the streaming mode.
447+
return Collections.emptyMap();
448+
}
449+
450+
// Otherwise, use the cursor fetch mode.
444451
return Collections.singletonMap("useCursorFetch", "true");
445452
}
453+
454+
@Override
455+
public void setConnectionToReadOnly(Connection connection, boolean readOnly) throws SQLException {
456+
// Observed performance degradation when using read-only connections in MySQL. So we do not
457+
// set the read-only mode for MySQL connections.
458+
}
446459
}

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineSqlServer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,8 +365,12 @@ public String getEscape(LikeExpression likeExpression) {
365365
return escape.isEmpty() ? "\\" : escape;
366366
}
367367

368+
public String tryAddIfNotExistsToCreateIndexSql(String createIndexSql) {
369+
return createIndexSql;
370+
}
371+
368372
@Override
369-
public Map<String, String> getConnectionProperties() {
373+
public Map<String, String> getConnectionProperties(JdbcConfig config) {
370374
// Needed to keep the microsecond precision when sending the value of ScalarDB TIME type.
371375
// It is being considered setting to it to false by default in a future driver release.
372376
return ImmutableMap.of("sendTimeAsDatetime", "false");

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,10 @@ default TimestampTZColumn parseTimestampTZColumn(ResultSet resultSet, String col
203203
/**
204204
* Return the connection properties for the underlying database.
205205
*
206+
* @param config the JDBC configuration
206207
* @return a map where key=property_name and value=property_value
207208
*/
208-
default Map<String, String> getConnectionProperties() {
209+
default Map<String, String> getConnectionProperties(JdbcConfig config) {
209210
return Collections.emptyMap();
210211
}
211212

core/src/test/java/com/scalar/db/storage/jdbc/JdbcAdminTestBase.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ private void getTableMetadata_forX_ShouldReturnTableMetadata(
289289
.addSecondaryIndex("c4")
290290
.build();
291291
assertThat(actualMetadata).isEqualTo(expectedMetadata);
292-
if (rdbEngine == RdbEngine.SQLITE) {
292+
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
293293
verify(connection, never()).setReadOnly(anyBoolean());
294294
} else {
295295
verify(connection).setReadOnly(true);
@@ -1711,7 +1711,7 @@ private void getNamespaceTables_forX_ShouldReturnTableNames(
17111711
Set<String> actualTableNames = admin.getNamespaceTableNames(namespace);
17121712

17131713
// Assert
1714-
if (rdbEngine == RdbEngine.SQLITE) {
1714+
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
17151715
verify(connection, never()).setReadOnly(anyBoolean());
17161716
} else {
17171717
verify(connection).setReadOnly(true);
@@ -1785,7 +1785,7 @@ private void namespaceExists_forXWithExistingNamespace_ShouldReturnTrue(
17851785
// Assert
17861786
assertThat(admin.namespaceExists(namespace)).isTrue();
17871787

1788-
if (rdbEngine == RdbEngine.SQLITE) {
1788+
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
17891789
verify(connection, never()).setReadOnly(anyBoolean());
17901790
} else {
17911791
verify(connection).setReadOnly(true);
@@ -2866,7 +2866,11 @@ public Boolean answer(InvocationOnMock invocation) {
28662866
.execute(expectedCheckTableExistStatement);
28672867
assertThat(actual.getPartitionKeyNames()).hasSameElementsAs(ImmutableSet.of("pk1", "pk2"));
28682868
assertThat(actual.getColumnDataTypes()).containsExactlyEntriesOf(expectedColumns);
2869-
verify(connection).setReadOnly(true);
2869+
if (rdbEngine == RdbEngine.MYSQL) {
2870+
verify(connection, never()).setReadOnly(anyBoolean());
2871+
} else {
2872+
verify(connection).setReadOnly(true);
2873+
}
28702874
verify(rdbEngineStrategy)
28712875
.getDataTypeForScalarDb(
28722876
any(JDBCType.class),
@@ -3329,7 +3333,7 @@ private void getNamespaceNames_ForX_WithExistingTables_ShouldWorkProperly(
33293333
Set<String> actual = admin.getNamespaceNames();
33303334

33313335
// Assert
3332-
if (rdbEngine.equals(RdbEngine.SQLITE)) {
3336+
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
33333337
verify(connection, never()).setReadOnly(anyBoolean());
33343338
} else {
33353339
verify(connection).setReadOnly(true);

0 commit comments

Comments
 (0)