From 8ca4fada79b53310c7c7279c5affcceaa81bbc39 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Wed, 11 Jun 2025 15:53:10 +0900 Subject: [PATCH 1/3] Introduce StorageInfo --- ...aMutationAtomicityUnitIntegrationTest.java | 20 + ...sMutationAtomicityUnitIntegrationTest.java | 25 + ...oMutationAtomicityUnitIntegrationTest.java | 25 + ...eMutationAtomicityUnitIntegrationTest.java | 13 + ...eMutationAtomicityUnitIntegrationTest.java | 88 +++ .../com/scalar/db/api/DistributedStorage.java | 8 + .../db/api/DistributedStorageAdmin.java | 9 + .../java/com/scalar/db/api/StorageInfo.java | 62 ++ .../CheckedDistributedStorageAdmin.java | 15 + .../com/scalar/db/common/StorageInfoImpl.java | 64 ++ .../scalar/db/common/StorageInfoProvider.java | 42 ++ .../db/common/checker/OperationChecker.java | 87 ++- .../com/scalar/db/common/error/CoreError.java | 32 +- .../com/scalar/db/service/AdminService.java | 6 + .../db/storage/cassandra/Cassandra.java | 9 +- .../db/storage/cassandra/CassandraAdmin.java | 14 + .../com/scalar/db/storage/cosmos/Cosmos.java | 9 +- .../scalar/db/storage/cosmos/CosmosAdmin.java | 13 + .../cosmos/CosmosOperationChecker.java | 7 +- .../com/scalar/db/storage/dynamo/Dynamo.java | 9 +- .../scalar/db/storage/dynamo/DynamoAdmin.java | 13 + .../dynamo/DynamoOperationChecker.java | 7 +- .../com/scalar/db/storage/jdbc/JdbcAdmin.java | 13 + .../scalar/db/storage/jdbc/JdbcDatabase.java | 10 +- .../db/storage/multistorage/MultiStorage.java | 6 +- .../multistorage/MultiStorageAdmin.java | 83 ++- .../jdbc/JdbcTransactionManager.java | 10 +- .../common/checker/OperationCheckerTest.java | 237 +++++++- .../cosmos/CosmosOperationCheckerTest.java | 12 +- .../dynamo/DynamoOperationCheckerTest.java | 11 +- .../multistorage/MultiStorageAdminTest.java | 109 +++- ...DistributedStorageIntegrationTestBase.java | 118 ---- ...ationAtomicityUnitIntegrationTestBase.java | 564 ++++++++++++++++++ 33 files changed, 1539 insertions(+), 211 deletions(-) create mode 100644 core/src/integration-test/java/com/scalar/db/storage/cassandra/CassandraMutationAtomicityUnitIntegrationTest.java create mode 100644 core/src/integration-test/java/com/scalar/db/storage/cosmos/CosmosMutationAtomicityUnitIntegrationTest.java create mode 100644 core/src/integration-test/java/com/scalar/db/storage/dynamo/DynamoMutationAtomicityUnitIntegrationTest.java create mode 100644 core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcDatabaseMutationAtomicityUnitIntegrationTest.java create mode 100644 core/src/integration-test/java/com/scalar/db/storage/multistorage/MultiStorageMutationAtomicityUnitIntegrationTest.java create mode 100644 core/src/main/java/com/scalar/db/api/StorageInfo.java create mode 100644 core/src/main/java/com/scalar/db/common/StorageInfoImpl.java create mode 100644 core/src/main/java/com/scalar/db/common/StorageInfoProvider.java create mode 100644 integration-test/src/main/java/com/scalar/db/api/DistributedStorageMutationAtomicityUnitIntegrationTestBase.java diff --git a/core/src/integration-test/java/com/scalar/db/storage/cassandra/CassandraMutationAtomicityUnitIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/storage/cassandra/CassandraMutationAtomicityUnitIntegrationTest.java new file mode 100644 index 0000000000..1a9af7ebf0 --- /dev/null +++ b/core/src/integration-test/java/com/scalar/db/storage/cassandra/CassandraMutationAtomicityUnitIntegrationTest.java @@ -0,0 +1,20 @@ +package com.scalar.db.storage.cassandra; + +import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase; 
+import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +public class CassandraMutationAtomicityUnitIntegrationTest + extends DistributedStorageMutationAtomicityUnitIntegrationTestBase { + + @Override + protected Properties getProperties(String testName) { + return CassandraEnv.getProperties(testName); + } + + @Override + protected Map getCreationOptions() { + return Collections.singletonMap(CassandraAdmin.REPLICATION_FACTOR, "1"); + } +} diff --git a/core/src/integration-test/java/com/scalar/db/storage/cosmos/CosmosMutationAtomicityUnitIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/storage/cosmos/CosmosMutationAtomicityUnitIntegrationTest.java new file mode 100644 index 0000000000..7cb3dc8913 --- /dev/null +++ b/core/src/integration-test/java/com/scalar/db/storage/cosmos/CosmosMutationAtomicityUnitIntegrationTest.java @@ -0,0 +1,25 @@ +package com.scalar.db.storage.cosmos; + +import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase; +import java.util.Map; +import java.util.Properties; +import org.junit.jupiter.api.Disabled; + +public class CosmosMutationAtomicityUnitIntegrationTest + extends DistributedStorageMutationAtomicityUnitIntegrationTestBase { + + @Override + protected Properties getProperties(String testName) { + return CosmosEnv.getProperties(testName); + } + + @Override + protected Map getCreationOptions() { + return CosmosEnv.getCreationOptions(); + } + + @Disabled("This test fails. It might be a bug") + @Override + public void + mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() {} +} diff --git a/core/src/integration-test/java/com/scalar/db/storage/dynamo/DynamoMutationAtomicityUnitIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/storage/dynamo/DynamoMutationAtomicityUnitIntegrationTest.java new file mode 100644 index 0000000000..b16a33a982 --- /dev/null +++ b/core/src/integration-test/java/com/scalar/db/storage/dynamo/DynamoMutationAtomicityUnitIntegrationTest.java @@ -0,0 +1,25 @@ +package com.scalar.db.storage.dynamo; + +import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase; +import java.util.Map; +import java.util.Properties; +import org.junit.jupiter.api.Disabled; + +public class DynamoMutationAtomicityUnitIntegrationTest + extends DistributedStorageMutationAtomicityUnitIntegrationTestBase { + + @Override + protected Properties getProperties(String testName) { + return DynamoEnv.getProperties(testName); + } + + @Override + protected Map getCreationOptions() { + return DynamoEnv.getCreationOptions(); + } + + @Disabled("Transaction request cannot include multiple operations on one item in DynamoDB") + @Override + public void + mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() {} +} diff --git a/core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcDatabaseMutationAtomicityUnitIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcDatabaseMutationAtomicityUnitIntegrationTest.java new file mode 100644 index 0000000000..eb01cb4ab8 --- /dev/null +++ b/core/src/integration-test/java/com/scalar/db/storage/jdbc/JdbcDatabaseMutationAtomicityUnitIntegrationTest.java @@ -0,0 +1,13 @@ +package com.scalar.db.storage.jdbc; + +import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase; +import java.util.Properties; + +public class JdbcDatabaseMutationAtomicityUnitIntegrationTest + extends 
DistributedStorageMutationAtomicityUnitIntegrationTestBase { + + @Override + protected Properties getProperties(String testName) { + return JdbcEnv.getProperties(testName); + } +} diff --git a/core/src/integration-test/java/com/scalar/db/storage/multistorage/MultiStorageMutationAtomicityUnitIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/storage/multistorage/MultiStorageMutationAtomicityUnitIntegrationTest.java new file mode 100644 index 0000000000..d97b9fbe47 --- /dev/null +++ b/core/src/integration-test/java/com/scalar/db/storage/multistorage/MultiStorageMutationAtomicityUnitIntegrationTest.java @@ -0,0 +1,88 @@ +package com.scalar.db.storage.multistorage; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase; +import com.scalar.db.api.Put; +import com.scalar.db.config.DatabaseConfig; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.io.Key; +import java.util.Arrays; +import java.util.Properties; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +public class MultiStorageMutationAtomicityUnitIntegrationTest + extends DistributedStorageMutationAtomicityUnitIntegrationTestBase { + + @Override + public Properties getProperties(String testName) { + Properties properties = new Properties(); + properties.setProperty(DatabaseConfig.STORAGE, "multi-storage"); + + // Define storages, jdbc and cassandra + properties.setProperty(MultiStorageConfig.STORAGES, "jdbc,cassandra"); + + Properties propertiesForJdbc = MultiStorageEnv.getPropertiesForJdbc(testName); + for (String propertyName : propertiesForJdbc.stringPropertyNames()) { + properties.setProperty( + MultiStorageConfig.STORAGES + + ".jdbc." + + propertyName.substring(DatabaseConfig.PREFIX.length()), + propertiesForJdbc.getProperty(propertyName)); + } + + Properties propertiesForCassandra = MultiStorageEnv.getPropertiesForCassandra(testName); + for (String propertyName : propertiesForCassandra.stringPropertyNames()) { + properties.setProperty( + MultiStorageConfig.STORAGES + + ".cassandra." 
+ + propertyName.substring(DatabaseConfig.PREFIX.length()), + propertiesForCassandra.getProperty(propertyName)); + } + + // Define namespace mappings + properties.setProperty( + MultiStorageConfig.NAMESPACE_MAPPING, + NAMESPACE1 + ":jdbc," + NAMESPACE2 + ":jdbc," + NAMESPACE3 + ":cassandra"); + + // The default storage is jdbc + properties.setProperty(MultiStorageConfig.DEFAULT_STORAGE, "jdbc"); + + // Add testName as a metadata schema suffix + properties.setProperty( + DatabaseConfig.SYSTEM_NAMESPACE_NAME, + DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME + "_" + testName); + + return properties; + } + + @Test + public void mutate_MutationsAcrossStorageGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() + throws ExecutionException { + // Arrange + Put put1 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .intValue(COL_NAME3, 1) + .build(); + Put put2 = + Put.newBuilder() + .namespace(namespace3) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 1)) + .clusteringKey(Key.ofInt(COL_NAME2, 2)) + .intValue(COL_NAME3, 2) + .build(); + + // Act + Exception exception = + Assertions.catchException(() -> storage.mutate(Arrays.asList(put1, put2))); + + // Assert + assertThat(exception).isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/core/src/main/java/com/scalar/db/api/DistributedStorage.java b/core/src/main/java/com/scalar/db/api/DistributedStorage.java index 5f7e6e16ed..531ad976c5 100644 --- a/core/src/main/java/com/scalar/db/api/DistributedStorage.java +++ b/core/src/main/java/com/scalar/db/api/DistributedStorage.java @@ -181,6 +181,14 @@ public interface DistributedStorage extends AutoCloseable { /** * Mutates entries of the underlying storage with the specified list of {@link Mutation} commands. * + *
<p>
Note that this method only supports mutations within the atomicity unit specified by {@link + * StorageInfo#getMutationAtomicityUnit()}. For example, if the atomicity unit of the storage is + * {@link StorageInfo.MutationAtomicityUnit#PARTITION}, the mutations must occur within the same + * partition. Also note that the maximum number of mutations that can be performed atomically is + * defined by {@link StorageInfo#getMaxAtomicMutationCount()}. + * + *
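* As an illustrative sketch (the namespace, table, and column names below are hypothetical and not
* part of this change), both mutations target the same partition, so they satisfy a
* partition-level atomicity unit:
*
* <pre>{@code
* // Hypothetical names; both mutations share the partition key ("pk", 0)
* Put put =
*     Put.newBuilder().namespace("ns").table("tbl")
*         .partitionKey(Key.ofInt("pk", 0)).clusteringKey(Key.ofInt("ck", 1))
*         .intValue("col", 1).build();
* Delete delete =
*     Delete.newBuilder().namespace("ns").table("tbl")
*         .partitionKey(Key.ofInt("pk", 0)).clusteringKey(Key.ofInt("ck", 2)).build();
* storage.mutate(Arrays.asList(put, delete));
* }</pre>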
<p>
To retrieve storage information, use {@link DistributedStorageAdmin#getStorageInfo(String)}. + * * @param mutations a list of {@code Mutation} commands * @throws ExecutionException if the operation fails */ diff --git a/core/src/main/java/com/scalar/db/api/DistributedStorageAdmin.java b/core/src/main/java/com/scalar/db/api/DistributedStorageAdmin.java index 2643cb4f57..32c239a590 100644 --- a/core/src/main/java/com/scalar/db/api/DistributedStorageAdmin.java +++ b/core/src/main/java/com/scalar/db/api/DistributedStorageAdmin.java @@ -88,6 +88,15 @@ TableMetadata getImportTableMetadata( void addRawColumnToTable(String namespace, String table, String columnName, DataType columnType) throws ExecutionException; + /** + * Returns the storage information. + * + * @param namespace the namespace to get the storage information for + * @return the storage information + * @throws ExecutionException if the operation fails + */ + StorageInfo getStorageInfo(String namespace) throws ExecutionException; + /** Closes connections to the storage. */ @Override void close(); diff --git a/core/src/main/java/com/scalar/db/api/StorageInfo.java b/core/src/main/java/com/scalar/db/api/StorageInfo.java new file mode 100644 index 0000000000..b2453e349a --- /dev/null +++ b/core/src/main/java/com/scalar/db/api/StorageInfo.java @@ -0,0 +1,62 @@ +package com.scalar.db.api; + +public interface StorageInfo { + /** + * Returns the storage name. + * + * @return the storage name + */ + String getStorageName(); + + /** + * Returns the mutation atomicity unit of the storage. + * + * @return the mutation atomicity unit of the storage + */ + MutationAtomicityUnit getMutationAtomicityUnit(); + + /** + * Returns the maximum number of mutations that can be performed atomically in the storage. + * + * @return the maximum number of mutations that can be performed atomically in the storage + */ + int getMaxAtomicMutationCount(); + + /** + * The mutation atomicity unit of the storage. + * + *
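* For instance, a caller can obtain this unit via {@link DistributedStorageAdmin#getStorageInfo(String)}
* and branch on it before grouping mutations (a minimal sketch; the namespace name "ns" is
* illustrative):
*
* <pre>{@code
* StorageInfo info = admin.getStorageInfo("ns"); // "ns" is a hypothetical namespace
* if (info.getMutationAtomicityUnit() == StorageInfo.MutationAtomicityUnit.PARTITION) {
*   // Group mutations by partition before calling storage.mutate(...)
* }
* }</pre>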
<p>
This enum defines the atomicity unit for mutations in the storage. It determines the scope + * of atomicity for mutations such as put and delete. + */ + enum MutationAtomicityUnit { + /** + * The atomicity unit is at the record level, meaning that mutations are performed atomically + * for each record. + */ + RECORD, + + /** + * The atomicity unit is at the partition level, meaning that mutations are performed atomically + * for each partition. + */ + PARTITION, + + /** + * The atomicity unit is at the table level, meaning that mutations are performed atomically for + * each table. + */ + TABLE, + + /** + * The atomicity unit is at the namespace level, meaning that mutations are performed atomically + * for each namespace. + */ + NAMESPACE, + + /** + * The atomicity unit is at the storage level, meaning that mutations are performed atomically + * for the entire storage. + */ + STORAGE + } +} diff --git a/core/src/main/java/com/scalar/db/common/CheckedDistributedStorageAdmin.java b/core/src/main/java/com/scalar/db/common/CheckedDistributedStorageAdmin.java index 462d02132d..51aedf8e92 100644 --- a/core/src/main/java/com/scalar/db/common/CheckedDistributedStorageAdmin.java +++ b/core/src/main/java/com/scalar/db/common/CheckedDistributedStorageAdmin.java @@ -1,6 +1,7 @@ package com.scalar.db.common; import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; import com.scalar.db.common.error.CoreError; import com.scalar.db.config.DatabaseConfig; @@ -373,6 +374,20 @@ public void upgrade(Map options) throws ExecutionException { } } + @Override + public StorageInfo getStorageInfo(String namespace) throws ExecutionException { + if (!namespaceExists(namespace)) { + throw new IllegalArgumentException(CoreError.NAMESPACE_NOT_FOUND.buildMessage(namespace)); + } + + try { + return admin.getStorageInfo(namespace); + } catch (ExecutionException e) { + throw new ExecutionException( + CoreError.GETTING_STORAGE_INFO_FAILED.buildMessage(namespace), e); + } + } + @Override public void close() { admin.close(); diff --git a/core/src/main/java/com/scalar/db/common/StorageInfoImpl.java b/core/src/main/java/com/scalar/db/common/StorageInfoImpl.java new file mode 100644 index 0000000000..3bc989e8be --- /dev/null +++ b/core/src/main/java/com/scalar/db/common/StorageInfoImpl.java @@ -0,0 +1,64 @@ +package com.scalar.db.common; + +import com.google.common.base.MoreObjects; +import com.scalar.db.api.StorageInfo; +import java.util.Objects; +import javax.annotation.concurrent.Immutable; + +@Immutable +public class StorageInfoImpl implements StorageInfo { + + private final String storageName; + private final MutationAtomicityUnit mutationAtomicityUnit; + private final int maxAtomicMutationCount; + + public StorageInfoImpl( + String storageName, MutationAtomicityUnit mutationAtomicityUnit, int maxAtomicMutationCount) { + this.storageName = storageName; + this.mutationAtomicityUnit = mutationAtomicityUnit; + this.maxAtomicMutationCount = maxAtomicMutationCount; + } + + @Override + public String getStorageName() { + return storageName; + } + + @Override + public MutationAtomicityUnit getMutationAtomicityUnit() { + return mutationAtomicityUnit; + } + + @Override + public int getMaxAtomicMutationCount() { + return maxAtomicMutationCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof StorageInfoImpl)) { + return false; + } + StorageInfoImpl that = (StorageInfoImpl) o; + return 
getMaxAtomicMutationCount() == that.getMaxAtomicMutationCount() + && Objects.equals(getStorageName(), that.getStorageName()) + && getMutationAtomicityUnit() == that.getMutationAtomicityUnit(); + } + + @Override + public int hashCode() { + return Objects.hash(getStorageName(), getMutationAtomicityUnit(), getMaxAtomicMutationCount()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("storageName", storageName) + .add("mutationAtomicityUnit", mutationAtomicityUnit) + .add("maxAtomicMutationCount", maxAtomicMutationCount) + .toString(); + } +} diff --git a/core/src/main/java/com/scalar/db/common/StorageInfoProvider.java b/core/src/main/java/com/scalar/db/common/StorageInfoProvider.java new file mode 100644 index 0000000000..c05e6fd6f5 --- /dev/null +++ b/core/src/main/java/com/scalar/db/common/StorageInfoProvider.java @@ -0,0 +1,42 @@ +package com.scalar.db.common; + +import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.StorageInfo; +import com.scalar.db.exception.storage.ExecutionException; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.annotation.concurrent.ThreadSafe; + +@ThreadSafe +public class StorageInfoProvider { + + private final DistributedStorageAdmin admin; + + // Cache to store storage information. A map namespace to StorageInfo. + private final ConcurrentMap storageInfoCache = new ConcurrentHashMap<>(); + + @SuppressFBWarnings("EI_EXPOSE_REP2") + public StorageInfoProvider(DistributedStorageAdmin admin) { + this.admin = admin; + } + + public StorageInfo getStorageInfo(String namespace) throws ExecutionException { + try { + return storageInfoCache.computeIfAbsent(namespace, this::getStorageInfoInternal); + } catch (RuntimeException e) { + if (e.getCause() instanceof ExecutionException) { + throw (ExecutionException) e.getCause(); + } + throw e; + } + } + + private StorageInfo getStorageInfoInternal(String namespace) { + try { + return admin.getStorageInfo(namespace); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/core/src/main/java/com/scalar/db/common/checker/OperationChecker.java b/core/src/main/java/com/scalar/db/common/checker/OperationChecker.java index 0e68dacc3f..869556f316 100644 --- a/core/src/main/java/com/scalar/db/common/checker/OperationChecker.java +++ b/core/src/main/java/com/scalar/db/common/checker/OperationChecker.java @@ -11,7 +11,9 @@ import com.scalar.db.api.ScanAll; import com.scalar.db.api.Selection; import com.scalar.db.api.Selection.Conjunction; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.error.CoreError; import com.scalar.db.config.DatabaseConfig; @@ -31,10 +33,15 @@ public class OperationChecker { private final DatabaseConfig config; private final TableMetadataManager tableMetadataManager; + private final StorageInfoProvider storageInfoProvider; - public OperationChecker(DatabaseConfig config, TableMetadataManager tableMetadataManager) { + public OperationChecker( + DatabaseConfig config, + TableMetadataManager tableMetadataManager, + StorageInfoProvider storageInfoProvider) { this.config = config; this.tableMetadataManager = tableMetadataManager; + this.storageInfoProvider = storageInfoProvider; } public void check(Get get) throws ExecutionException { @@ -334,16 
+341,18 @@ public void check(List mutations) throws ExecutionException } Mutation first = mutations.get(0); + assert first.forNamespace().isPresent(); + StorageInfo storageInfoForFirst = + storageInfoProvider.getStorageInfo(first.forNamespace().get()); + for (Mutation mutation : mutations) { // Check if each mutation is Put or Delete checkMutationType(mutation); - // Check if all mutations are for the same partition - if (!mutation.forNamespace().equals(first.forNamespace()) - || !mutation.forTable().equals(first.forTable()) - || !mutation.getPartitionKey().equals(first.getPartitionKey())) { + // Check if the mutations are within the atomicity unit of the storage + if (isOutOfAtomicityUnit(first, storageInfoForFirst, mutation)) { throw new IllegalArgumentException( - CoreError.OPERATION_CHECK_ERROR_MULTI_PARTITION_MUTATION.buildMessage(mutations)); + getErrorMessageForOutOfAtomicityUnit(storageInfoForFirst, mutations)); } } @@ -357,6 +366,72 @@ public void check(List mutations) throws ExecutionException } } + private boolean isOutOfAtomicityUnit( + Mutation mutation1, StorageInfo storageInfo1, Mutation mutation2) throws ExecutionException { + assert mutation1.forNamespace().isPresent() + && mutation1.forTable().isPresent() + && mutation2.forNamespace().isPresent() + && mutation2.forTable().isPresent(); + + switch (storageInfo1.getMutationAtomicityUnit()) { + case RECORD: + if (!mutation1.getClusteringKey().equals(mutation2.getClusteringKey())) { + return true; // Different clustering keys + } + // Fall through + case PARTITION: + if (!mutation1.getPartitionKey().equals(mutation2.getPartitionKey())) { + return true; // Different partition keys + } + // Fall through + case TABLE: + if (!mutation1.forTable().equals(mutation2.forTable())) { + return true; // Different tables + } + // Fall through + case NAMESPACE: + if (!mutation1.forNamespace().equals(mutation2.forNamespace())) { + return true; // Different namespaces + } + break; + case STORAGE: + StorageInfo storageInfo2 = + storageInfoProvider.getStorageInfo(mutation2.forNamespace().get()); + if (!storageInfo1.getStorageName().equals(storageInfo2.getStorageName())) { + return true; // Different storage names + } + break; + default: + throw new AssertionError( + "Unknown mutation atomicity unit: " + storageInfo1.getMutationAtomicityUnit()); + } + + return false; + } + + private String getErrorMessageForOutOfAtomicityUnit( + StorageInfo storageInfo, List mutations) { + switch (storageInfo.getMutationAtomicityUnit()) { + case RECORD: + return CoreError.OPERATION_CHECK_ERROR_MULTI_RECORD_MUTATION.buildMessage( + storageInfo.getStorageName(), mutations); + case PARTITION: + return CoreError.OPERATION_CHECK_ERROR_MULTI_PARTITION_MUTATION.buildMessage( + storageInfo.getStorageName(), mutations); + case TABLE: + return CoreError.OPERATION_CHECK_ERROR_MULTI_TABLE_MUTATION.buildMessage( + storageInfo.getStorageName(), mutations); + case NAMESPACE: + return CoreError.OPERATION_CHECK_ERROR_MULTI_NAMESPACE_MUTATION.buildMessage( + storageInfo.getStorageName(), mutations); + case STORAGE: + return CoreError.OPERATION_CHECK_ERROR_MULTI_STORAGE_MUTATION.buildMessage(mutations); + default: + throw new AssertionError( + "Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit()); + } + } + private void checkMutationType(Mutation mutation) { if (!(mutation instanceof Put) && !(mutation instanceof Delete)) { throw new IllegalArgumentException( diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java 
b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 4244b52064..d7d69c5495 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -100,7 +100,7 @@ public enum CoreError implements ScalarDbError { OPERATION_CHECK_ERROR_MULTI_PARTITION_MUTATION( Category.USER_ERROR, "0019", - "Mutations that span multiple partitions are not supported. Mutations: %s", + "The storage does not support mutations across multiple partitions. Storage: %s; Mutations: %s", "", ""), OPERATION_CHECK_ERROR_PARTITION_KEY( @@ -936,6 +936,30 @@ public enum CoreError implements ScalarDbError { "Mutations are not allowed in read-only transactions. Transaction ID: %s", "", ""), + OPERATION_CHECK_ERROR_MULTI_RECORD_MUTATION( + Category.USER_ERROR, + "0212", + "The storage does not support mutations across multiple records. Storage: %s; Mutations: %s", + "", + ""), + OPERATION_CHECK_ERROR_MULTI_TABLE_MUTATION( + Category.USER_ERROR, + "0213", + "The storage does not support mutations across multiple tables. Storage: %s; Mutations: %s", + "", + ""), + OPERATION_CHECK_ERROR_MULTI_NAMESPACE_MUTATION( + Category.USER_ERROR, + "0214", + "The storage does not support mutations across multiple namespaces. Storage: %s; Mutations: %s", + "", + ""), + OPERATION_CHECK_ERROR_MULTI_STORAGE_MUTATION( + Category.USER_ERROR, + "0215", + "Mutations across multiple storages are not allowed. Mutations: %s", + "", + ""), // // Errors for the concurrency error category @@ -1211,6 +1235,12 @@ public enum CoreError implements ScalarDbError { Category.INTERNAL_ERROR, "0054", "Getting the scanner failed. Details: %s", "", ""), JDBC_CLOSING_SCANNER_FAILED( Category.INTERNAL_ERROR, "0055", "Closing the scanner failed. Details: %s", "", ""), + GETTING_STORAGE_INFO_FAILED( + Category.INTERNAL_ERROR, + "0056", + "Getting the storage information failed. 
Namespace: %s", + "", + ""), // // Errors for the unknown transaction status error category diff --git a/core/src/main/java/com/scalar/db/service/AdminService.java b/core/src/main/java/com/scalar/db/service/AdminService.java index a40181e67d..18806b1ac5 100644 --- a/core/src/main/java/com/scalar/db/service/AdminService.java +++ b/core/src/main/java/com/scalar/db/service/AdminService.java @@ -2,6 +2,7 @@ import com.google.inject.Inject; import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.io.DataType; @@ -133,6 +134,11 @@ public void upgrade(Map options) throws ExecutionException { admin.upgrade(options); } + @Override + public StorageInfo getStorageInfo(String namespace) throws ExecutionException { + return admin.getStorageInfo(namespace); + } + @Override public void close() { admin.close(); diff --git a/core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java b/core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java index 4b29a804a6..25b066e99a 100644 --- a/core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java +++ b/core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java @@ -15,6 +15,7 @@ import com.scalar.db.api.Scanner; import com.scalar.db.common.AbstractDistributedStorage; import com.scalar.db.common.FilterableScanner; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.OperationChecker; import com.scalar.db.common.error.CoreError; @@ -65,11 +66,11 @@ public Cassandra(DatabaseConfig config) { batch = new BatchHandler(session, handlers); logger.info("Cassandra object is created properly"); + CassandraAdmin cassandraAdmin = new CassandraAdmin(clusterManager, config); metadataManager = - new TableMetadataManager( - new CassandraAdmin(clusterManager, config), - config.getMetadataCacheExpirationTimeSecs()); - operationChecker = new OperationChecker(config, metadataManager); + new TableMetadataManager(cassandraAdmin, config.getMetadataCacheExpirationTimeSecs()); + operationChecker = + new OperationChecker(config, metadataManager, new StorageInfoProvider(cassandraAdmin)); } @VisibleForTesting diff --git a/core/src/main/java/com/scalar/db/storage/cassandra/CassandraAdmin.java b/core/src/main/java/com/scalar/db/storage/cassandra/CassandraAdmin.java index 4dcb46c2e5..22a4e17f7c 100644 --- a/core/src/main/java/com/scalar/db/storage/cassandra/CassandraAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/cassandra/CassandraAdmin.java @@ -21,7 +21,9 @@ import com.scalar.db.api.DistributedStorageAdmin; import com.scalar.db.api.Scan; import com.scalar.db.api.Scan.Ordering.Order; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoImpl; import com.scalar.db.common.error.CoreError; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; @@ -47,6 +49,13 @@ public class CassandraAdmin implements DistributedStorageAdmin { public static final String NAMESPACES_NAME_COL = "name"; private static final String DEFAULT_REPLICATION_FACTOR = "3"; @VisibleForTesting static final String INDEX_NAME_PREFIX = "index"; + private static final StorageInfo STORAGE_INFO = + new StorageInfoImpl( + "cassandra", + StorageInfo.MutationAtomicityUnit.PARTITION, + // No limit on the number of mutations + Integer.MAX_VALUE); + private 
final ClusterManager clusterManager; private final String metadataKeyspace; @@ -570,6 +579,11 @@ void createSecondaryIndexes( } } + @Override + public StorageInfo getStorageInfo(String namespace) { + return STORAGE_INFO; + } + @Override public void close() { clusterManager.close(); diff --git a/core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java b/core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java index 01f65c9aae..6e24c3a925 100644 --- a/core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java +++ b/core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java @@ -15,6 +15,7 @@ import com.scalar.db.api.Scanner; import com.scalar.db.common.AbstractDistributedStorage; import com.scalar.db.common.FilterableScanner; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.OperationChecker; import com.scalar.db.common.error.CoreError; @@ -57,10 +58,12 @@ public Cosmos(DatabaseConfig databaseConfig) { client = CosmosUtils.buildCosmosClient(config); + CosmosAdmin cosmosAdmin = new CosmosAdmin(client, config); TableMetadataManager metadataManager = - new TableMetadataManager( - new CosmosAdmin(client, config), databaseConfig.getMetadataCacheExpirationTimeSecs()); - operationChecker = new CosmosOperationChecker(databaseConfig, metadataManager); + new TableMetadataManager(cosmosAdmin, databaseConfig.getMetadataCacheExpirationTimeSecs()); + operationChecker = + new CosmosOperationChecker( + databaseConfig, metadataManager, new StorageInfoProvider(cosmosAdmin)); selectStatementHandler = new SelectStatementHandler(client, metadataManager, databaseConfig.getScanFetchSize()); diff --git a/core/src/main/java/com/scalar/db/storage/cosmos/CosmosAdmin.java b/core/src/main/java/com/scalar/db/storage/cosmos/CosmosAdmin.java index 8de1848bf5..6888eac356 100644 --- a/core/src/main/java/com/scalar/db/storage/cosmos/CosmosAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/cosmos/CosmosAdmin.java @@ -24,7 +24,9 @@ import com.google.inject.Inject; import com.scalar.db.api.DistributedStorageAdmin; import com.scalar.db.api.Scan.Ordering.Order; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoImpl; import com.scalar.db.common.error.CoreError; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; @@ -64,6 +66,12 @@ public class CosmosAdmin implements DistributedStorageAdmin { @VisibleForTesting public static final String STORED_PROCEDURE_FILE_NAME = "mutate.js"; private static final String STORED_PROCEDURE_PATH = "cosmosdb_stored_procedure/" + STORED_PROCEDURE_FILE_NAME; + private static final StorageInfo STORAGE_INFO = + new StorageInfoImpl( + "cosmos", + StorageInfo.MutationAtomicityUnit.PARTITION, + // No limit on the number of mutations + Integer.MAX_VALUE); private final CosmosClient client; private final String metadataDatabase; @@ -741,4 +749,9 @@ private boolean containerExists(String databaseId, String containerId) throws Co } return true; } + + @Override + public StorageInfo getStorageInfo(String namespace) { + return STORAGE_INFO; + } } diff --git a/core/src/main/java/com/scalar/db/storage/cosmos/CosmosOperationChecker.java b/core/src/main/java/com/scalar/db/storage/cosmos/CosmosOperationChecker.java index 77cbe7e364..2c23898506 100644 --- a/core/src/main/java/com/scalar/db/storage/cosmos/CosmosOperationChecker.java +++ 
b/core/src/main/java/com/scalar/db/storage/cosmos/CosmosOperationChecker.java @@ -8,6 +8,7 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Scan; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.OperationChecker; import com.scalar.db.common.error.CoreError; @@ -90,8 +91,10 @@ public void visit(TimestampTZColumn column) {} }; public CosmosOperationChecker( - DatabaseConfig databaseConfig, TableMetadataManager metadataManager) { - super(databaseConfig, metadataManager); + DatabaseConfig databaseConfig, + TableMetadataManager metadataManager, + StorageInfoProvider storageInfoProvider) { + super(databaseConfig, metadataManager, storageInfoProvider); } @Override diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java b/core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java index 511bd1eaff..99ecae7c30 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java @@ -14,6 +14,7 @@ import com.scalar.db.api.Scanner; import com.scalar.db.common.AbstractDistributedStorage; import com.scalar.db.common.FilterableScanner; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.OperationChecker; import com.scalar.db.common.error.CoreError; @@ -71,10 +72,12 @@ public Dynamo(DatabaseConfig databaseConfig) { .region(Region.of(config.getRegion())) .build(); + DynamoAdmin dynamoAdmin = new DynamoAdmin(client, config); TableMetadataManager metadataManager = - new TableMetadataManager( - new DynamoAdmin(client, config), databaseConfig.getMetadataCacheExpirationTimeSecs()); - operationChecker = new DynamoOperationChecker(databaseConfig, metadataManager); + new TableMetadataManager(dynamoAdmin, databaseConfig.getMetadataCacheExpirationTimeSecs()); + operationChecker = + new DynamoOperationChecker( + databaseConfig, metadataManager, new StorageInfoProvider(dynamoAdmin)); selectStatementHandler = new SelectStatementHandler( diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/DynamoAdmin.java b/core/src/main/java/com/scalar/db/storage/dynamo/DynamoAdmin.java index 9c9d8f39c5..99fd6632bd 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/DynamoAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/DynamoAdmin.java @@ -7,7 +7,9 @@ import com.google.inject.Inject; import com.scalar.db.api.DistributedStorageAdmin; import com.scalar.db.api.Scan.Ordering.Order; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoImpl; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.io.DataType; @@ -153,6 +155,12 @@ public class DynamoAdmin implements DistributedStorageAdmin { .put(SCALING_TYPE_INDEX_READ, MetricType.DYNAMO_DB_READ_CAPACITY_UTILIZATION) .put(SCALING_TYPE_INDEX_WRITE, MetricType.DYNAMO_DB_WRITE_CAPACITY_UTILIZATION) .build(); + private static final StorageInfo STORAGE_INFO = + new StorageInfoImpl( + "dynamo", + StorageInfo.MutationAtomicityUnit.STORAGE, + // DynamoDB has a limit of 100 items per transactional batch write operation + 100); private final DynamoDbClient client; private final ApplicationAutoScalingClient applicationAutoScalingClient; @@ -1503,6 +1511,11 @@ private String getFullTableName(Namespace namespace, String table) { return 
ScalarDbUtils.getFullTableName(namespace.prefixed(), table); } + @Override + public StorageInfo getStorageInfo(String namespace) { + return STORAGE_INFO; + } + @Override public void close() { client.close(); diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/DynamoOperationChecker.java b/core/src/main/java/com/scalar/db/storage/dynamo/DynamoOperationChecker.java index c0ee1859a4..f8b77b8971 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/DynamoOperationChecker.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/DynamoOperationChecker.java @@ -5,6 +5,7 @@ import com.scalar.db.api.Mutation; import com.scalar.db.api.Put; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.ColumnChecker; import com.scalar.db.common.checker.OperationChecker; @@ -17,8 +18,10 @@ @ThreadSafe public class DynamoOperationChecker extends OperationChecker { public DynamoOperationChecker( - DatabaseConfig databaseConfig, TableMetadataManager metadataManager) { - super(databaseConfig, metadataManager); + DatabaseConfig databaseConfig, + TableMetadataManager metadataManager, + StorageInfoProvider storageInfoProvider) { + super(databaseConfig, metadataManager, storageInfoProvider); } @Override diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java index a9e34787b7..6400f3f98c 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java @@ -12,7 +12,9 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.Scan.Ordering; import com.scalar.db.api.Scan.Ordering.Order; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoImpl; import com.scalar.db.common.error.CoreError; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; @@ -59,6 +61,12 @@ public class JdbcAdmin implements DistributedStorageAdmin { @VisibleForTesting static final String NAMESPACE_COL_NAMESPACE_NAME = "namespace_name"; private static final Logger logger = LoggerFactory.getLogger(JdbcAdmin.class); private static final String INDEX_NAME_PREFIX = "index"; + private static final StorageInfo STORAGE_INFO = + new StorageInfoImpl( + "jdbc", + StorageInfo.MutationAtomicityUnit.STORAGE, + // No limit on the number of mutations + Integer.MAX_VALUE); private final RdbEngineStrategy rdbEngine; private final BasicDataSource dataSource; @@ -1162,6 +1170,11 @@ private void importNamespaceNamesOfExistingTables(Connection connection) } } + @Override + public StorageInfo getStorageInfo(String namespace) { + return STORAGE_INFO; + } + @FunctionalInterface interface SqlWarningHandler { void throwSqlWarningIfNeeded(SQLWarning warning) throws SQLException; diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java index 0850c1cac5..64a20cc725 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java @@ -11,6 +11,7 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.Scanner; import com.scalar.db.common.AbstractDistributedStorage; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import 
com.scalar.db.common.checker.OperationChecker; import com.scalar.db.common.error.CoreError; @@ -53,12 +54,13 @@ public JdbcDatabase(DatabaseConfig databaseConfig) { dataSource = JdbcUtils.initDataSource(config, rdbEngine); tableMetadataDataSource = JdbcUtils.initDataSourceForTableMetadata(config, rdbEngine); + JdbcAdmin jdbcAdmin = new JdbcAdmin(tableMetadataDataSource, config); TableMetadataManager tableMetadataManager = - new TableMetadataManager( - new JdbcAdmin(tableMetadataDataSource, config), - databaseConfig.getMetadataCacheExpirationTimeSecs()); + new TableMetadataManager(jdbcAdmin, databaseConfig.getMetadataCacheExpirationTimeSecs()); + OperationChecker operationChecker = + new OperationChecker( + databaseConfig, tableMetadataManager, new StorageInfoProvider(jdbcAdmin)); - OperationChecker operationChecker = new OperationChecker(databaseConfig, tableMetadataManager); jdbcService = new JdbcService( tableMetadataManager, operationChecker, rdbEngine, databaseConfig.getScanFetchSize()); diff --git a/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorage.java b/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorage.java index 2c32dace2b..015225b131 100644 --- a/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorage.java +++ b/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorage.java @@ -36,7 +36,9 @@ @ThreadSafe public class MultiStorage extends AbstractDistributedStorage { - private final Map tableStorageMap; + /** @deprecated Will be removed in 5.0.0. */ + @Deprecated private final Map tableStorageMap; + private final Map namespaceStorageMap; private final DistributedStorage defaultStorage; private final List storages; @@ -140,6 +142,8 @@ public void mutate(List mutations) throws ExecutionException } private DistributedStorage getStorage(Operation operation) { + assert operation.forFullTableName().isPresent() && operation.forNamespace().isPresent(); + String fullTaleName = operation.forFullTableName().get(); DistributedStorage storage = tableStorageMap.get(fullTaleName); if (storage != null) { diff --git a/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java b/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java index a1443e1999..9697877053 100644 --- a/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java @@ -4,7 +4,9 @@ import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoImpl; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.io.DataType; @@ -15,6 +17,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.concurrent.ThreadSafe; /** @@ -28,9 +31,11 @@ @ThreadSafe public class MultiStorageAdmin implements DistributedStorageAdmin { - private final Map tableAdminMap; - private final Map namespaceAdminMap; - private final DistributedStorageAdmin defaultAdmin; + /** @deprecated Will be removed in 5.0.0. 
*/ + @Deprecated private final Map tableAdminMap; + + private final Map namespaceAdminMap; + private final AdminHolder defaultAdmin; private final List admins; @Inject @@ -44,7 +49,7 @@ public MultiStorageAdmin(DatabaseConfig databaseConfig) { .forEach( (storageName, properties) -> { StorageFactory factory = StorageFactory.create(properties); - DistributedStorageAdmin admin = factory.getAdmin(); + DistributedStorageAdmin admin = factory.getStorageAdmin(); nameAdminMap.put(storageName, admin); admins.add(admin); }); @@ -59,23 +64,28 @@ public MultiStorageAdmin(DatabaseConfig databaseConfig) { .getNamespaceStorageMap() .forEach( (namespace, storageName) -> - namespaceAdminMap.put(namespace, nameAdminMap.get(storageName))); + namespaceAdminMap.put( + namespace, new AdminHolder(storageName, nameAdminMap.get(storageName)))); - defaultAdmin = nameAdminMap.get(config.getDefaultStorage()); + defaultAdmin = + new AdminHolder(config.getDefaultStorage(), nameAdminMap.get(config.getDefaultStorage())); } @VisibleForTesting MultiStorageAdmin( Map tableAdminMap, - Map namespaceAdminMap, - DistributedStorageAdmin defaultAdmin) { + Map namespaceAdminMap, + AdminHolder defaultAdmin) { this.tableAdminMap = tableAdminMap; this.namespaceAdminMap = namespaceAdminMap; this.defaultAdmin = defaultAdmin; ImmutableSet.Builder adminsSet = ImmutableSet.builder(); adminsSet.addAll(tableAdminMap.values()); - adminsSet.addAll(namespaceAdminMap.values()); - adminsSet.add(defaultAdmin); + adminsSet.addAll( + namespaceAdminMap.values().stream() + .map(holder -> holder.admin) + .collect(Collectors.toSet())); + adminsSet.add(defaultAdmin.admin); this.admins = adminsSet.build().asList(); } @@ -243,16 +253,18 @@ public Set getNamespaceNames() throws ExecutionException { // => returned // - ns4 and ns5 are in the default storage (cosmos) // => returned - Set namespaceNames = new HashSet<>(defaultAdmin.getNamespaceNames()); + Set namespaceNames = new HashSet<>(defaultAdmin.admin.getNamespaceNames()); Set adminsWithoutDefaultAdmin = - new HashSet<>(namespaceAdminMap.values()); - adminsWithoutDefaultAdmin.remove(defaultAdmin); + namespaceAdminMap.values().stream().map(holder -> holder.admin).collect(Collectors.toSet()); + adminsWithoutDefaultAdmin.remove(defaultAdmin.admin); + for (DistributedStorageAdmin admin : adminsWithoutDefaultAdmin) { Set existingNamespaces = admin.getNamespaceNames(); // Filter out namespace not in the mapping for (String existingNamespace : existingNamespaces) { - if (admin.equals(namespaceAdminMap.get(existingNamespace))) { + AdminHolder holder = namespaceAdminMap.get(existingNamespace); + if (holder != null && admin.equals(holder.admin)) { namespaceNames.add(existingNamespace); } } @@ -263,14 +275,38 @@ public Set getNamespaceNames() throws ExecutionException { @Override public void upgrade(Map options) throws ExecutionException { - for (DistributedStorageAdmin admin : namespaceAdminMap.values()) { - admin.upgrade(options); + for (AdminHolder admin : namespaceAdminMap.values()) { + admin.admin.upgrade(options); + } + } + + @Override + public StorageInfo getStorageInfo(String namespace) throws ExecutionException { + try { + AdminHolder holder = getAdminHolder(namespace); + StorageInfo storageInfo = holder.admin.getStorageInfo(namespace); + return new StorageInfoImpl( + holder.storageName, + storageInfo.getMutationAtomicityUnit(), + storageInfo.getMaxAtomicMutationCount()); + } catch (RuntimeException e) { + if (e.getCause() instanceof ExecutionException) { + throw (ExecutionException) e.getCause(); + } + 
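// Otherwise, propagate the runtime exception as-is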
throw e; } } + private AdminHolder getAdminHolder(String namespace) { + AdminHolder adminHolder = namespaceAdminMap.get(namespace); + if (adminHolder != null) { + return adminHolder; + } + return defaultAdmin; + } + private DistributedStorageAdmin getAdmin(String namespace) { - DistributedStorageAdmin admin = namespaceAdminMap.get(namespace); - return admin != null ? admin : defaultAdmin; + return getAdminHolder(namespace).admin; } private DistributedStorageAdmin getAdmin(String namespace, String table) { @@ -288,4 +324,15 @@ public void close() { admin.close(); } } + + @VisibleForTesting + static class AdminHolder { + public final String storageName; + public final DistributedStorageAdmin admin; + + public AdminHolder(String storageName, DistributedStorageAdmin admin) { + this.storageName = storageName; + this.admin = admin; + } + } } diff --git a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java index e774f7e006..4d0c4b3b71 100644 --- a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java +++ b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java @@ -19,6 +19,7 @@ import com.scalar.db.common.AbstractDistributedTransactionManager; import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; import com.scalar.db.common.ReadOnlyDistributedTransaction; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.OperationChecker; import com.scalar.db.common.error.CoreError; @@ -66,12 +67,13 @@ public JdbcTransactionManager(DatabaseConfig databaseConfig) { dataSource = JdbcUtils.initDataSource(config, rdbEngine, true); tableMetadataDataSource = JdbcUtils.initDataSourceForTableMetadata(config, rdbEngine); + JdbcAdmin jdbcAdmin = new JdbcAdmin(tableMetadataDataSource, config); TableMetadataManager tableMetadataManager = - new TableMetadataManager( - new JdbcAdmin(tableMetadataDataSource, config), - databaseConfig.getMetadataCacheExpirationTimeSecs()); + new TableMetadataManager(jdbcAdmin, databaseConfig.getMetadataCacheExpirationTimeSecs()); + OperationChecker operationChecker = + new OperationChecker( + databaseConfig, tableMetadataManager, new StorageInfoProvider(jdbcAdmin)); - OperationChecker operationChecker = new OperationChecker(databaseConfig, tableMetadataManager); jdbcService = new JdbcService( tableMetadataManager, operationChecker, rdbEngine, databaseConfig.getScanFetchSize()); diff --git a/core/src/test/java/com/scalar/db/common/checker/OperationCheckerTest.java b/core/src/test/java/com/scalar/db/common/checker/OperationCheckerTest.java index 395f6cdcef..6b8d582627 100644 --- a/core/src/test/java/com/scalar/db/common/checker/OperationCheckerTest.java +++ b/core/src/test/java/com/scalar/db/common/checker/OperationCheckerTest.java @@ -1,7 +1,9 @@ package com.scalar.db.common.checker; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.catchException; import static org.mockito.Mockito.any; import static org.mockito.Mockito.when; @@ -13,6 +15,7 @@ import com.scalar.db.api.DeleteIfExists; import com.scalar.db.api.Get; import com.scalar.db.api.Insert; +import com.scalar.db.api.Mutation; import com.scalar.db.api.MutationCondition; import com.scalar.db.api.Put; import 
com.scalar.db.api.PutIf; @@ -21,9 +24,12 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.Scan.Ordering; import com.scalar.db.api.ScanAll; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; +import com.scalar.db.common.StorageInfoImpl; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; @@ -40,6 +46,8 @@ import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -54,9 +62,13 @@ public class OperationCheckerTest { private static final String COL1 = "v1"; private static final String COL2 = "v2"; private static final String COL3 = "v3"; + private static final StorageInfo STORAGE_INFO = + new StorageInfoImpl( + "cassandra", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE); @Mock private DatabaseConfig databaseConfig; @Mock private TableMetadataManager metadataManager; + @Mock private StorageInfoProvider storageInfoProvider; private OperationChecker operationChecker; @BeforeEach @@ -81,7 +93,7 @@ public void setUp() throws Exception { .addSecondaryIndex(COL1) .build()); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); } @Test @@ -965,7 +977,7 @@ public void whenCheckingPutOperationWithDeleteIfCondition_shouldThrowIllegalArgu .addClusteringKey(CKEY1) .build()); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); Key partitionKey = new Key(PKEY1, (byte[]) null); Key clusteringKey = new Key(CKEY1, new byte[] {1, 1, 1}); @@ -996,7 +1008,7 @@ public void whenCheckingPutOperationWithDeleteIfCondition_shouldThrowIllegalArgu .addClusteringKey(CKEY1) .build()); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); Key partitionKey = new Key(PKEY1, new byte[0]); Key clusteringKey = new Key(CKEY1, new byte[] {1, 1, 1}); @@ -1027,7 +1039,7 @@ public void whenCheckingPutOperationWithDeleteIfCondition_shouldThrowIllegalArgu .addClusteringKey(CKEY1) .build()); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); Key partitionKey = new Key(PKEY1, new byte[] {1, 1, 1}); Key clusteringKey = new Key(CKEY1, (byte[]) null); @@ -1058,7 +1070,7 @@ public void whenCheckingPutOperationWithDeleteIfCondition_shouldThrowIllegalArgu .addClusteringKey(CKEY1) .build()); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); Key partitionKey = new Key(PKEY1, new byte[] {1, 1, 1}); Key clusteringKey = new Key(CKEY1, new byte[0]); @@ -1280,8 +1292,11 @@ public void whenCheckingDeleteOperationWithPutIfCondition_shouldThrowIllegalArgu } @Test - public void whenCheckingMutateOperationWithAllValidArguments_shouldNotThrowAnyException() { + public void 
whenCheckingMutateOperationWithAllValidArguments_shouldNotThrowAnyException() + throws ExecutionException { // Arrange + when(storageInfoProvider.getStorageInfo(any())).thenReturn(STORAGE_INFO); + Key partitionKey = new Key(PKEY1, 1, PKEY2, "val1"); Key clusteringKey = new Key(CKEY1, 2, CKEY2, "val1"); Put put = @@ -1308,8 +1323,11 @@ public void whenCheckingMutateOperationWithEmptyMutations_shouldThrowIllegalArgu @Test public void - whenCheckingMutateOperationWithMutationsWithDifferentPartitionKeys_shouldThrowIllegalArgumentException() { + whenCheckingMutateOperationWithMutationsWithDifferentPartitionKeys_shouldThrowIllegalArgumentException() + throws ExecutionException { // Arrange + when(storageInfoProvider.getStorageInfo(any())).thenReturn(STORAGE_INFO); + Key partitionKey1 = new Key(PKEY1, 1, PKEY2, "val1"); Key partitionKey2 = new Key(PKEY1, 2, PKEY2, "val2"); Key clusteringKey = new Key(CKEY1, 2, CKEY2, "val3"); @@ -1328,8 +1346,11 @@ public void whenCheckingMutateOperationWithEmptyMutations_shouldThrowIllegalArgu @Test public void - whenCheckingMutateOperationWithMutationsWithSameTableAndPartitionKeyButDifferentNamespace_shouldThrowIllegalArgumentException() { + whenCheckingMutateOperationWithMutationsWithSameTableAndPartitionKeyButDifferentNamespace_shouldThrowIllegalArgumentException() + throws ExecutionException { // Arrange + when(storageInfoProvider.getStorageInfo(any())).thenReturn(STORAGE_INFO); + Key partitionKey = new Key(PKEY1, 1, PKEY2, "val1"); Key clusteringKey = new Key(CKEY1, 2, CKEY2, "val3"); Put put = @@ -1346,8 +1367,11 @@ public void whenCheckingMutateOperationWithEmptyMutations_shouldThrowIllegalArgu @Test public void - whenCheckingMutateOperationWithMutationsWithPutWithInvalidClusteringKey_shouldThrowIllegalArgumentException() { + whenCheckingMutateOperationWithMutationsWithPutWithInvalidClusteringKey_shouldThrowIllegalArgumentException() + throws ExecutionException { // Arrange + when(storageInfoProvider.getStorageInfo(any())).thenReturn(STORAGE_INFO); + Key partitionKey = Key.of(PKEY1, 1, PKEY2, "val1"); Key invalidClusteringKey = Key.of(CKEY1, 2, "c3", "val3"); Key clusteringKey = Key.of(CKEY1, 2, CKEY2, "val3"); @@ -1745,7 +1769,7 @@ public void whenCheckingScanAllOperationWithAllValidArguments_shouldNotThrowAnyE .forNamespace(NAMESPACE) .forTable(TABLE_NAME); when(databaseConfig.isCrossPartitionScanEnabled()).thenReturn(true); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); // Act Assert assertThatCode(() -> operationChecker.check(scanAll)).doesNotThrowAnyException(); @@ -1764,7 +1788,7 @@ public void whenCheckingScanAllOperationWithAllValidArguments_shouldNotThrowAnyE .forNamespace(NAMESPACE) .forTable(TABLE_NAME); when(databaseConfig.isCrossPartitionScanEnabled()).thenReturn(true); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); // Act Assert assertThatThrownBy(() -> operationChecker.check(scanAll)) @@ -1784,7 +1808,7 @@ public void whenCheckingScanAllOperationWithAllValidArguments_shouldNotThrowAnyE .forNamespace(NAMESPACE) .forTable(TABLE_NAME); when(databaseConfig.isCrossPartitionScanEnabled()).thenReturn(true); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); // Act Assert 
assertThatThrownBy(() -> operationChecker.check(scanAll)) @@ -1804,7 +1828,7 @@ public void whenCheckingScanAllOperationWithAllValidArguments_shouldNotThrowAnyE .limit(10) .build(); when(databaseConfig.isCrossPartitionScanEnabled()).thenReturn(false); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); // Act Assert assertThatThrownBy(() -> operationChecker.check(scanAll)) @@ -1826,7 +1850,7 @@ public void whenCheckingScanAllOperationWithAllValidArguments_shouldNotThrowAnyE .build(); when(databaseConfig.isCrossPartitionScanEnabled()).thenReturn(true); when(databaseConfig.isCrossPartitionScanOrderingEnabled()).thenReturn(false); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); // Act Assert assertThatThrownBy(() -> operationChecker.check(scanAll)) @@ -1848,10 +1872,193 @@ public void whenCheckingScanAllOperationWithAllValidArguments_shouldNotThrowAnyE .build(); when(databaseConfig.isCrossPartitionScanEnabled()).thenReturn(true); when(databaseConfig.isCrossPartitionScanFilteringEnabled()).thenReturn(false); - operationChecker = new OperationChecker(databaseConfig, metadataManager); + operationChecker = new OperationChecker(databaseConfig, metadataManager, storageInfoProvider); // Act Assert assertThatThrownBy(() -> operationChecker.check(scanAll)) .isInstanceOf(IllegalArgumentException.class); } + + @ParameterizedTest + @EnumSource(StorageInfo.MutationAtomicityUnit.class) + public void check_MutationsGiven_ForAtomicityUnit_ShouldBehaveCorrectly( + StorageInfo.MutationAtomicityUnit mutationAtomicityUnit) throws ExecutionException { + // Arrange + when(metadataManager.getTableMetadata(any())) + .thenReturn( + TableMetadata.newBuilder() + .addColumn(PKEY1, DataType.INT) + .addColumn(CKEY1, DataType.INT) + .addColumn(COL1, DataType.INT) + .addPartitionKey(PKEY1) + .addClusteringKey(CKEY1) + .build()); + + StorageInfo storageInfo1 = new StorageInfoImpl("s1", mutationAtomicityUnit, Integer.MAX_VALUE); + StorageInfo storageInfo2 = + new StorageInfoImpl("s2", StorageInfo.MutationAtomicityUnit.STORAGE, Integer.MAX_VALUE); + when(storageInfoProvider.getStorageInfo("ns")).thenReturn(storageInfo1); + when(storageInfoProvider.getStorageInfo("ns2")).thenReturn(storageInfo1); + when(storageInfoProvider.getStorageInfo("other_ns")).thenReturn(storageInfo2); + + List mutationsWithinRecord = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 1)) + .intValue(COL1, 0) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 1)) + .build()); + List mutationsWithinPartition = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 1)) + .intValue(COL1, 0) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 2)) + .build()); + List mutationsWithinTable = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 1)) + .intValue(COL1, 0) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt(PKEY1, 1)) + 
.clusteringKey(Key.ofInt(CKEY1, 2)) + .build()); + List mutationsWithinNamespace = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl1") + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 1)) + .intValue(COL1, 0) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl2") + .partitionKey(Key.ofInt(PKEY1, 1)) + .clusteringKey(Key.ofInt(CKEY1, 2)) + .build()); + List mutationsWithinStorage = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl1") + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 1)) + .intValue(COL1, 0) + .build(), + Delete.newBuilder() + .namespace("ns2") + .table("tbl2") + .partitionKey(Key.ofInt(PKEY1, 1)) + .clusteringKey(Key.ofInt(CKEY1, 2)) + .build()); + List mutationsAcrossStorages = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl1") + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 1)) + .intValue(COL1, 0) + .build(), + Delete.newBuilder() + .namespace("other_ns") + .table("tbl2") + .partitionKey(Key.ofInt(PKEY1, 1)) + .clusteringKey(Key.ofInt(CKEY1, 2)) + .build()); + + // Act + Exception exceptionForMutationsWithinRecord = + catchException(() -> operationChecker.check(mutationsWithinRecord)); + Exception exceptionForMutationsWithinPartition = + catchException(() -> operationChecker.check(mutationsWithinPartition)); + Exception exceptionForMutationsWithinTable = + catchException(() -> operationChecker.check(mutationsWithinTable)); + Exception exceptionForMutationsWithinNamespace = + catchException(() -> operationChecker.check(mutationsWithinNamespace)); + Exception exceptionForMutationsWithinStorage = + catchException(() -> operationChecker.check(mutationsWithinStorage)); + Exception exceptionForMutationsAcrossStorages = + catchException(() -> operationChecker.check(mutationsAcrossStorages)); + + // Assert + switch (mutationAtomicityUnit) { + case RECORD: + assertThat(exceptionForMutationsWithinRecord).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinPartition) + .isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsWithinTable).isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsWithinNamespace) + .isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsWithinStorage).isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsAcrossStorages) + .isInstanceOf(IllegalArgumentException.class); + break; + case PARTITION: + assertThat(exceptionForMutationsWithinRecord).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinPartition).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinTable).isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsWithinNamespace) + .isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsWithinStorage).isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsAcrossStorages) + .isInstanceOf(IllegalArgumentException.class); + break; + case TABLE: + assertThat(exceptionForMutationsWithinRecord).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinPartition).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinTable).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinNamespace) + .isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsWithinStorage).isInstanceOf(IllegalArgumentException.class); + 
assertThat(exceptionForMutationsAcrossStorages) + .isInstanceOf(IllegalArgumentException.class); + break; + case NAMESPACE: + assertThat(exceptionForMutationsWithinRecord).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinPartition).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinTable).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinNamespace).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinStorage).isInstanceOf(IllegalArgumentException.class); + assertThat(exceptionForMutationsAcrossStorages) + .isInstanceOf(IllegalArgumentException.class); + break; + case STORAGE: + assertThat(exceptionForMutationsWithinRecord).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinPartition).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinTable).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinNamespace).doesNotThrowAnyException(); + assertThat(exceptionForMutationsWithinStorage).doesNotThrowAnyException(); + assertThat(exceptionForMutationsAcrossStorages) + .isInstanceOf(IllegalArgumentException.class); + break; + default: + throw new AssertionError(); + } + } } diff --git a/core/src/test/java/com/scalar/db/storage/cosmos/CosmosOperationCheckerTest.java b/core/src/test/java/com/scalar/db/storage/cosmos/CosmosOperationCheckerTest.java index d6f111d4d6..b11dcfb782 100644 --- a/core/src/test/java/com/scalar/db/storage/cosmos/CosmosOperationCheckerTest.java +++ b/core/src/test/java/com/scalar/db/storage/cosmos/CosmosOperationCheckerTest.java @@ -17,7 +17,10 @@ import com.scalar.db.api.MutationCondition; import com.scalar.db.api.Put; import com.scalar.db.api.Scan; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoImpl; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; @@ -37,6 +40,8 @@ public class CosmosOperationCheckerTest { private static final String CKEY1 = "c1"; private static final String COL1 = "v1"; private static final String COL2 = "v2"; + private static final StorageInfo STORAGE_INFO = + new StorageInfoImpl("cosmos", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE); private static final TableMetadata TABLE_METADATA1 = TableMetadata.newBuilder() @@ -59,13 +64,15 @@ public class CosmosOperationCheckerTest { @Mock private DatabaseConfig databaseConfig; @Mock private TableMetadataManager metadataManager; + @Mock private StorageInfoProvider storageInfoProvider; private CosmosOperationChecker operationChecker; @BeforeEach public void setUp() throws Exception { openMocks(this).close(); - operationChecker = new CosmosOperationChecker(databaseConfig, metadataManager); + operationChecker = + new CosmosOperationChecker(databaseConfig, metadataManager, storageInfoProvider); } @Test @@ -218,6 +225,7 @@ public void check_ForMutationsWithPutWithCondition_ShouldBehaveProperly() throws ExecutionException { // Arrange when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA1); + when(storageInfoProvider.getStorageInfo(any())).thenReturn(STORAGE_INFO); Put put = Put.newBuilder() @@ -320,6 +328,7 @@ public void check_ForMutationsWithDeleteWithCondition_ShouldBehaveProperly() throws ExecutionException { // Arrange when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA1); + 
when(storageInfoProvider.getStorageInfo(any())).thenReturn(STORAGE_INFO); Delete delete = Delete.newBuilder() @@ -657,6 +666,7 @@ public void check_ForMutationsWithDeleteWithCondition_ShouldBehaveProperly() throws ExecutionException { // Arrange when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA2); + when(storageInfoProvider.getStorageInfo(any())).thenReturn(STORAGE_INFO); Put put1 = Put.newBuilder() diff --git a/core/src/test/java/com/scalar/db/storage/dynamo/DynamoOperationCheckerTest.java b/core/src/test/java/com/scalar/db/storage/dynamo/DynamoOperationCheckerTest.java index 1ade1bc8d8..941f22bf7c 100644 --- a/core/src/test/java/com/scalar/db/storage/dynamo/DynamoOperationCheckerTest.java +++ b/core/src/test/java/com/scalar/db/storage/dynamo/DynamoOperationCheckerTest.java @@ -15,7 +15,10 @@ import com.scalar.db.api.Delete; import com.scalar.db.api.MutationCondition; import com.scalar.db.api.Put; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoImpl; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.io.DataType; @@ -35,8 +38,12 @@ public class DynamoOperationCheckerTest { private static final String COL2 = "v2"; private static final String COL3 = "v3"; private static final String COL4 = "v4"; + private static final StorageInfo STORAGE_INFO = + new StorageInfoImpl("dynamo", StorageInfo.MutationAtomicityUnit.STORAGE, 100); + @Mock private DatabaseConfig databaseConfig; @Mock private TableMetadataManager metadataManager; + @Mock private StorageInfoProvider storageInfoProvider; private DynamoOperationChecker operationChecker; @BeforeEach @@ -57,7 +64,9 @@ public void setUp() throws Exception { .addSecondaryIndex(COL4) .build(); when(metadataManager.getTableMetadata(any())).thenReturn(tableMetadata); - operationChecker = new DynamoOperationChecker(databaseConfig, metadataManager); + when(storageInfoProvider.getStorageInfo(any())).thenReturn(STORAGE_INFO); + operationChecker = + new DynamoOperationChecker(databaseConfig, metadataManager, storageInfoProvider); } @Test diff --git a/core/src/test/java/com/scalar/db/storage/multistorage/MultiStorageAdminTest.java b/core/src/test/java/com/scalar/db/storage/multistorage/MultiStorageAdminTest.java index bceea4defc..2a74bf0a87 100644 --- a/core/src/test/java/com/scalar/db/storage/multistorage/MultiStorageAdminTest.java +++ b/core/src/test/java/com/scalar/db/storage/multistorage/MultiStorageAdminTest.java @@ -1,6 +1,7 @@ package com.scalar.db.storage.multistorage; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -8,7 +9,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.StorageInfoImpl; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.io.DataType; import java.util.Collections; @@ -44,10 +47,11 @@ public void setUp() throws Exception { Map tableAdminMap = new HashMap<>(); tableAdminMap.put(NAMESPACE1 + "." + TABLE1, admin1); tableAdminMap.put(NAMESPACE1 + "." 
+ TABLE2, admin2); - Map namespaceAdminMap = new HashMap<>(); - namespaceAdminMap.put(NAMESPACE2, admin2); - DistributedStorageAdmin defaultAdmin = admin3; - multiStorageAdmin = new MultiStorageAdmin(tableAdminMap, namespaceAdminMap, defaultAdmin); + MultiStorageAdmin.AdminHolder s2 = new MultiStorageAdmin.AdminHolder("s2", admin2); + MultiStorageAdmin.AdminHolder s3 = new MultiStorageAdmin.AdminHolder("s3", admin3); + Map namespaceAdminMap = new HashMap<>(); + namespaceAdminMap.put(NAMESPACE2, s2); + multiStorageAdmin = new MultiStorageAdmin(tableAdminMap, namespaceAdminMap, s3); } @Test @@ -500,13 +504,15 @@ public void addNewColumnToTable_ForTable1InNamespace2_ShouldCallAddNewColumnOfAd getNamespaceNames_WithExistingNamespacesNotInMapping_ShouldReturnExistingNamespacesInMappingAndFromDefaultAdmin() throws ExecutionException { // Arrange - Map namespaceAdminMap = new HashMap<>(); - namespaceAdminMap.put("ns1", admin1); - namespaceAdminMap.put("ns2", admin2); - namespaceAdminMap.put("ns3", admin2); - DistributedStorageAdmin defaultAdmin = admin3; - multiStorageAdmin = - new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, defaultAdmin); + MultiStorageAdmin.AdminHolder s1 = new MultiStorageAdmin.AdminHolder("s1", admin1); + MultiStorageAdmin.AdminHolder s2 = new MultiStorageAdmin.AdminHolder("s2", admin2); + MultiStorageAdmin.AdminHolder s3 = new MultiStorageAdmin.AdminHolder("s3", admin3); + + Map namespaceAdminMap = new HashMap<>(); + namespaceAdminMap.put("ns1", s1); + namespaceAdminMap.put("ns2", s2); + namespaceAdminMap.put("ns3", s2); + multiStorageAdmin = new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, s3); when(admin1.getNamespaceNames()).thenReturn(ImmutableSet.of("ns1", "ns2")); when(admin2.getNamespaceNames()).thenReturn(ImmutableSet.of("ns3")); @@ -526,12 +532,14 @@ public void addNewColumnToTable_ForTable1InNamespace2_ShouldCallAddNewColumnOfAd public void getNamespaceNames_WithNamespaceInMappingButNotExisting_ShouldReturnEmptySet() throws ExecutionException { // Arrange - Map namespaceAdminMap = new HashMap<>(); - namespaceAdminMap.put("ns1", admin1); - namespaceAdminMap.put("ns2", admin2); - DistributedStorageAdmin defaultAdmin = admin3; - multiStorageAdmin = - new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, defaultAdmin); + MultiStorageAdmin.AdminHolder s1 = new MultiStorageAdmin.AdminHolder("s1", admin1); + MultiStorageAdmin.AdminHolder s2 = new MultiStorageAdmin.AdminHolder("s2", admin2); + MultiStorageAdmin.AdminHolder s3 = new MultiStorageAdmin.AdminHolder("s3", admin3); + + Map namespaceAdminMap = new HashMap<>(); + namespaceAdminMap.put("ns1", s1); + namespaceAdminMap.put("ns2", s2); + multiStorageAdmin = new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, s3); when(admin1.getNamespaceNames()).thenReturn(Collections.emptySet()); when(admin2.getNamespaceNames()).thenReturn(Collections.emptySet()); @@ -551,12 +559,14 @@ public void getNamespaceNames_WithNamespaceInMappingButNotExisting_ShouldReturnE public void getNamespaceNames_WithExistingNamespaceButNotInMapping_ShouldReturnEmptySet() throws ExecutionException { // Arrange - Map namespaceAdminMap = new HashMap<>(); - namespaceAdminMap.put("ns1", admin1); - namespaceAdminMap.put("ns2", admin2); - DistributedStorageAdmin defaultAdmin = admin3; - multiStorageAdmin = - new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, defaultAdmin); + MultiStorageAdmin.AdminHolder s1 = new MultiStorageAdmin.AdminHolder("s1", admin1); + MultiStorageAdmin.AdminHolder 
s2 = new MultiStorageAdmin.AdminHolder("s2", admin2); + MultiStorageAdmin.AdminHolder s3 = new MultiStorageAdmin.AdminHolder("s3", admin3); + + Map namespaceAdminMap = new HashMap<>(); + namespaceAdminMap.put("ns1", s1); + namespaceAdminMap.put("ns2", s2); + multiStorageAdmin = new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, s3); when(admin1.getNamespaceNames()).thenReturn(ImmutableSet.of("ns2")); when(admin2.getNamespaceNames()).thenReturn(Collections.emptySet()); @@ -604,11 +614,14 @@ public void repairNamespace_ForNamespace3_ShouldCallDefaultAdmin() throws Execut public void upgrade_ShouldCallNamespaceAndDefaultAdmins() throws ExecutionException { // Arrange Map options = ImmutableMap.of("foo", "bar"); - Map namespaceAdminMap = - ImmutableMap.of("ns1", admin1, "ns2", admin2); - DistributedStorageAdmin defaultAdmin = admin2; - multiStorageAdmin = - new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, defaultAdmin); + + MultiStorageAdmin.AdminHolder s1 = new MultiStorageAdmin.AdminHolder("s1", admin1); + MultiStorageAdmin.AdminHolder s2 = new MultiStorageAdmin.AdminHolder("s2", admin2); + + Map namespaceAdminMap = new HashMap<>(); + namespaceAdminMap.put("ns1", s1); + namespaceAdminMap.put("ns2", s2); + multiStorageAdmin = new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, s2); // Act multiStorageAdmin.upgrade(options); @@ -647,4 +660,44 @@ public void getImportTableMetadata_ForTable1InNamespace1_ShouldCallAdmin1() // Assert verify(admin1).getImportTableMetadata(namespace, table, overrideColumnsType); } + + @Test + public void getStorageInfo_ShouldReturnProperStorageInfo() throws ExecutionException { + // Arrange + MultiStorageAdmin.AdminHolder s1 = new MultiStorageAdmin.AdminHolder("s1", admin1); + MultiStorageAdmin.AdminHolder s2 = new MultiStorageAdmin.AdminHolder("s2", admin2); + MultiStorageAdmin.AdminHolder s3 = new MultiStorageAdmin.AdminHolder("s3", admin3); + + Map namespaceAdminMap = new HashMap<>(); + namespaceAdminMap.put("ns1", s1); + namespaceAdminMap.put("ns2", s2); + multiStorageAdmin = new MultiStorageAdmin(Collections.emptyMap(), namespaceAdminMap, s3); + + when(admin1.getStorageInfo(anyString())) + .thenReturn( + new StorageInfoImpl( + "cassandra", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE)); + when(admin2.getStorageInfo(anyString())) + .thenReturn(new StorageInfoImpl("dynamo", StorageInfo.MutationAtomicityUnit.STORAGE, 100)); + when(admin3.getStorageInfo(anyString())) + .thenReturn( + new StorageInfoImpl( + "jdbc", StorageInfo.MutationAtomicityUnit.STORAGE, Integer.MAX_VALUE)); + + // Act Assert + assertThat(multiStorageAdmin.getStorageInfo("ns1")) + .isEqualTo( + new StorageInfoImpl( + "s1", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE)); + assertThat(multiStorageAdmin.getStorageInfo("ns2")) + .isEqualTo(new StorageInfoImpl("s2", StorageInfo.MutationAtomicityUnit.STORAGE, 100)); + assertThat(multiStorageAdmin.getStorageInfo("ns3")) + .isEqualTo( + new StorageInfoImpl( + "s3", StorageInfo.MutationAtomicityUnit.STORAGE, Integer.MAX_VALUE)); + + verify(admin1).getStorageInfo("ns1"); + verify(admin2).getStorageInfo("ns2"); + verify(admin3).getStorageInfo("ns3"); + } } diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java index bd9ad6da28..008fdc6e80 100644 --- 
a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java @@ -1041,86 +1041,6 @@ public void put_MultiplePutWithIfNotExistsGivenWhenOneExists_ShouldThrowNoMutati .isEqualTo(Optional.of(new IntValue(COL_NAME4, pKey + cKey))); } - @Test - public void - put_MultiplePutWithDifferentPartitionsWithIfNotExistsGiven_ShouldThrowIllegalArgumentException() - throws IOException, ExecutionException { - // Arrange - List puts = preparePuts(); - puts.get(0).withCondition(new PutIfNotExists()); - puts.get(3).withCondition(new PutIfNotExists()); - puts.get(6).withCondition(new PutIfNotExists()); - - // Act - assertThatThrownBy(() -> storage.put(Arrays.asList(puts.get(0), puts.get(3), puts.get(6)))) - .isInstanceOf(IllegalArgumentException.class); - - // Assert - List results; - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 0)) - .build()); - assertThat(results.size()).isEqualTo(0); - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 3)) - .build()); - assertThat(results.size()).isEqualTo(0); - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 6)) - .build()); - assertThat(results.size()).isEqualTo(0); - } - - @Test - public void put_MultiplePutWithDifferentPartitionsGiven_ShouldThrowIllegalArgumentException() - throws IOException, ExecutionException { - // Arrange - List puts = preparePuts(); - - // Act - assertThatThrownBy(() -> storage.put(Arrays.asList(puts.get(0), puts.get(3), puts.get(6)))) - .isInstanceOf(IllegalArgumentException.class); - - // Assert - List results; - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 0)) - .build()); - assertThat(results.size()).isEqualTo(0); - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 3)) - .build()); - assertThat(results.size()).isEqualTo(0); - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 6)) - .build()); - assertThat(results.size()).isEqualTo(0); - } - @Test public void put_MultiplePutWithDifferentConditionsGiven_ShouldStoreProperly() throws IOException, ExecutionException { @@ -1544,44 +1464,6 @@ public void mutate_MultiplePutGiven_ShouldStoreProperly() throws ExecutionExcept assertThat(results.get(2).getValue(COL_NAME4).get().getAsInt()).isEqualTo(pKey + cKey + 2); } - @Test - public void mutate_MultiplePutWithDifferentPartitionsGiven_ShouldThrowIllegalArgumentException() - throws IOException, ExecutionException { - // Arrange - List puts = preparePuts(); - - // Act - assertThatCode(() -> storage.mutate(Arrays.asList(puts.get(0), puts.get(3), puts.get(6)))) - .isInstanceOf(IllegalArgumentException.class); - - // Assert - List results; - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 0)) - .build()); - assertThat(results.size()).isEqualTo(0); - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 3)) - .build()); - assertThat(results.size()).isEqualTo(0); - results = - scanAll( - Scan.newBuilder() - .namespace(namespace) - .table(TABLE) - .partitionKey(Key.ofInt(COL_NAME1, 6)) - 
.build()); - assertThat(results.size()).isEqualTo(0); - } - @Test public void mutate_PutAndDeleteGiven_ShouldUpdateAndDeleteRecordsProperly() throws ExecutionException, IOException { diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageMutationAtomicityUnitIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageMutationAtomicityUnitIntegrationTestBase.java new file mode 100644 index 0000000000..9b17902e7d --- /dev/null +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageMutationAtomicityUnitIntegrationTestBase.java @@ -0,0 +1,564 @@ +package com.scalar.db.api; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.io.DataType; +import com.scalar.db.io.Key; +import com.scalar.db.service.StorageFactory; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class DistributedStorageMutationAtomicityUnitIntegrationTestBase { + protected static final Logger logger = + LoggerFactory.getLogger(DistributedStorageMutationAtomicityUnitIntegrationTestBase.class); + + protected static final String TEST_NAME = "storage_mau"; + protected static final String NAMESPACE1 = "int_test_" + TEST_NAME + "1"; + protected static final String NAMESPACE2 = "int_test_" + TEST_NAME + "2"; + protected static final String NAMESPACE3 = "int_test_" + TEST_NAME + "3"; + protected static final String TABLE1 = "test_table1"; + protected static final String TABLE2 = "test_table2"; + protected static final String COL_NAME1 = "c1"; + protected static final String COL_NAME2 = "c2"; + protected static final String COL_NAME3 = "c3"; + protected static final String COL_NAME4 = "c4"; + protected static final TableMetadata TABLE_METADATA = + TableMetadata.newBuilder() + .addColumn(COL_NAME1, DataType.INT) + .addColumn(COL_NAME2, DataType.INT) + .addColumn(COL_NAME3, DataType.INT) + .addColumn(COL_NAME4, DataType.INT) + .addPartitionKey(COL_NAME1) + .addClusteringKey(COL_NAME2) + .build(); + + protected DistributedStorage storage; + protected DistributedStorageAdmin admin; + protected String namespace1; + protected String namespace2; + protected String namespace3; + + @BeforeAll + public void beforeAll() throws Exception { + initialize(TEST_NAME); + StorageFactory factory = StorageFactory.create(getProperties(TEST_NAME)); + admin = factory.getStorageAdmin(); + namespace1 = getNamespace1(); + namespace2 = getNamespace2(); + namespace3 = getNamespace3(); + createTables(); + storage = factory.getStorage(); + } + + protected void initialize(String testName) throws Exception {} + + protected abstract Properties getProperties(String testName); + + protected String getNamespace1() { + return NAMESPACE1; + } + + protected String getNamespace2() { + return NAMESPACE2; + } + + protected String getNamespace3() { + return NAMESPACE3; + } + + protected void createTables() throws ExecutionException { + Map options = getCreationOptions(); + admin.createNamespace(namespace1, true, options); + admin.createNamespace(namespace2, true, 
options); + admin.createNamespace(namespace3, true, options); + admin.createTable(namespace1, TABLE1, TABLE_METADATA, true, options); + admin.createTable(namespace1, TABLE2, TABLE_METADATA, true, options); + admin.createTable(namespace2, TABLE1, TABLE_METADATA, true, options); + admin.createTable(namespace3, TABLE1, TABLE_METADATA, true, options); + } + + protected Map getCreationOptions() { + return Collections.emptyMap(); + } + + @BeforeEach + public void setUp() throws Exception { + truncateTable(); + } + + protected void truncateTable() throws ExecutionException { + admin.truncateTable(namespace1, TABLE1); + admin.truncateTable(namespace1, TABLE2); + admin.truncateTable(namespace2, TABLE1); + admin.truncateTable(namespace3, TABLE1); + } + + @AfterAll + public void afterAll() throws Exception { + try { + dropTable(); + } catch (Exception e) { + logger.warn("Failed to drop table", e); + } + + try { + if (admin != null) { + admin.close(); + } + } catch (Exception e) { + logger.warn("Failed to close admin", e); + } + + try { + if (storage != null) { + storage.close(); + } + } catch (Exception e) { + logger.warn("Failed to close storage", e); + } + } + + protected void dropTable() throws ExecutionException { + admin.dropTable(namespace1, TABLE1); + admin.dropTable(namespace1, TABLE2); + admin.dropTable(namespace2, TABLE1); + admin.dropTable(namespace3, TABLE1); + admin.dropNamespace(namespace1); + admin.dropNamespace(namespace2); + admin.dropNamespace(namespace3); + } + + @Test + public void mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() + throws ExecutionException { + // Arrange + Put put1 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .intValue(COL_NAME3, 1) + .build(); + Put put2 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .intValue(COL_NAME4, 2) + .build(); + + // Act + Exception exception = + Assertions.catchException(() -> storage.mutate(Arrays.asList(put1, put2))); + + // Assert + assertThat(exception).doesNotThrowAnyException(); + + Optional result = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .build()); + assertThat(result).isPresent(); + assertThat(result.get().getInt(COL_NAME1)).isEqualTo(0); + assertThat(result.get().getInt(COL_NAME2)).isEqualTo(1); + assertThat(result.get().getInt(COL_NAME3)).isEqualTo(1); + assertThat(result.get().getInt(COL_NAME4)).isEqualTo(2); + } + + @Test + public void + mutate_MutationsWithinPartitionGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() + throws ExecutionException { + // Arrange + storage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .intValue(COL_NAME3, 3) + .build()); + + Put put1 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .intValue(COL_NAME3, 1) + .build(); + Put put2 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 2)) + .intValue(COL_NAME3, 2) + .build(); + Delete delete = + Delete.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + 
.clusteringKey(Key.ofInt(COL_NAME2, 3)) + .build(); + + // Act + Exception exception = + Assertions.catchException(() -> storage.mutate(Arrays.asList(put1, put2, delete))); + + // Assert + StorageInfo storageInfo = admin.getStorageInfo(namespace1); + switch (storageInfo.getMutationAtomicityUnit()) { + case RECORD: + assertThat(exception).isInstanceOf(IllegalArgumentException.class); + break; + case PARTITION: + case TABLE: + case NAMESPACE: + case STORAGE: + assertThat(exception).doesNotThrowAnyException(); + + Optional result1 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .build()); + assertThat(result1).isPresent(); + assertThat(result1.get().getInt(COL_NAME1)).isEqualTo(0); + assertThat(result1.get().getInt(COL_NAME2)).isEqualTo(1); + assertThat(result1.get().getInt(COL_NAME3)).isEqualTo(1); + + Optional result2 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 2)) + .build()); + assertThat(result2).isPresent(); + assertThat(result2.get().getInt(COL_NAME1)).isEqualTo(0); + assertThat(result2.get().getInt(COL_NAME2)).isEqualTo(2); + assertThat(result2.get().getInt(COL_NAME3)).isEqualTo(2); + + Optional result3 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .build()); + assertThat(result3).isEmpty(); + break; + default: + throw new AssertionError(); + } + } + + @Test + public void mutate_MutationsWithinTableGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() + throws ExecutionException { + // Arrange + storage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .intValue(COL_NAME3, 3) + .build()); + + Put put1 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .intValue(COL_NAME3, 1) + .build(); + Put put2 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 1)) + .clusteringKey(Key.ofInt(COL_NAME2, 2)) + .intValue(COL_NAME3, 2) + .build(); + Delete delete = + Delete.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .build(); + + // Act + Exception exception = + Assertions.catchException(() -> storage.mutate(Arrays.asList(put1, put2, delete))); + + // Assert + StorageInfo storageInfo = admin.getStorageInfo(namespace1); + switch (storageInfo.getMutationAtomicityUnit()) { + case RECORD: + case PARTITION: + assertThat(exception).isInstanceOf(IllegalArgumentException.class); + break; + case TABLE: + case NAMESPACE: + case STORAGE: + assertThat(exception).doesNotThrowAnyException(); + + Optional result1 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .build()); + assertThat(result1).isPresent(); + assertThat(result1.get().getInt(COL_NAME1)).isEqualTo(0); + assertThat(result1.get().getInt(COL_NAME2)).isEqualTo(1); + assertThat(result1.get().getInt(COL_NAME3)).isEqualTo(1); + + Optional result2 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 1)) + 
.clusteringKey(Key.ofInt(COL_NAME2, 2)) + .build()); + assertThat(result2).isPresent(); + assertThat(result2.get().getInt(COL_NAME1)).isEqualTo(1); + assertThat(result2.get().getInt(COL_NAME2)).isEqualTo(2); + assertThat(result2.get().getInt(COL_NAME3)).isEqualTo(2); + + Optional result3 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .build()); + assertThat(result3).isEmpty(); + break; + default: + throw new AssertionError(); + } + } + + @Test + public void + mutate_MutationsWithinNamespaceGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() + throws ExecutionException { + // Arrange + storage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .intValue(COL_NAME3, 3) + .build()); + + Put put1 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .intValue(COL_NAME3, 1) + .build(); + Put put2 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE2) + .partitionKey(Key.ofInt(COL_NAME1, 1)) + .clusteringKey(Key.ofInt(COL_NAME2, 2)) + .intValue(COL_NAME3, 2) + .build(); + Delete delete = + Delete.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .build(); + + // Act + Exception exception = + Assertions.catchException(() -> storage.mutate(Arrays.asList(put1, put2, delete))); + + // Assert + StorageInfo storageInfo = admin.getStorageInfo(namespace1); + switch (storageInfo.getMutationAtomicityUnit()) { + case RECORD: + case PARTITION: + case TABLE: + assertThat(exception).isInstanceOf(IllegalArgumentException.class); + break; + case NAMESPACE: + case STORAGE: + assertThat(exception).doesNotThrowAnyException(); + + Optional result1 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .build()); + assertThat(result1).isPresent(); + assertThat(result1.get().getInt(COL_NAME1)).isEqualTo(0); + assertThat(result1.get().getInt(COL_NAME2)).isEqualTo(1); + assertThat(result1.get().getInt(COL_NAME3)).isEqualTo(1); + + Optional result2 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE2) + .partitionKey(Key.ofInt(COL_NAME1, 1)) + .clusteringKey(Key.ofInt(COL_NAME2, 2)) + .build()); + assertThat(result2).isPresent(); + assertThat(result2.get().getInt(COL_NAME1)).isEqualTo(1); + assertThat(result2.get().getInt(COL_NAME2)).isEqualTo(2); + assertThat(result2.get().getInt(COL_NAME3)).isEqualTo(2); + + Optional result3 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .build()); + assertThat(result3).isEmpty(); + break; + default: + throw new AssertionError(); + } + } + + @Test + public void mutate_MutationsWithinStorageGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() + throws ExecutionException { + // Arrange + storage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .intValue(COL_NAME3, 3) + .build()); + + Put put1 = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) 
+ .intValue(COL_NAME3, 1) + .build(); + Put put2 = + Put.newBuilder() + .namespace(namespace2) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 1)) + .clusteringKey(Key.ofInt(COL_NAME2, 2)) + .intValue(COL_NAME3, 2) + .build(); + Delete delete = + Delete.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .build(); + + // Act + Exception exception = + Assertions.catchException(() -> storage.mutate(Arrays.asList(put1, put2, delete))); + + // Assert + StorageInfo storageInfo = admin.getStorageInfo(namespace1); + switch (storageInfo.getMutationAtomicityUnit()) { + case RECORD: + case PARTITION: + case TABLE: + case NAMESPACE: + assertThat(exception).isInstanceOf(IllegalArgumentException.class); + break; + case STORAGE: + assertThat(exception).doesNotThrowAnyException(); + + Optional result1 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 0)) + .clusteringKey(Key.ofInt(COL_NAME2, 1)) + .build()); + assertThat(result1).isPresent(); + assertThat(result1.get().getInt(COL_NAME1)).isEqualTo(0); + assertThat(result1.get().getInt(COL_NAME2)).isEqualTo(1); + assertThat(result1.get().getInt(COL_NAME3)).isEqualTo(1); + + Optional result2 = + storage.get( + Get.newBuilder() + .namespace(namespace2) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 1)) + .clusteringKey(Key.ofInt(COL_NAME2, 2)) + .build()); + assertThat(result2).isPresent(); + assertThat(result2.get().getInt(COL_NAME1)).isEqualTo(1); + assertThat(result2.get().getInt(COL_NAME2)).isEqualTo(2); + assertThat(result2.get().getInt(COL_NAME3)).isEqualTo(2); + + Optional result3 = + storage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE1) + .partitionKey(Key.ofInt(COL_NAME1, 2)) + .clusteringKey(Key.ofInt(COL_NAME2, 3)) + .build()); + assertThat(result3).isEmpty(); + break; + default: + throw new AssertionError(); + } + } +} From 0a32de8e350a05fc52aaa4d8cec6471da83788d9 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Thu, 12 Jun 2025 15:15:08 +0900 Subject: [PATCH 2/3] Rename getMaxAtomicMutationCount to getMaxAtomicMutationsCount --- .../com/scalar/db/api/DistributedStorage.java | 2 +- .../java/com/scalar/db/api/StorageInfo.java | 2 +- .../com/scalar/db/common/StorageInfoImpl.java | 18 ++++++++++-------- .../multistorage/MultiStorageAdmin.java | 2 +- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/com/scalar/db/api/DistributedStorage.java b/core/src/main/java/com/scalar/db/api/DistributedStorage.java index 531ad976c5..36ca679075 100644 --- a/core/src/main/java/com/scalar/db/api/DistributedStorage.java +++ b/core/src/main/java/com/scalar/db/api/DistributedStorage.java @@ -185,7 +185,7 @@ public interface DistributedStorage extends AutoCloseable { * StorageInfo#getMutationAtomicityUnit()}. For example, if the atomicity unit of the storage is * {@link StorageInfo.MutationAtomicityUnit#PARTITION}, the mutations must occur within the same * partition. Also note that the maximum number of mutations that can be performed atomically is - * defined by {@link StorageInfo#getMaxAtomicMutationCount()}. + * defined by {@link StorageInfo#getMaxAtomicMutationsCount()}. * *

To retrieve storage information, use {@link DistributedStorageAdmin#getStorageInfo(String)}. * diff --git a/core/src/main/java/com/scalar/db/api/StorageInfo.java b/core/src/main/java/com/scalar/db/api/StorageInfo.java index b2453e349a..507f974318 100644 --- a/core/src/main/java/com/scalar/db/api/StorageInfo.java +++ b/core/src/main/java/com/scalar/db/api/StorageInfo.java @@ -20,7 +20,7 @@ public interface StorageInfo { * * @return the maximum number of mutations that can be performed atomically in the storage */ - int getMaxAtomicMutationCount(); + int getMaxAtomicMutationsCount(); /** * The mutation atomicity unit of the storage. diff --git a/core/src/main/java/com/scalar/db/common/StorageInfoImpl.java b/core/src/main/java/com/scalar/db/common/StorageInfoImpl.java index 3bc989e8be..2f3fc48a1e 100644 --- a/core/src/main/java/com/scalar/db/common/StorageInfoImpl.java +++ b/core/src/main/java/com/scalar/db/common/StorageInfoImpl.java @@ -10,13 +10,15 @@ public class StorageInfoImpl implements StorageInfo { private final String storageName; private final MutationAtomicityUnit mutationAtomicityUnit; - private final int maxAtomicMutationCount; + private final int maxAtomicMutationsCount; public StorageInfoImpl( - String storageName, MutationAtomicityUnit mutationAtomicityUnit, int maxAtomicMutationCount) { + String storageName, + MutationAtomicityUnit mutationAtomicityUnit, + int maxAtomicMutationsCount) { this.storageName = storageName; this.mutationAtomicityUnit = mutationAtomicityUnit; - this.maxAtomicMutationCount = maxAtomicMutationCount; + this.maxAtomicMutationsCount = maxAtomicMutationsCount; } @Override @@ -30,8 +32,8 @@ public MutationAtomicityUnit getMutationAtomicityUnit() { } @Override - public int getMaxAtomicMutationCount() { - return maxAtomicMutationCount; + public int getMaxAtomicMutationsCount() { + return maxAtomicMutationsCount; } @Override @@ -43,14 +45,14 @@ public boolean equals(Object o) { return false; } StorageInfoImpl that = (StorageInfoImpl) o; - return getMaxAtomicMutationCount() == that.getMaxAtomicMutationCount() + return getMaxAtomicMutationsCount() == that.getMaxAtomicMutationsCount() && Objects.equals(getStorageName(), that.getStorageName()) && getMutationAtomicityUnit() == that.getMutationAtomicityUnit(); } @Override public int hashCode() { - return Objects.hash(getStorageName(), getMutationAtomicityUnit(), getMaxAtomicMutationCount()); + return Objects.hash(getStorageName(), getMutationAtomicityUnit(), getMaxAtomicMutationsCount()); } @Override @@ -58,7 +60,7 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("storageName", storageName) .add("mutationAtomicityUnit", mutationAtomicityUnit) - .add("maxAtomicMutationCount", maxAtomicMutationCount) + .add("maxAtomicMutationsCount", maxAtomicMutationsCount) .toString(); } } diff --git a/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java b/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java index 9697877053..a4c9528b88 100644 --- a/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java @@ -288,7 +288,7 @@ public StorageInfo getStorageInfo(String namespace) throws ExecutionException { return new StorageInfoImpl( holder.storageName, storageInfo.getMutationAtomicityUnit(), - storageInfo.getMaxAtomicMutationCount()); + storageInfo.getMaxAtomicMutationsCount()); } catch (RuntimeException e) { if (e.getCause() instanceof ExecutionException) { 
throw (ExecutionException) e.getCause(); From bd542b7ae2a75d125cbf19a44c102b4744338166 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Thu, 12 Jun 2025 18:56:50 +0900 Subject: [PATCH 3/3] Fix based on feedback --- .../scalar/db/storage/multistorage/MultiStorageAdmin.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java b/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java index a4c9528b88..5c7c4f9a79 100644 --- a/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/multistorage/MultiStorageAdmin.java @@ -327,10 +327,10 @@ public void close() { @VisibleForTesting static class AdminHolder { - public final String storageName; - public final DistributedStorageAdmin admin; + private final String storageName; + private final DistributedStorageAdmin admin; - public AdminHolder(String storageName, DistributedStorageAdmin admin) { + AdminHolder(String storageName, DistributedStorageAdmin admin) { this.storageName = storageName; this.admin = admin; }
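Illustrative aside (not part of the patch series above): a minimal sketch, under assumed client-side usage, of how a caller might consult the new StorageInfo API (DistributedStorageAdmin#getStorageInfo) before grouping mutations for DistributedStorage#mutate. The properties file path and the "orders" namespace are hypothetical placeholders; only methods introduced or used by this patch (getStorageName, getMutationAtomicityUnit, getMaxAtomicMutationsCount) are relied on.

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedStorageAdmin;
import com.scalar.db.api.StorageInfo;
import com.scalar.db.service.StorageFactory;
import java.io.FileInputStream;
import java.util.Properties;

public class StorageInfoUsageSketch {
  public static void main(String[] args) throws Exception {
    // Load ScalarDB client properties from a caller-supplied path (hypothetical file).
    Properties properties = new Properties();
    try (FileInputStream in = new FileInputStream(args[0])) {
      properties.load(in);
    }

    StorageFactory factory = StorageFactory.create(properties);
    DistributedStorageAdmin admin = factory.getStorageAdmin();
    DistributedStorage storage = factory.getStorage();

    try {
      // Ask the admin what the storage backing the "orders" namespace can mutate atomically.
      StorageInfo info = admin.getStorageInfo("orders");

      System.out.printf(
          "storage=%s, atomicityUnit=%s, maxAtomicMutations=%d%n",
          info.getStorageName(),
          info.getMutationAtomicityUnit(),
          info.getMaxAtomicMutationsCount());

      // A caller could use the reported unit to decide whether a batch of mutations that
      // spans partitions may be handed to storage.mutate(...) in a single call.
      boolean crossPartitionBatchAllowed =
          info.getMutationAtomicityUnit() != StorageInfo.MutationAtomicityUnit.RECORD
              && info.getMutationAtomicityUnit() != StorageInfo.MutationAtomicityUnit.PARTITION;
      System.out.println("cross-partition batch in one mutate() call: " + crossPartitionBatchAllowed);
    } finally {
      admin.close();
      storage.close();
    }
  }
}

As the MultiStorageAdminTest changes in this patch show, MultiStorageAdmin reports the mapped storage name (e.g. "s1") while passing through the underlying storage's atomicity unit and mutation count, so the same client-side check works unchanged when namespaces are mapped to different backends.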