Skip to content

Commit dd7d87f

Browse files
authored
Merge branch 'master' into data-loader/add-ci-for-build
2 parents 640623e + 30057d8 commit dd7d87f

35 files changed

+2360
-211
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.scalar.db.storage.cassandra;
2+
3+
import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
4+
import java.util.Collections;
5+
import java.util.Map;
6+
import java.util.Properties;
7+
8+
public class CassandraMutationAtomicityUnitIntegrationTest
9+
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
10+
11+
@Override
12+
protected Properties getProperties(String testName) {
13+
return CassandraEnv.getProperties(testName);
14+
}
15+
16+
@Override
17+
protected Map<String, String> getCreationOptions() {
18+
return Collections.singletonMap(CassandraAdmin.REPLICATION_FACTOR, "1");
19+
}
20+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.scalar.db.storage.cosmos;
2+
3+
import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
4+
import java.util.Map;
5+
import java.util.Properties;
6+
import org.junit.jupiter.api.Disabled;
7+
8+
public class CosmosMutationAtomicityUnitIntegrationTest
9+
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
10+
11+
@Override
12+
protected Properties getProperties(String testName) {
13+
return CosmosEnv.getProperties(testName);
14+
}
15+
16+
@Override
17+
protected Map<String, String> getCreationOptions() {
18+
return CosmosEnv.getCreationOptions();
19+
}
20+
21+
@Disabled("This test fails. It might be a bug")
22+
@Override
23+
public void
24+
mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() {}
25+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.scalar.db.storage.dynamo;
2+
3+
import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
4+
import java.util.Map;
5+
import java.util.Properties;
6+
import org.junit.jupiter.api.Disabled;
7+
8+
public class DynamoMutationAtomicityUnitIntegrationTest
9+
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
10+
11+
@Override
12+
protected Properties getProperties(String testName) {
13+
return DynamoEnv.getProperties(testName);
14+
}
15+
16+
@Override
17+
protected Map<String, String> getCreationOptions() {
18+
return DynamoEnv.getCreationOptions();
19+
}
20+
21+
@Disabled("Transaction request cannot include multiple operations on one item in DynamoDB")
22+
@Override
23+
public void
24+
mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() {}
25+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.scalar.db.storage.jdbc;
2+
3+
import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
4+
import java.util.Properties;
5+
6+
public class JdbcDatabaseMutationAtomicityUnitIntegrationTest
7+
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
8+
9+
@Override
10+
protected Properties getProperties(String testName) {
11+
return JdbcEnv.getProperties(testName);
12+
}
13+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.scalar.db.storage.multistorage;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
6+
import com.scalar.db.api.Put;
7+
import com.scalar.db.config.DatabaseConfig;
8+
import com.scalar.db.exception.storage.ExecutionException;
9+
import com.scalar.db.io.Key;
10+
import java.util.Arrays;
11+
import java.util.Properties;
12+
import org.assertj.core.api.Assertions;
13+
import org.junit.jupiter.api.Test;
14+
15+
public class MultiStorageMutationAtomicityUnitIntegrationTest
16+
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
17+
18+
@Override
19+
public Properties getProperties(String testName) {
20+
Properties properties = new Properties();
21+
properties.setProperty(DatabaseConfig.STORAGE, "multi-storage");
22+
23+
// Define storages, jdbc and cassandra
24+
properties.setProperty(MultiStorageConfig.STORAGES, "jdbc,cassandra");
25+
26+
Properties propertiesForJdbc = MultiStorageEnv.getPropertiesForJdbc(testName);
27+
for (String propertyName : propertiesForJdbc.stringPropertyNames()) {
28+
properties.setProperty(
29+
MultiStorageConfig.STORAGES
30+
+ ".jdbc."
31+
+ propertyName.substring(DatabaseConfig.PREFIX.length()),
32+
propertiesForJdbc.getProperty(propertyName));
33+
}
34+
35+
Properties propertiesForCassandra = MultiStorageEnv.getPropertiesForCassandra(testName);
36+
for (String propertyName : propertiesForCassandra.stringPropertyNames()) {
37+
properties.setProperty(
38+
MultiStorageConfig.STORAGES
39+
+ ".cassandra."
40+
+ propertyName.substring(DatabaseConfig.PREFIX.length()),
41+
propertiesForCassandra.getProperty(propertyName));
42+
}
43+
44+
// Define namespace mappings
45+
properties.setProperty(
46+
MultiStorageConfig.NAMESPACE_MAPPING,
47+
NAMESPACE1 + ":jdbc," + NAMESPACE2 + ":jdbc," + NAMESPACE3 + ":cassandra");
48+
49+
// The default storage is jdbc
50+
properties.setProperty(MultiStorageConfig.DEFAULT_STORAGE, "jdbc");
51+
52+
// Add testName as a metadata schema suffix
53+
properties.setProperty(
54+
DatabaseConfig.SYSTEM_NAMESPACE_NAME,
55+
DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME + "_" + testName);
56+
57+
return properties;
58+
}
59+
60+
@Test
61+
public void mutate_MutationsAcrossStorageGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit()
62+
throws ExecutionException {
63+
// Arrange
64+
Put put1 =
65+
Put.newBuilder()
66+
.namespace(namespace1)
67+
.table(TABLE1)
68+
.partitionKey(Key.ofInt(COL_NAME1, 0))
69+
.clusteringKey(Key.ofInt(COL_NAME2, 1))
70+
.intValue(COL_NAME3, 1)
71+
.build();
72+
Put put2 =
73+
Put.newBuilder()
74+
.namespace(namespace3)
75+
.table(TABLE1)
76+
.partitionKey(Key.ofInt(COL_NAME1, 1))
77+
.clusteringKey(Key.ofInt(COL_NAME2, 2))
78+
.intValue(COL_NAME3, 2)
79+
.build();
80+
81+
// Act
82+
Exception exception =
83+
Assertions.catchException(() -> storage.mutate(Arrays.asList(put1, put2)));
84+
85+
// Assert
86+
assertThat(exception).isInstanceOf(IllegalArgumentException.class);
87+
}
88+
}

