Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.scalar.db.storage.cassandra;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CassandraMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
return CassandraEnv.getProperties(testName);
}

@Override
protected Map<String, String> getCreationOptions() {
return Collections.singletonMap(CassandraAdmin.REPLICATION_FACTOR, "1");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.scalar.db.storage.cosmos;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;

public class CosmosMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
return CosmosEnv.getProperties(testName);
}

@Override
protected Map<String, String> getCreationOptions() {
return CosmosEnv.getCreationOptions();
}

@Disabled("This test fails. It might be a bug")
@Override
public void
mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.scalar.db.storage.dynamo;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;

public class DynamoMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
return DynamoEnv.getProperties(testName);
}

@Override
protected Map<String, String> getCreationOptions() {
return DynamoEnv.getCreationOptions();
}

@Disabled("Transaction request cannot include multiple operations on one item in DynamoDB")
@Override
public void
mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.scalar.db.storage.jdbc;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import java.util.Properties;

public class JdbcDatabaseMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
return JdbcEnv.getProperties(testName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.scalar.db.storage.multistorage;

import static org.assertj.core.api.Assertions.assertThat;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import com.scalar.db.api.Put;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Key;
import java.util.Arrays;
import java.util.Properties;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class MultiStorageMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {

@Override
public Properties getProperties(String testName) {
Properties properties = new Properties();
properties.setProperty(DatabaseConfig.STORAGE, "multi-storage");

// Define storages, jdbc and cassandra
properties.setProperty(MultiStorageConfig.STORAGES, "jdbc,cassandra");

Properties propertiesForJdbc = MultiStorageEnv.getPropertiesForJdbc(testName);
for (String propertyName : propertiesForJdbc.stringPropertyNames()) {
properties.setProperty(
MultiStorageConfig.STORAGES
+ ".jdbc."
+ propertyName.substring(DatabaseConfig.PREFIX.length()),
propertiesForJdbc.getProperty(propertyName));
}

Properties propertiesForCassandra = MultiStorageEnv.getPropertiesForCassandra(testName);
for (String propertyName : propertiesForCassandra.stringPropertyNames()) {
properties.setProperty(
MultiStorageConfig.STORAGES
+ ".cassandra."
+ propertyName.substring(DatabaseConfig.PREFIX.length()),
propertiesForCassandra.getProperty(propertyName));
}

// Define namespace mappings
properties.setProperty(
MultiStorageConfig.NAMESPACE_MAPPING,
NAMESPACE1 + ":jdbc," + NAMESPACE2 + ":jdbc," + NAMESPACE3 + ":cassandra");

// The default storage is jdbc
properties.setProperty(MultiStorageConfig.DEFAULT_STORAGE, "jdbc");

return properties;
}

@Test
public void mutate_MutationsAcrossStorageGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit()
throws ExecutionException {
// Arrange
Put put1 =
Put.newBuilder()
.namespace(namespace1)
.table(TABLE1)
.partitionKey(Key.ofInt(COL_NAME1, 0))
.clusteringKey(Key.ofInt(COL_NAME2, 1))
.intValue(COL_NAME3, 1)
.build();
Put put2 =
Put.newBuilder()
.namespace(namespace3)
.table(TABLE1)
.partitionKey(Key.ofInt(COL_NAME1, 1))
.clusteringKey(Key.ofInt(COL_NAME2, 2))
.intValue(COL_NAME3, 2)
.build();

// Act
Exception exception =
Assertions.catchException(() -> storage.mutate(Arrays.asList(put1, put2)));

// Assert
assertThat(exception).isInstanceOf(IllegalArgumentException.class);
}
}
8 changes: 8 additions & 0 deletions core/src/main/java/com/scalar/db/api/DistributedStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ public interface DistributedStorage extends AutoCloseable {
/**
* Mutates entries of the underlying storage with the specified list of {@link Mutation} commands.
*
* <p>Note that this method only supports mutations within the atomicity unit specified by {@link
* StorageInfo#getMutationAtomicityUnit()}. For example, if the atomicity unit of the storage is
* {@link StorageInfo.MutationAtomicityUnit#PARTITION}, the mutations must occur within the same
* partition. Also note that the maximum number of mutations that can be performed atomically is
* defined by {@link StorageInfo#getMaxAtomicMutationsCount()}.
*
* <p>To retrieve storage information, use {@link DistributedStorageAdmin#getStorageInfo(String)}.
*
* @param mutations a list of {@code Mutation} commands
* @throws ExecutionException if the operation fails
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ TableMetadata getImportTableMetadata(
void addRawColumnToTable(String namespace, String table, String columnName, DataType columnType)
throws ExecutionException;

/**
* Returns the storage information.
*
* @param namespace the namespace to get the storage information for
* @return the storage information
* @throws ExecutionException if the operation fails
*/
StorageInfo getStorageInfo(String namespace) throws ExecutionException;

/** Closes connections to the storage. */
@Override
void close();
Expand Down
62 changes: 62 additions & 0 deletions core/src/main/java/com/scalar/db/api/StorageInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.scalar.db.api;

public interface StorageInfo {
/**
* Returns the storage name.
*
* @return the storage name
*/
String getStorageName();

/**
* Returns the mutation atomicity unit of the storage.
*
* @return the mutation atomicity unit of the storage
*/
MutationAtomicityUnit getMutationAtomicityUnit();

/**
* Returns the maximum number of mutations that can be performed atomically in the storage.
*
* @return the maximum number of mutations that can be performed atomically in the storage
*/
int getMaxAtomicMutationsCount();

/**
* The mutation atomicity unit of the storage.
*
* <p>This enum defines the atomicity unit for mutations in the storage. It determines the scope
* of atomicity for mutations such as put and delete.
*/
enum MutationAtomicityUnit {
/**
* The atomicity unit is at the record level, meaning that mutations are performed atomically
* for each record.
*/
RECORD,

/**
* The atomicity unit is at the partition level, meaning that mutations are performed atomically
* for each partition.
*/
PARTITION,

/**
* The atomicity unit is at the table level, meaning that mutations are performed atomically for
* each table.
*/
TABLE,

/**
* The atomicity unit is at the namespace level, meaning that mutations are performed atomically
* for each namespace.
*/
NAMESPACE,

/**
* The atomicity unit is at the storage level, meaning that mutations are performed atomically
* for the entire storage.
*/
STORAGE
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalar.db.common;

import com.scalar.db.api.DistributedStorageAdmin;
import com.scalar.db.api.StorageInfo;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.exception.storage.ExecutionException;
Expand Down Expand Up @@ -352,6 +353,20 @@ public Set<String> getNamespaceNames() throws ExecutionException {
}
}

@Override
public StorageInfo getStorageInfo(String namespace) throws ExecutionException {
if (!namespaceExists(namespace)) {
throw new IllegalArgumentException(CoreError.NAMESPACE_NOT_FOUND.buildMessage(namespace));
}

try {
return admin.getStorageInfo(namespace);
} catch (ExecutionException e) {
throw new ExecutionException(
CoreError.GETTING_STORAGE_INFO_FAILED.buildMessage(namespace), e);
}
}

@Override
public void close() {
admin.close();
Expand Down
66 changes: 66 additions & 0 deletions core/src/main/java/com/scalar/db/common/StorageInfoImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.scalar.db.common;

import com.google.common.base.MoreObjects;
import com.scalar.db.api.StorageInfo;
import java.util.Objects;
import javax.annotation.concurrent.Immutable;

@Immutable
public class StorageInfoImpl implements StorageInfo {

private final String storageName;
private final MutationAtomicityUnit mutationAtomicityUnit;
private final int maxAtomicMutationsCount;

public StorageInfoImpl(
String storageName,
MutationAtomicityUnit mutationAtomicityUnit,
int maxAtomicMutationsCount) {
this.storageName = storageName;
this.mutationAtomicityUnit = mutationAtomicityUnit;
this.maxAtomicMutationsCount = maxAtomicMutationsCount;
}

@Override
public String getStorageName() {
return storageName;
}

@Override
public MutationAtomicityUnit getMutationAtomicityUnit() {
return mutationAtomicityUnit;
}

@Override
public int getMaxAtomicMutationsCount() {
return maxAtomicMutationsCount;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof StorageInfoImpl)) {
return false;
}
StorageInfoImpl that = (StorageInfoImpl) o;
return getMaxAtomicMutationsCount() == that.getMaxAtomicMutationsCount()
&& Objects.equals(getStorageName(), that.getStorageName())
&& getMutationAtomicityUnit() == that.getMutationAtomicityUnit();
}

@Override
public int hashCode() {
return Objects.hash(getStorageName(), getMutationAtomicityUnit(), getMaxAtomicMutationsCount());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("storageName", storageName)
.add("mutationAtomicityUnit", mutationAtomicityUnit)
.add("maxAtomicMutationsCount", maxAtomicMutationsCount)
.toString();
}
}
Loading