
Commit 570ab38

Merge branch 'master' into fix/data-loader/logger-rename
2 parents: cc5365b + 32179a6

File tree: 33 files changed (+2408, -391 lines)

build.gradle

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ subprojects {
         guavaVersion = '32.1.3-jre'
         slf4jVersion = '1.7.36'
         cassandraDriverVersion = '3.11.5'
-        azureCosmosVersion = '4.68.0'
+        azureCosmosVersion = '4.70.0'
         jooqVersion = '3.14.16'
         awssdkVersion = '2.31.3'
         commonsDbcp2Version = '2.13.0'

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 15 additions & 0 deletions
@@ -896,6 +896,21 @@ public enum CoreError implements ScalarDbError {
   DATA_LOADER_FILE_PATH_IS_BLANK(
       Category.USER_ERROR, "0197", "File path must not be blank.", "", ""),
   DATA_LOADER_FILE_NOT_FOUND(Category.USER_ERROR, "0198", "File not found: %s", "", ""),
+  DATA_LOADER_INVALID_DATE_TIME_FOR_COLUMN_VALUE(
+      Category.USER_ERROR,
+      "0199",
+      "Invalid date time value specified for column %s in table %s in namespace %s.",
+      "",
+      ""),
+  DATA_LOADER_NULL_OR_EMPTY_KEY_VALUE_INPUT(
+      Category.USER_ERROR, "0200", "Key-value cannot be null or empty", "", ""),
+  DATA_LOADER_INVALID_KEY_VALUE_INPUT(
+      Category.USER_ERROR, "0201", "Invalid key-value format: %s", "", ""),
+  DATA_LOADER_SPLIT_INPUT_VALUE_NULL(Category.USER_ERROR, "0202", "Value must not be null", "", ""),
+  DATA_LOADER_SPLIT_INPUT_DELIMITER_NULL(
+      Category.USER_ERROR, "0203", "Delimiter must not be null", "", ""),
+  DATA_LOADER_CONFIG_FILE_PATH_BLANK(
+      Category.USER_ERROR, "0204", "Config file path must not be blank", "", ""),
 
   //
   // Errors for the concurrency error category
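
Note: the new codes follow the existing data-loader error pattern. As a rough caller-side sketch, assuming the enum's buildMessage(...) helper (not shown in this diff) expands the "%s" placeholder in the template, code 0201 might be surfaced like this:

    // Hypothetical usage; the input string and exception type are illustrative,
    // and buildMessage(...) is assumed to format the message template.
    String keyValue = "name=";  // malformed key-value pair for illustration
    throw new IllegalArgumentException(
        CoreError.DATA_LOADER_INVALID_KEY_VALUE_INPUT.buildMessage(keyValue));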

core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java

Lines changed: 64 additions & 51 deletions
@@ -437,40 +437,43 @@ public TableMetadata getTableMetadata(String namespace, String table) throws Exe
     TableMetadata.Builder builder = TableMetadata.newBuilder();
     boolean tableExists = false;
 
-    try (Connection connection = dataSource.getConnection();
-        PreparedStatement preparedStatement =
-            connection.prepareStatement(getSelectColumnsStatement())) {
-      preparedStatement.setString(1, getFullTableName(namespace, table));
-
-      try (ResultSet resultSet = preparedStatement.executeQuery()) {
-        while (resultSet.next()) {
-          tableExists = true;
-
-          String columnName = resultSet.getString(METADATA_COL_COLUMN_NAME);
-          DataType dataType = DataType.valueOf(resultSet.getString(METADATA_COL_DATA_TYPE));
-          builder.addColumn(columnName, dataType);
-
-          boolean indexed = resultSet.getBoolean(METADATA_COL_INDEXED);
-          if (indexed) {
-            builder.addSecondaryIndex(columnName);
-          }
-
-          String keyType = resultSet.getString(METADATA_COL_KEY_TYPE);
-          if (keyType == null) {
-            continue;
-          }
-
-          switch (KeyType.valueOf(keyType)) {
-            case PARTITION:
-              builder.addPartitionKey(columnName);
-              break;
-            case CLUSTERING:
-              Scan.Ordering.Order clusteringOrder =
-                  Scan.Ordering.Order.valueOf(resultSet.getString(METADATA_COL_CLUSTERING_ORDER));
-              builder.addClusteringKey(columnName, clusteringOrder);
-              break;
-            default:
-              throw new AssertionError("Invalid key type: " + keyType);
+    try (Connection connection = dataSource.getConnection()) {
+      rdbEngine.setReadOnly(connection, true);
+
+      try (PreparedStatement preparedStatement =
+          connection.prepareStatement(getSelectColumnsStatement())) {
+        preparedStatement.setString(1, getFullTableName(namespace, table));
+
+        try (ResultSet resultSet = preparedStatement.executeQuery()) {
+          while (resultSet.next()) {
+            tableExists = true;
+
+            String columnName = resultSet.getString(METADATA_COL_COLUMN_NAME);
+            DataType dataType = DataType.valueOf(resultSet.getString(METADATA_COL_DATA_TYPE));
+            builder.addColumn(columnName, dataType);
+
+            boolean indexed = resultSet.getBoolean(METADATA_COL_INDEXED);
+            if (indexed) {
+              builder.addSecondaryIndex(columnName);
+            }
+
+            String keyType = resultSet.getString(METADATA_COL_KEY_TYPE);
+            if (keyType == null) {
+              continue;
+            }
+
+            switch (KeyType.valueOf(keyType)) {
+              case PARTITION:
+                builder.addPartitionKey(columnName);
+                break;
+              case CLUSTERING:
+                Scan.Ordering.Order clusteringOrder =
+                    Scan.Ordering.Order.valueOf(resultSet.getString(METADATA_COL_CLUSTERING_ORDER));
+                builder.addClusteringKey(columnName, clusteringOrder);
+                break;
+              default:
+                throw new AssertionError("Invalid key type: " + keyType);
+            }
           }
         }
       }
@@ -507,6 +510,8 @@ public TableMetadata getImportTableMetadata(
     }
 
     try (Connection connection = dataSource.getConnection()) {
+      rdbEngine.setReadOnly(connection, true);
+
       String catalogName = rdbEngine.getCatalogName(namespace);
       String schemaName = rdbEngine.getSchemaName(namespace);
 
@@ -602,19 +607,22 @@ public Set<String> getNamespaceTableNames(String namespace) throws ExecutionExce
             + " WHERE "
             + enclose(METADATA_COL_FULL_TABLE_NAME)
             + " LIKE ?";
-    try (Connection connection = dataSource.getConnection();
-        PreparedStatement preparedStatement =
-            connection.prepareStatement(selectTablesOfNamespaceStatement)) {
-      String prefix = namespace + ".";
-      preparedStatement.setString(1, prefix + "%");
-      try (ResultSet results = preparedStatement.executeQuery()) {
-        Set<String> tableNames = new HashSet<>();
-        while (results.next()) {
-          String tableName =
-              results.getString(METADATA_COL_FULL_TABLE_NAME).substring(prefix.length());
-          tableNames.add(tableName);
+    try (Connection connection = dataSource.getConnection()) {
+      rdbEngine.setReadOnly(connection, true);
+
+      try (PreparedStatement preparedStatement =
+          connection.prepareStatement(selectTablesOfNamespaceStatement)) {
+        String prefix = namespace + ".";
+        preparedStatement.setString(1, prefix + "%");
+        try (ResultSet results = preparedStatement.executeQuery()) {
+          Set<String> tableNames = new HashSet<>();
+          while (results.next()) {
+            String tableName =
+                results.getString(METADATA_COL_FULL_TABLE_NAME).substring(prefix.length());
+            tableNames.add(tableName);
+          }
+          return tableNames;
         }
-        return tableNames;
       }
     } catch (SQLException e) {
       // An exception will be thrown if the metadata table does not exist when executing the select
@@ -635,11 +643,14 @@ public boolean namespaceExists(String namespace) throws ExecutionException {
             + " WHERE "
            + enclose(NAMESPACE_COL_NAMESPACE_NAME)
             + " = ?";
-    try (Connection connection = dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(selectQuery)) {
-      statement.setString(1, namespace);
-      try (ResultSet resultSet = statement.executeQuery()) {
-        return resultSet.next();
+    try (Connection connection = dataSource.getConnection()) {
+      rdbEngine.setReadOnly(connection, true);
+
+      try (PreparedStatement statement = connection.prepareStatement(selectQuery)) {
+        statement.setString(1, namespace);
+        try (ResultSet resultSet = statement.executeQuery()) {
+          return resultSet.next();
+        }
       }
     } catch (SQLException e) {
      // An exception will be thrown if the namespaces table does not exist when executing the
@@ -981,6 +992,8 @@ private String encloseFullTableName(String schema, String table) {
   @Override
   public Set<String> getNamespaceNames() throws ExecutionException {
     try (Connection connection = dataSource.getConnection()) {
+      rdbEngine.setReadOnly(connection, true);
+
       String selectQuery =
           "SELECT * FROM " + encloseFullTableName(metadataSchema, NAMESPACES_TABLE);
       Set<String> namespaces = new HashSet<>();
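
Note: every hunk in this file applies the same reshaping: the former single multi-resource try block is split so the connection can be flagged read-only through the RdbEngine strategy before any statement is prepared on it. A condensed sketch of the pattern (the query is a placeholder; dataSource and rdbEngine are assumed to be in scope, and SQLExceptions are translated as in the surrounding methods):

    try (Connection connection = dataSource.getConnection()) {
      // Mark the connection read-only *before* any statement is prepared on it;
      // this is why the nesting of the try-with-resources blocks changed.
      rdbEngine.setReadOnly(connection, true);

      try (PreparedStatement statement = connection.prepareStatement("SELECT 1")) {  // placeholder query
        try (ResultSet resultSet = statement.executeQuery()) {
          while (resultSet.next()) {
            // read metadata rows ...
          }
        }
      }
    }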

core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java

Lines changed: 2 additions & 0 deletions
@@ -82,6 +82,7 @@ public Optional<Result> get(Get get) throws ExecutionException {
     Connection connection = null;
     try {
       connection = dataSource.getConnection();
+      rdbEngine.setReadOnly(connection, true);
       return jdbcService.get(get, connection);
     } catch (SQLException e) {
       throw new ExecutionException(
@@ -97,6 +98,7 @@ public Scanner scan(Scan scan) throws ExecutionException {
     Connection connection = null;
     try {
       connection = dataSource.getConnection();
+      rdbEngine.setReadOnly(connection, true);
       return jdbcService.getScanner(scan, connection);
     } catch (SQLException e) {
       close(connection);

core/src/main/java/com/scalar/db/storage/jdbc/JdbcUtils.java

Lines changed: 8 additions & 0 deletions
@@ -63,6 +63,8 @@ public static BasicDataSource initDataSource(
          }
        });
 
+    dataSource.setDefaultReadOnly(false);
+
     dataSource.setMinIdle(config.getConnectionPoolMinIdle());
     dataSource.setMaxIdle(config.getConnectionPoolMaxIdle());
     dataSource.setMaxTotal(config.getConnectionPoolMaxTotal());
@@ -89,6 +91,9 @@ public static BasicDataSource initDataSourceForTableMetadata(
     dataSource.setUrl(config.getJdbcUrl());
     config.getUsername().ifPresent(dataSource::setUsername);
     config.getPassword().ifPresent(dataSource::setPassword);
+
+    dataSource.setDefaultReadOnly(false);
+
     dataSource.setMinIdle(config.getTableMetadataConnectionPoolMinIdle());
     dataSource.setMaxIdle(config.getTableMetadataConnectionPoolMaxIdle());
     dataSource.setMaxTotal(config.getTableMetadataConnectionPoolMaxTotal());
@@ -113,6 +118,9 @@ public static BasicDataSource initDataSourceForAdmin(
     dataSource.setUrl(config.getJdbcUrl());
     config.getUsername().ifPresent(dataSource::setUsername);
     config.getPassword().ifPresent(dataSource::setPassword);
+
+    dataSource.setDefaultReadOnly(false);
+
     dataSource.setMinIdle(config.getAdminConnectionPoolMinIdle());
     dataSource.setMaxIdle(config.getAdminConnectionPoolMaxIdle());
     dataSource.setMaxTotal(config.getAdminConnectionPoolMaxTotal());
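
Note: with reads now flagging their connections read-only, explicitly resetting the pool default matters. DBCP2 leaves the read-only flag untouched when no default is configured, so without setDefaultReadOnly(false) a pooled connection last used for a read could be reused still marked read-only. A minimal sketch of the relevant pool settings (URL and credentials are placeholders):

    import org.apache.commons.dbcp2.BasicDataSource;

    BasicDataSource dataSource = new BasicDataSource();
    dataSource.setUrl("jdbc:postgresql://localhost:5432/scalardb");  // placeholder
    dataSource.setUsername("user");      // placeholder
    dataSource.setPassword("password");  // placeholder

    // Hand every connection out writable by default; read paths then opt in
    // via rdbEngine.setReadOnly(connection, true).
    dataSource.setDefaultReadOnly(false);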

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineSqlite.java

Lines changed: 6 additions & 0 deletions
@@ -13,6 +13,7 @@
 import com.scalar.db.storage.jdbc.query.UpsertQuery;
 import com.scalar.db.util.TimeRelatedColumnEncodingUtils;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.JDBCType;
 import java.sql.ResultSet;
@@ -337,4 +338,9 @@ public TimestampTZColumn parseTimestampTZColumn(ResultSet resultSet, String colu
   public RdbEngineTimeTypeStrategy<Integer, Long, Long, Long> getTimeTypeStrategy() {
     return timeTypeEngine;
   }
+
+  @Override
+  public void setReadOnly(Connection connection, boolean readOnly) {
+    // Do nothing. SQLite does not support read-only mode.
+  }
 }

core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineStrategy.java

Lines changed: 5 additions & 0 deletions
@@ -10,6 +10,7 @@
 import com.scalar.db.io.TimestampTZColumn;
 import com.scalar.db.storage.jdbc.query.SelectQuery;
 import com.scalar.db.storage.jdbc.query.UpsertQuery;
+import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.JDBCType;
 import java.sql.ResultSet;
@@ -228,4 +229,8 @@ default String getProjectionsSqlForSelectQuery(TableMetadata metadata, List<Stri
   default void throwIfDuplicatedIndexWarning(SQLWarning warning) throws SQLException {
     // Do nothing
   }
+
+  default void setReadOnly(Connection connection, boolean readOnly) throws SQLException {
+    connection.setReadOnly(readOnly);
+  }
 }
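
Note: the default method forwards the flag to the JDBC driver, while engines that cannot honor it (SQLite above) override it with a no-op, so callers always go through the strategy instead of touching Connection.setReadOnly directly. A rough caller-side sketch (connection and rdbEngine are assumed to be in scope):

    // Engine-specific quirks stay behind the strategy: SQLite ignores the call,
    // every other engine delegates to connection.setReadOnly(true).
    rdbEngine.setReadOnly(connection, true);
    // ... run read-only statements ...
    rdbEngine.setReadOnly(connection, false);  // restore before reuse, if needed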

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 6 additions & 5 deletions
@@ -26,7 +26,6 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -120,7 +119,7 @@ void read(Snapshot.Key key, Get get) throws CrudException {
   public List<Result> scan(Scan originalScan) throws CrudException {
     List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
     Scan scan = (Scan) prepareStorageSelection(originalScan);
-    Map<Snapshot.Key, TransactionResult> results = scanInternal(scan);
+    LinkedHashMap<Snapshot.Key, TransactionResult> results = scanInternal(scan);
     snapshot.verifyNoOverlap(scan, results);
 
     TableMetadata metadata = getTableMetadata(scan);
@@ -129,13 +128,15 @@ public List<Result> scan(Scan originalScan) throws CrudException {
         .collect(Collectors.toList());
   }
 
-  private Map<Snapshot.Key, TransactionResult> scanInternal(Scan scan) throws CrudException {
-    Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.getResults(scan);
+  private LinkedHashMap<Snapshot.Key, TransactionResult> scanInternal(Scan scan)
+      throws CrudException {
+    Optional<LinkedHashMap<Snapshot.Key, TransactionResult>> resultsInSnapshot =
+        snapshot.getResults(scan);
     if (resultsInSnapshot.isPresent()) {
       return resultsInSnapshot.get();
     }
 
-    Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
+    LinkedHashMap<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
 
     Scanner scanner = null;
     try {
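
Note: narrowing the type from Map to LinkedHashMap makes the ordering contract explicit: scan results are handed back in the order the scanner produced them. A small standalone sketch of why the concrete type matters (string keys stand in for Snapshot.Key):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ScanOrderSketch {
      public static void main(String[] args) {
        // LinkedHashMap iterates in insertion order, so callers see records in
        // the order they were scanned rather than in key or hash order.
        Map<String, String> results = new LinkedHashMap<>();
        results.put("user2", "scanned first");
        results.put("user1", "scanned second");
        results.forEach((key, value) -> System.out.println(key + ": " + value));
        // Prints user2 before user1: insertion (scan) order is preserved.
      }
    }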

core/src/main/java/com/scalar/db/transaction/consensuscommit/PrepareMutationComposer.java

Lines changed: 2 additions & 9 deletions
@@ -1,7 +1,6 @@
 package com.scalar.db.transaction.consensuscommit;
 
 import static com.scalar.db.transaction.consensuscommit.Attribute.ID;
-import static com.scalar.db.transaction.consensuscommit.Attribute.VERSION;
 import static com.scalar.db.transaction.consensuscommit.ConsensusCommitOperationAttributes.isInsertModeEnabled;
 import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getNextTxVersion;
 
@@ -73,13 +72,10 @@ private void add(Put base, @Nullable TransactionResult result) throws ExecutionE
       if (result.isDeemedAsCommitted()) {
         // record is deemed-commit state
         putBuilder.condition(
-            ConditionBuilder.putIf(ConditionBuilder.column(ID).isNullText())
-                .and(ConditionBuilder.column(VERSION).isNullInt())
-                .build());
+            ConditionBuilder.putIf(ConditionBuilder.column(ID).isNullText()).build());
       } else {
         putBuilder.condition(
             ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(result.getId()))
-                .and(ConditionBuilder.column(VERSION).isEqualToInt(version))
                 .build());
       }
     } else { // initial record or insert mode enabled
@@ -113,13 +109,10 @@ private void add(Delete base, @Nullable TransactionResult result) throws Executi
       // check if the record is not interrupted by other conflicting transactions
       if (result.isDeemedAsCommitted()) {
         putBuilder.condition(
-            ConditionBuilder.putIf(ConditionBuilder.column(ID).isNullText())
-                .and(ConditionBuilder.column(VERSION).isNullInt())
-                .build());
+            ConditionBuilder.putIf(ConditionBuilder.column(ID).isNullText()).build());
       } else {
         putBuilder.condition(
             ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(result.getId()))
-                .and(ConditionBuilder.column(VERSION).isEqualToInt(version))
                 .build());
       }
     } else {
