Skip to content

Commit f3969d9

Browse files
authored
Optimize JDBC adaptor for MySQL (#2801)
1 parent a32e1e8 commit f3969d9

File tree

14 files changed

+203
-36
lines changed

14 files changed

+203
-36
lines changed

core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcDatabaseIntegrationTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
11
package com.scalar.db.storage.jdbc;
22

3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.scalar.db.api.DistributedStorage;
36
import com.scalar.db.api.DistributedStorageIntegrationTestBase;
7+
import com.scalar.db.api.Get;
8+
import com.scalar.db.api.Put;
9+
import com.scalar.db.api.Result;
10+
import com.scalar.db.api.Scan;
11+
import com.scalar.db.api.Scanner;
412
import com.scalar.db.config.DatabaseConfig;
13+
import com.scalar.db.exception.storage.ExecutionException;
14+
import com.scalar.db.io.Key;
15+
import com.scalar.db.service.StorageFactory;
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.Optional;
519
import java.util.Properties;
20+
import org.junit.jupiter.api.Test;
621

722
public class JdbcDatabaseIntegrationTest extends DistributedStorageIntegrationTestBase {
823

@@ -25,4 +40,113 @@ protected int getLargeDataSizeInBytes() {
2540
return super.getLargeDataSizeInBytes();
2641
}
2742
}
43+
44+
@Test
45+
public void get_InStreamingMode_ShouldRetrieveSingleResult() throws ExecutionException {
46+
if (!JdbcTestUtils.isMysql(rdbEngine) || JdbcTestUtils.isMariaDB(rdbEngine)) {
47+
// MySQL is the only RDB engine that supports streaming mode
48+
return;
49+
}
50+
51+
try (DistributedStorage storage = getStorageInStreamingMode()) {
52+
// Arrange
53+
int pKey = 0;
54+
int cKey = 1;
55+
int value = 2;
56+
57+
storage.put(
58+
Put.newBuilder()
59+
.namespace(namespace)
60+
.table(TABLE)
61+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
62+
.clusteringKey(Key.ofInt(COL_NAME4, cKey))
63+
.intValue(COL_NAME3, value)
64+
.build());
65+
66+
// Act
67+
Optional<Result> result =
68+
storage.get(
69+
Get.newBuilder()
70+
.namespace(namespace)
71+
.table(TABLE)
72+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
73+
.clusteringKey(Key.ofInt(COL_NAME4, cKey))
74+
.build());
75+
76+
// Assert
77+
assertThat(result.isPresent()).isTrue();
78+
assertThat(result.get().getInt(COL_NAME1)).isEqualTo(pKey);
79+
assertThat(result.get().getInt(COL_NAME4)).isEqualTo(cKey);
80+
assertThat(result.get().getInt(COL_NAME3)).isEqualTo(value);
81+
}
82+
}
83+
84+
@Test
85+
public void scan_InStreamingMode_ShouldRetrieveResults() throws IOException, ExecutionException {
86+
if (!JdbcTestUtils.isMysql(rdbEngine) || JdbcTestUtils.isMariaDB(rdbEngine)) {
87+
// MySQL is the only RDB engine that supports streaming mode
88+
return;
89+
}
90+
91+
try (DistributedStorage storage = getStorageInStreamingMode()) {
92+
// Arrange
93+
int pKey = 0;
94+
95+
storage.put(
96+
Put.newBuilder()
97+
.namespace(namespace)
98+
.table(TABLE)
99+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
100+
.clusteringKey(Key.ofInt(COL_NAME4, 0))
101+
.intValue(COL_NAME3, 1)
102+
.build());
103+
storage.put(
104+
Put.newBuilder()
105+
.namespace(namespace)
106+
.table(TABLE)
107+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
108+
.clusteringKey(Key.ofInt(COL_NAME4, 1))
109+
.intValue(COL_NAME3, 2)
110+
.build());
111+
storage.put(
112+
Put.newBuilder()
113+
.namespace(namespace)
114+
.table(TABLE)
115+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
116+
.clusteringKey(Key.ofInt(COL_NAME4, 2))
117+
.intValue(COL_NAME3, 3)
118+
.build());
119+
120+
// Act
121+
Scanner scanner =
122+
storage.scan(
123+
Scan.newBuilder()
124+
.namespace(namespace)
125+
.table(TABLE)
126+
.partitionKey(Key.ofInt(COL_NAME1, pKey))
127+
.build());
128+
List<Result> results = scanner.all();
129+
scanner.close();
130+
131+
// Assert
132+
assertThat(results).hasSize(3);
133+
assertThat(results.get(0).getInt(COL_NAME1)).isEqualTo(pKey);
134+
assertThat(results.get(0).getInt(COL_NAME4)).isEqualTo(0);
135+
assertThat(results.get(0).getInt(COL_NAME3)).isEqualTo(1);
136+
137+
assertThat(results.get(1).getInt(COL_NAME1)).isEqualTo(pKey);
138+
assertThat(results.get(1).getInt(COL_NAME4)).isEqualTo(1);
139+
assertThat(results.get(1).getInt(COL_NAME3)).isEqualTo(2);
140+
141+
assertThat(results.get(2).getInt(COL_NAME1)).isEqualTo(pKey);
142+
assertThat(results.get(2).getInt(COL_NAME4)).isEqualTo(2);
143+
assertThat(results.get(2).getInt(COL_NAME3)).isEqualTo(3);
144+
}
145+
}
146+
147+
private DistributedStorage getStorageInStreamingMode() {
148+
Properties properties = JdbcEnv.getProperties(TEST_NAME);
149+
properties.setProperty(DatabaseConfig.SCAN_FETCH_SIZE, Integer.toString(Integer.MIN_VALUE));
150+
return StorageFactory.create(properties).getStorage();
151+
}
28152
}