core/src/main/java/com/scalar/db/api/DistributedStorage.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ public interface DistributedStorage extends AutoCloseable {
181181
/**
182182
* Mutates entries of the underlying storage with the specified list of {@link Mutation} commands.
183183
*
184+
* <p>Note that this method only supports mutations within the atomicity unit specified by {@link
185+
* StorageInfo#getMutationAtomicityUnit()}. For example, if the atomicity unit of the storage is
186+
* {@link StorageInfo.MutationAtomicityUnit#PARTITION}, the mutations must occur within the same
187+
* partition. Also note that the maximum number of mutations that can be performed atomically is
188+
* defined by {@link StorageInfo#getMaxAtomicMutationsCount()}.
189+
*
190+
* <p>To retrieve storage information, use {@link DistributedStorageAdmin#getStorageInfo(String)}.
191+
*
184192
* @param mutations a list of {@code Mutation} commands
185193
* @throws ExecutionException if the operation fails
186194
*/

core/src/main/java/com/scalar/db/api/DistributedStorageAdmin.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,15 @@ TableMetadata getImportTableMetadata(
8888
void addRawColumnToTable(String namespace, String table, String columnName, DataType columnType)
8989
throws ExecutionException;
9090

91+
/**
92+
* Returns the storage information.
93+
*
94+
* @param namespace the namespace to get the storage information for
95+
* @return the storage information
96+
* @throws ExecutionException if the operation fails
97+
*/
98+
StorageInfo getStorageInfo(String namespace) throws ExecutionException;
99+
91100
/** Closes connections to the storage. */
92101
@Override
93102
void close();
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.scalar.db.api;
2+
3+
public interface StorageInfo {
4+
/**
5+
* Returns the storage name.
6+
*
7+
* @return the storage name
8+
*/
9+
String getStorageName();
10+
11+
/**
12+
* Returns the mutation atomicity unit of the storage.
13+
*
14+
* @return the mutation atomicity unit of the storage
15+
*/
16+
MutationAtomicityUnit getMutationAtomicityUnit();
17+
18+
/**
19+
* Returns the maximum number of mutations that can be performed atomically in the storage.
20+
*
21+
* @return the maximum number of mutations that can be performed atomically in the storage
22+
*/
23+
int getMaxAtomicMutationsCount();
24+
25+
/**
26+
* The mutation atomicity unit of the storage.
27+
*
28+
* <p>This enum defines the atomicity unit for mutations in the storage. It determines the scope
29+
* of atomicity for mutations such as put and delete.
30+
*/
31+
enum MutationAtomicityUnit {
32+
/**
33+
* The atomicity unit is at the record level, meaning that mutations are performed atomically
34+
* for each record.
35+
*/
36+
RECORD,
37+
38+
/**
39+
* The atomicity unit is at the partition level, meaning that mutations are performed atomically
40+
* for each partition.
41+
*/
42+
PARTITION,
43+
44+
/**
45+
* The atomicity unit is at the table level, meaning that mutations are performed atomically for
46+
* each table.
47+
*/
48+
TABLE,
49+
50+
/**
51+
* The atomicity unit is at the namespace level, meaning that mutations are performed atomically
52+
* for each namespace.
53+
*/
54+
NAMESPACE,
55+
56+
/**
57+
* The atomicity unit is at the storage level, meaning that mutations are performed atomically
58+
* for the entire storage.
59+
*/
60+
STORAGE
61+
}
62+
}

core/src/main/java/com/scalar/db/common/CheckedDistributedStorageAdmin.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.common;
22

33
import com.scalar.db.api.DistributedStorageAdmin;
4+
import com.scalar.db.api.StorageInfo;
45
import com.scalar.db.api.TableMetadata;
56
import com.scalar.db.common.error.CoreError;
67
import com.scalar.db.config.DatabaseConfig;
@@ -373,6 +374,20 @@ public void upgrade(Map<String, String> options) throws ExecutionException {
373374
}
374375
}
375376

377+
@Override
378+
public StorageInfo getStorageInfo(String namespace) throws ExecutionException {
379+
if (!namespaceExists(namespace)) {
380+
throw new IllegalArgumentException(CoreError.NAMESPACE_NOT_FOUND.buildMessage(namespace));
381+
}
382+
383+
try {
384+
return admin.getStorageInfo(namespace);
385+
} catch (ExecutionException e) {
386+
throw new ExecutionException(
387+
CoreError.GETTING_STORAGE_INFO_FAILED.buildMessage(namespace), e);
388+
}
389+
}
390+
376391
@Override
377392
public void close() {
378393
admin.close();
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.scalar.db.common;
2+
3+
import com.google.common.base.MoreObjects;
4+
import com.scalar.db.api.StorageInfo;
5+
import java.util.Objects;
6+
import javax.annotation.concurrent.Immutable;
7+
8+
@Immutable
9+
public class StorageInfoImpl implements StorageInfo {
10+
11+
private final String storageName;
12+
private final MutationAtomicityUnit mutationAtomicityUnit;
13+
private final int maxAtomicMutationsCount;
14+
15+
public StorageInfoImpl(
16+
String storageName,
17+
MutationAtomicityUnit mutationAtomicityUnit,
18+
int maxAtomicMutationsCount) {
19+
this.storageName = storageName;
20+
this.mutationAtomicityUnit = mutationAtomicityUnit;
21+
this.maxAtomicMutationsCount = maxAtomicMutationsCount;
22+
}
23+
24+
@Override
25+
public String getStorageName() {
26+
return storageName;
27+
}
28+
29+
@Override
30+
public MutationAtomicityUnit getMutationAtomicityUnit() {
31+
return mutationAtomicityUnit;
32+
}
33+
34+
@Override
35+
public int getMaxAtomicMutationsCount() {
36+
return maxAtomicMutationsCount;
37+
}
38+
39+
@Override
40+
public boolean equals(Object o) {
41+
if (this == o) {
42+
return true;
43+
}
44+
if (!(o instanceof StorageInfoImpl)) {
45+
return false;
46+
}
47+
StorageInfoImpl that = (StorageInfoImpl) o;
48+
return getMaxAtomicMutationsCount() == that.getMaxAtomicMutationsCount()
49+
&& Objects.equals(getStorageName(), that.getStorageName())
50+
&& getMutationAtomicityUnit() == that.getMutationAtomicityUnit();
51+
}
52+
53+
@Override
54+
public int hashCode() {
55+
return Objects.hash(getStorageName(), getMutationAtomicityUnit(), getMaxAtomicMutationsCount());
56+
}
57+
58+
@Override
59+
public String toString() {
60+
return MoreObjects.toStringHelper(this)
61+
.add("storageName", storageName)
62+
.add("mutationAtomicityUnit", mutationAtomicityUnit)
63+
.add("maxAtomicMutationsCount", maxAtomicMutationsCount)
64+
.toString();
65+
}
66+
}

0 commit comments

Comments
 (0)