Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.scalar.db.storage.cassandra;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import com.scalar.db.api.DistributedStorageAtomicityUnitIntegrationTestBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CassandraMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
public class CassandraAtomicityUnitIntegrationTest
extends DistributedStorageAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.scalar.db.storage.cosmos;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import com.scalar.db.api.DistributedStorageAtomicityUnitIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;

public class CosmosMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
public class CosmosAtomicityUnitIntegrationTest
extends DistributedStorageAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
Expand All @@ -20,6 +20,5 @@ protected Map<String, String> getCreationOptions() {

@Disabled("This test fails. It might be a bug")
@Override
public void
mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() {}
public void mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnAtomicityUnit() {}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.scalar.db.storage.dynamo;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import com.scalar.db.api.DistributedStorageAtomicityUnitIntegrationTestBase;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;

public class DynamoMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
public class DynamoAtomicityUnitIntegrationTest
extends DistributedStorageAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
Expand All @@ -20,6 +20,5 @@ protected Map<String, String> getCreationOptions() {

@Disabled("Transaction request cannot include multiple operations on one item in DynamoDB")
@Override
public void
mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit() {}
public void mutate_MutationsWithinRecordGiven_ShouldBehaveCorrectlyBaseOnAtomicityUnit() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.scalar.db.storage.jdbc;

import com.scalar.db.api.DistributedStorageAtomicityUnitIntegrationTestBase;
import java.util.Properties;

public class JdbcDatabaseAtomicityUnitIntegrationTest
extends DistributedStorageAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
return JdbcEnv.getProperties(testName);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import com.scalar.db.api.DistributedStorageAtomicityUnitIntegrationTestBase;
import com.scalar.db.api.Put;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
Expand All @@ -12,8 +12,8 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class MultiStorageMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
public class MultiStorageAtomicityUnitIntegrationTest
extends DistributedStorageAtomicityUnitIntegrationTestBase {

@Override
public Properties getProperties(String testName) {
Expand Down Expand Up @@ -61,7 +61,7 @@ public Properties getProperties(String testName) {
}

@Test
public void mutate_MutationsAcrossStorageGiven_ShouldBehaveCorrectlyBaseOnMutationAtomicityUnit()
public void mutate_MutationsAcrossStorageGiven_ShouldBehaveCorrectlyBaseOnAtomicityUnit()
throws ExecutionException {
// Arrange
Put put1 =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.scalar.db.storage.objectstorage;

import com.scalar.db.api.DistributedStorageMutationAtomicityUnitIntegrationTestBase;
import com.scalar.db.api.DistributedStorageAtomicityUnitIntegrationTestBase;
import java.util.Map;
import java.util.Properties;

public class ObjectStorageMutationAtomicityUnitIntegrationTest
extends DistributedStorageMutationAtomicityUnitIntegrationTestBase {
extends DistributedStorageAtomicityUnitIntegrationTestBase {

@Override
protected Properties getProperties(String testName) {
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/com/scalar/db/api/DistributedStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ public interface DistributedStorage extends AutoCloseable {
* Mutates entries of the underlying storage with the specified list of {@link Mutation} commands.
*
* <p>Note that this method only supports mutations within the atomicity unit specified by {@link
* StorageInfo#getMutationAtomicityUnit()}. For example, if the atomicity unit of the storage is
* {@link StorageInfo.MutationAtomicityUnit#PARTITION}, the mutations must occur within the same
* partition. Also note that the maximum number of mutations that can be performed atomically is
* defined by {@link StorageInfo#getMaxAtomicMutationsCount()}.
* StorageInfo#getAtomicityUnit()}. For example, if the atomicity unit of the storage is {@link
* StorageInfo.AtomicityUnit#PARTITION}, the mutations must occur within the same partition. Also
* note that the maximum number of mutations that can be performed atomically is defined by {@link
* StorageInfo#getMaxAtomicMutationsCount()}.
*
* <p>To retrieve storage information, use {@link DistributedStorageAdmin#getStorageInfo(String)}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public interface DistributedStorageAdmin extends Admin, AutoCloseable {
/**
* Returns the storage information.
*
* <p>Note: This feature is primarily for internal use. Breaking changes can and will be
* introduced to it. Users should not depend on it.
*
* @param namespace the namespace to get the storage information for
* @return the storage information
* @throws ExecutionException if the operation fails
Expand Down
38 changes: 23 additions & 15 deletions core/src/main/java/com/scalar/db/api/StorageInfo.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.scalar.db.api;

/** Represents storage information. */
public interface StorageInfo {
/**
* Returns the storage name.
Expand All @@ -9,11 +10,11 @@ public interface StorageInfo {
String getStorageName();

/**
* Returns the mutation atomicity unit of the storage.
* Returns the atomicity unit of the storage.
*
* @return the mutation atomicity unit of the storage
* @return the atomicity unit of the storage
*/
MutationAtomicityUnit getMutationAtomicityUnit();
AtomicityUnit getAtomicityUnit();

/**
* Returns the maximum number of mutations that can be performed atomically in the storage.
Expand All @@ -23,38 +24,45 @@ public interface StorageInfo {
int getMaxAtomicMutationsCount();

/**
* The mutation atomicity unit of the storage.
* Returns whether the storage guarantees consistent reads within its atomicity unit.
*
* <p>This enum defines the atomicity unit for mutations in the storage. It determines the scope
* of atomicity for mutations such as put and delete.
* @return true if the storage guarantees consistent reads within its atomicity unit, false
* otherwise
*/
enum MutationAtomicityUnit {
boolean isConsistentReadGuaranteed();

/**
* The atomicity unit of the storage.
*
* <p>This enum defines the atomicity unit for operations in the storage.
*/
enum AtomicityUnit {
/**
* The atomicity unit is at the record level, meaning that mutations are performed atomically
* The atomicity unit is at the record level, meaning that operations are performed atomically
* for each record.
*/
RECORD,

/**
* The atomicity unit is at the partition level, meaning that mutations are performed atomically
* for each partition.
* The atomicity unit is at the partition level, meaning that operations are performed
* atomically for each partition.
*/
PARTITION,

/**
* The atomicity unit is at the table level, meaning that mutations are performed atomically for
* each table.
* The atomicity unit is at the table level, meaning that operations are performed atomically
* for each table.
*/
TABLE,

/**
* The atomicity unit is at the namespace level, meaning that mutations are performed atomically
* for each namespace.
* The atomicity unit is at the namespace level, meaning that operations are performed
* atomically for each namespace.
*/
NAMESPACE,

/**
* The atomicity unit is at the storage level, meaning that mutations are performed atomically
* The atomicity unit is at the storage level, meaning that operations are performed atomically
* for the entire storage.
*/
STORAGE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,23 +513,23 @@ public void createVirtualTable(
Map<String, String> options)
throws ExecutionException {
StorageInfo storageInfo = getStorageInfo(leftSourceNamespace);
switch (storageInfo.getMutationAtomicityUnit()) {
switch (storageInfo.getAtomicityUnit()) {
case STORAGE:
break;
case NAMESPACE:
if (!leftSourceNamespace.equals(rightSourceNamespace)) {
throw new IllegalArgumentException(
CoreError.VIRTUAL_TABLE_SOURCE_TABLES_OUTSIDE_OF_ATOMICITY_UNIT.buildMessage(
storageInfo.getStorageName(),
storageInfo.getMutationAtomicityUnit(),
storageInfo.getAtomicityUnit(),
ScalarDbUtils.getFullTableName(leftSourceNamespace, leftSourceTable),
ScalarDbUtils.getFullTableName(rightSourceNamespace, rightSourceTable)));
}
break;
default:
throw new UnsupportedOperationException(
CoreError.VIRTUAL_TABLE_NOT_SUPPORTED_IN_STORAGE.buildMessage(
storageInfo.getStorageName(), storageInfo.getMutationAtomicityUnit()));
storageInfo.getStorageName(), storageInfo.getAtomicityUnit()));
}

if (!namespaceExists(namespace)) {
Expand Down
32 changes: 23 additions & 9 deletions core/src/main/java/com/scalar/db/common/StorageInfoImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@
public class StorageInfoImpl implements StorageInfo {

private final String storageName;
private final MutationAtomicityUnit mutationAtomicityUnit;
private final AtomicityUnit atomicityUnit;
private final int maxAtomicMutationsCount;
private final boolean consistentReadGuaranteed;

public StorageInfoImpl(
String storageName,
MutationAtomicityUnit mutationAtomicityUnit,
int maxAtomicMutationsCount) {
AtomicityUnit atomicityUnit,
int maxAtomicMutationsCount,
boolean consistentReadGuaranteed) {
this.storageName = storageName;
this.mutationAtomicityUnit = mutationAtomicityUnit;
this.atomicityUnit = atomicityUnit;
this.maxAtomicMutationsCount = maxAtomicMutationsCount;
this.consistentReadGuaranteed = consistentReadGuaranteed;
}

@Override
Expand All @@ -27,15 +30,20 @@ public String getStorageName() {
}

@Override
public MutationAtomicityUnit getMutationAtomicityUnit() {
return mutationAtomicityUnit;
public AtomicityUnit getAtomicityUnit() {
return atomicityUnit;
}

@Override
public int getMaxAtomicMutationsCount() {
return maxAtomicMutationsCount;
}

@Override
public boolean isConsistentReadGuaranteed() {
return consistentReadGuaranteed;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -47,20 +55,26 @@ public boolean equals(Object o) {
StorageInfoImpl that = (StorageInfoImpl) o;
return getMaxAtomicMutationsCount() == that.getMaxAtomicMutationsCount()
&& Objects.equals(getStorageName(), that.getStorageName())
&& getMutationAtomicityUnit() == that.getMutationAtomicityUnit();
&& getAtomicityUnit() == that.getAtomicityUnit()
&& isConsistentReadGuaranteed() == that.isConsistentReadGuaranteed();
}

@Override
public int hashCode() {
return Objects.hash(getStorageName(), getMutationAtomicityUnit(), getMaxAtomicMutationsCount());
return Objects.hash(
getStorageName(),
getAtomicityUnit(),
getMaxAtomicMutationsCount(),
isConsistentReadGuaranteed());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("storageName", storageName)
.add("mutationAtomicityUnit", mutationAtomicityUnit)
.add("atomicityUnit", atomicityUnit)
.add("maxAtomicMutationsCount", maxAtomicMutationsCount)
.add("consistentReadGuaranteed", consistentReadGuaranteed)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private boolean isOutOfAtomicityUnit(
&& mutation2.forNamespace().isPresent()
&& mutation2.forTable().isPresent();

switch (storageInfo1.getMutationAtomicityUnit()) {
switch (storageInfo1.getAtomicityUnit()) {
case RECORD:
if (!mutation1.getClusteringKey().equals(mutation2.getClusteringKey())) {
return true; // Different clustering keys
Expand Down Expand Up @@ -402,16 +402,15 @@ private boolean isOutOfAtomicityUnit(
}
break;
default:
throw new AssertionError(
"Unknown mutation atomicity unit: " + storageInfo1.getMutationAtomicityUnit());
throw new AssertionError("Unknown atomicity unit: " + storageInfo1.getAtomicityUnit());
}

return false;
}

private String getErrorMessageForOutOfAtomicityUnit(
StorageInfo storageInfo, List<? extends Mutation> mutations) {
switch (storageInfo.getMutationAtomicityUnit()) {
switch (storageInfo.getAtomicityUnit()) {
case RECORD:
return CoreError.OPERATION_CHECK_ERROR_MULTI_RECORD_MUTATION.buildMessage(
storageInfo.getStorageName(), mutations);
Expand All @@ -427,8 +426,7 @@ private String getErrorMessageForOutOfAtomicityUnit(
case STORAGE:
return CoreError.OPERATION_CHECK_ERROR_MULTI_STORAGE_MUTATION.buildMessage(mutations);
default:
throw new AssertionError(
"Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit());
throw new AssertionError("Unknown atomicity unit: " + storageInfo.getAtomicityUnit());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ public class CassandraAdmin implements DistributedStorageAdmin {
private static final StorageInfo STORAGE_INFO =
new StorageInfoImpl(
"cassandra",
StorageInfo.MutationAtomicityUnit.PARTITION,
StorageInfo.AtomicityUnit.PARTITION,
// No limit on the number of mutations
Integer.MAX_VALUE);
Integer.MAX_VALUE,
true);

private final ClusterManager clusterManager;
private final String metadataKeyspace;
Expand Down
Loading
Loading