core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcTestUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ public static boolean isDb2(RdbEngineStrategy rdbEngine) {
8686
return rdbEngine instanceof RdbEngineDb2;
8787
}
8888

89+
public static boolean isMariaDB(RdbEngineStrategy rdbEngine) {
90+
return rdbEngine instanceof RdbEngineMariaDB;
91+
}
92+
8993
/**
9094
* Filters the data types based on the RDB engine and the excluded data types.
9195
*

core/src/main/java/com/scalar/db/storage/jdbc/JdbcConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ public class JdbcConfig {
107107
// is 1970-01-01.
108108
public static final String DEFAULT_DB2_TIME_COLUMN_DEFAULT_DATE_COMPONENT = "1970-01-01";
109109

110+
private final DatabaseConfig databaseConfig;
111+
110112
private final String jdbcUrl;
111113
@Nullable private final String username;
112114
@Nullable private final String password;
@@ -136,6 +138,7 @@ public class JdbcConfig {
136138
private final LocalDate db2TimeColumnDefaultDateComponent;
137139

138140
public JdbcConfig(DatabaseConfig databaseConfig) {
141+
this.databaseConfig = databaseConfig;
139142
String storage = databaseConfig.getStorage();
140143
String transactionManager = databaseConfig.getTransactionManager();
141144
if (!storage.equals(STORAGE_NAME) && !transactionManager.equals(TRANSACTION_MANAGER_NAME)) {
@@ -278,6 +281,10 @@ public JdbcConfig(DatabaseConfig databaseConfig) {
278281
}
279282
}
280283

284+
public DatabaseConfig getDatabaseConfig() {
285+
return databaseConfig;
286+
}
287+
281288
public String getJdbcUrl() {
282289
return jdbcUrl;
283290
}

core/src/main/java/com/scalar/db/storage/jdbc/JdbcUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public static BasicDataSource initDataSource(
7070
dataSource.setMaxTotal(config.getConnectionPoolMaxTotal());
7171
dataSource.setPoolPreparedStatements(config.isPreparedStatementsPoolEnabled());
7272
dataSource.setMaxOpenPreparedStatements(config.getPreparedStatementsPoolMaxOpen());
73-
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
73+
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
7474
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
7575
}
7676

@@ -97,7 +97,7 @@ public static BasicDataSource initDataSourceForTableMetadata(
9797
dataSource.setMinIdle(config.getTableMetadataConnectionPoolMinIdle());
9898
dataSource.setMaxIdle(config.getTableMetadataConnectionPoolMaxIdle());
9999
dataSource.setMaxTotal(config.getTableMetadataConnectionPoolMaxTotal());
100-
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
100+
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
101101
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
102102
}
103103

@@ -124,7 +124,7 @@ public static BasicDataSource initDataSourceForAdmin(
124124
dataSource.setMinIdle(config.getAdminConnectionPoolMinIdle());
125125
dataSource.setMaxIdle(config.getAdminConnectionPoolMaxIdle());
126126
dataSource.setMaxTotal(config.getAdminConnectionPoolMaxTotal());
127-
for (Entry<String, String> entry : rdbEngine.getConnectionProperties().entrySet()) {
127+
for (Entry<String, String> entry : rdbEngine.getConnectionProperties(config).entrySet()) {
128128
dataSource.addConnectionProperty(entry.getKey(), entry.getValue());
129129
}
130130
return dataSource;

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineDb2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ public RdbEngineTimeTypeStrategy<String, LocalDateTime, String, String> getTimeT
432432
}
433433

434434
@Override
435-
public Map<String, String> getConnectionProperties() {
435+
public Map<String, String> getConnectionProperties(JdbcConfig config) {
436436
ImmutableMap.Builder<String, String> props = new ImmutableMap.Builder<>();
437437
// With this Db2 will return a textual description of the error when
438438
// calling JDBC `SQLException.getMessage` instead of a short message containing only error codes

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineMariaDB.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package com.scalar.db.storage.jdbc;
22

33
import com.scalar.db.io.DataType;
4+
import java.sql.Connection;
45
import java.sql.Driver;
56
import java.sql.JDBCType;
7+
import java.sql.SQLException;
8+
import java.util.Collections;
9+
import java.util.Map;
610
import javax.annotation.Nullable;
711

812
class RdbEngineMariaDB extends RdbEngineMysql {
@@ -28,4 +32,14 @@ DataType getDataTypeForScalarDbInternal(
2832
type, typeName, columnSize, digits, columnDescription, overrideDataType);
2933
}
3034
}
35+
36+
@Override
37+
public Map<String, String> getConnectionProperties(JdbcConfig config) {
38+
return Collections.emptyMap();
39+
}
40+
41+
@Override
42+
public void setConnectionToReadOnly(Connection connection, boolean readOnly) throws SQLException {
43+
connection.setReadOnly(readOnly);
44+
}
3145
}

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineMysql.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.scalar.db.storage.jdbc.query.SelectQuery;
1212
import com.scalar.db.storage.jdbc.query.SelectWithLimitQuery;
1313
import com.scalar.db.storage.jdbc.query.UpsertQuery;
14+
import java.sql.Connection;
1415
import java.sql.Driver;
1516
import java.sql.JDBCType;
1617
import java.sql.ResultSet;
@@ -448,7 +449,19 @@ public TimestampTZColumn parseTimestampTZColumn(ResultSet resultSet, String colu
448449
}
449450

450451
@Override
451-
public Map<String, String> getConnectionProperties() {
452+
public Map<String, String> getConnectionProperties(JdbcConfig config) {
453+
if (config.getDatabaseConfig().getScanFetchSize() == Integer.MIN_VALUE) {
454+
// If the scan fetch size is set to Integer.MIN_VALUE, use the streaming mode.
455+
return Collections.emptyMap();
456+
}
457+
458+
// Otherwise, use the cursor fetch mode.
452459
return Collections.singletonMap("useCursorFetch", "true");
453460
}
461+
462+
@Override
463+
public void setConnectionToReadOnly(Connection connection, boolean readOnly) throws SQLException {
464+
// Observed performance degradation when using read-only connections in MySQL. So we do not
465+
// set the read-only mode for MySQL connections.
466+
}
454467
}

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineSqlServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ public String tryAddIfNotExistsToCreateIndexSql(String createIndexSql) {
375375
}
376376

377377
@Override
378-
public Map<String, String> getConnectionProperties() {
378+
public Map<String, String> getConnectionProperties(JdbcConfig config) {
379379
// Needed to keep the microsecond precision when sending the value of ScalarDB TIME type.
380380
// It is being considered setting to it to false by default in a future driver release.
381381
return ImmutableMap.of("sendTimeAsDatetime", "false");

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,10 @@ default TimestampTZColumn parseTimestampTZColumn(ResultSet resultSet, String col
205205
/**
206206
* Return the connection properties for the underlying database.
207207
*
208+
* @param config the JDBC configuration
208209
* @return a map where key=property_name and value=property_value
209210
*/
210-
default Map<String, String> getConnectionProperties() {
211+
default Map<String, String> getConnectionProperties(JdbcConfig config) {
211212
return Collections.emptyMap();
212213
}
213214

core/src/test/java/com/scalar/db/storage/jdbc/JdbcAdminTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ private void getTableMetadata_forX_ShouldReturnTableMetadata(
274274
.addSecondaryIndex("c4")
275275
.build();
276276
assertThat(actualMetadata).isEqualTo(expectedMetadata);
277-
if (rdbEngine == RdbEngine.SQLITE) {
277+
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
278278
verify(connection, never()).setReadOnly(anyBoolean());
279279
} else {
280280
verify(connection).setReadOnly(true);
@@ -2197,7 +2197,7 @@ private void getNamespaceTables_forX_ShouldReturnTableNames(
21972197
Set<String> actualTableNames = admin.getNamespaceTableNames(namespace);
21982198

21992199
// Assert
2200-
if (rdbEngine == RdbEngine.SQLITE) {
2200+
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
22012201
verify(connection, never()).setReadOnly(anyBoolean());
22022202
} else {
22032203
verify(connection).setReadOnly(true);
@@ -2270,7 +2270,7 @@ private void namespaceExists_forXWithExistingNamespace_ShouldReturnTrue(
22702270
// Assert
22712271
assertThat(admin.namespaceExists(namespace)).isTrue();
22722272

2273-
if (rdbEngine == RdbEngine.SQLITE) {
2273+
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
22742274
verify(connection, never()).setReadOnly(anyBoolean());
22752275
} else {
22762276
verify(connection).setReadOnly(true);
@@ -2965,7 +2965,7 @@ private void getNamespaceNames_forX_ShouldReturnNamespaceNames(
29652965
Set<String> actualNamespaceNames = admin.getNamespaceNames();
29662966

29672967
// Assert
2968-
if (rdbEngine == RdbEngine.SQLITE) {
2968+
if (rdbEngine == RdbEngine.MYSQL || rdbEngine == RdbEngine.SQLITE) {
29692969
verify(connection, never()).setReadOnly(anyBoolean());
29702970
} else {
29712971
verify(connection).setReadOnly(true);
@@ -3050,7 +3050,11 @@ public Boolean answer(InvocationOnMock invocation) {
30503050
.execute(expectedCheckTableExistStatement);
30513051
assertThat(actual.getPartitionKeyNames()).hasSameElementsAs(ImmutableSet.of("pk1", "pk2"));
30523052
assertThat(actual.getColumnDataTypes()).containsExactlyEntriesOf(expectedColumns);
3053-
verify(connection).setReadOnly(true);
3053+
if (rdbEngine == RdbEngine.MYSQL) {
3054+
verify(connection, never()).setReadOnly(anyBoolean());
3055+
} else {
3056+
verify(connection).setReadOnly(true);
3057+
}
30543058
verify(rdbEngineStrategy)
30553059
.getDataTypeForScalarDb(
30563060
any(JDBCType.class),

0 commit comments

Comments
 (0)