diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 814d4779d9..9268905d53 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -36,6 +36,7 @@ public class CommitHandler { protected final Coordinator coordinator; private final TransactionTableMetadataManager tableMetadataManager; private final ParallelExecutor parallelExecutor; + protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled; @LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook; @@ -44,11 +45,13 @@ public CommitHandler( DistributedStorage storage, Coordinator coordinator, TransactionTableMetadataManager tableMetadataManager, - ParallelExecutor parallelExecutor) { + ParallelExecutor parallelExecutor, + boolean coordinatorWriteOmissionOnReadOnlyEnabled) { this.storage = checkNotNull(storage); this.coordinator = checkNotNull(coordinator); this.tableMetadataManager = checkNotNull(tableMetadataManager); this.parallelExecutor = checkNotNull(parallelExecutor); + this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled; } /** @@ -106,42 +109,62 @@ private void waitBeforePreparationSnapshotHookFuture( } } - public void commit(Snapshot snapshot) throws CommitException, UnknownTransactionStatusException { + public void commit(Snapshot snapshot, boolean readOnly) + throws CommitException, UnknownTransactionStatusException { + boolean hasWritesOrDeletesInSnapshot = !readOnly && snapshot.hasWritesOrDeletes(); + Optional> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot); - try { - prepare(snapshot); - } catch (PreparationException e) { - safelyCallOnFailureBeforeCommit(snapshot); - abortState(snapshot.getId()); - rollbackRecords(snapshot); - if (e instanceof PreparationConflictException) { - throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + + if (hasWritesOrDeletesInSnapshot) { + try { + prepare(snapshot); + } catch (PreparationException e) { + safelyCallOnFailureBeforeCommit(snapshot); + abortState(snapshot.getId()); + rollbackRecords(snapshot); + if (e instanceof PreparationConflictException) { + throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (Exception e) { + safelyCallOnFailureBeforeCommit(snapshot); + throw e; } - throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); - } catch (Exception e) { - safelyCallOnFailureBeforeCommit(snapshot); - throw e; } - try { - validate(snapshot); - } catch (ValidationException e) { - safelyCallOnFailureBeforeCommit(snapshot); - abortState(snapshot.getId()); - rollbackRecords(snapshot); - if (e instanceof ValidationConflictException) { - throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + if (snapshot.hasReads()) { + try { + validate(snapshot); + } catch (ValidationException e) { + safelyCallOnFailureBeforeCommit(snapshot); + + // If the transaction has no writes and deletes, we don't need to abort-state and + // rollback-records since there are no changes to be made. + if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) { + abortState(snapshot.getId()); + } + if (hasWritesOrDeletesInSnapshot) { + rollbackRecords(snapshot); + } + + if (e instanceof ValidationConflictException) { + throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (Exception e) { + safelyCallOnFailureBeforeCommit(snapshot); + throw e; } - throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); - } catch (Exception e) { - safelyCallOnFailureBeforeCommit(snapshot); - throw e; } waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null)); - commitState(snapshot); - commitRecords(snapshot); + if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) { + commitState(snapshot); + } + if (hasWritesOrDeletesInSnapshot) { + commitRecords(snapshot); + } } protected void handleCommitConflict(Snapshot snapshot, Exception cause) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index 4f47f73b5f..ca7fcdb936 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -5,6 +5,7 @@ import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.TransactionState; import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CommitException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.transaction.consensuscommit.Coordinator.State; import com.scalar.db.util.groupcommit.Emittable; @@ -28,8 +29,14 @@ public CommitHandlerWithGroupCommit( Coordinator coordinator, TransactionTableMetadataManager tableMetadataManager, ParallelExecutor parallelExecutor, + boolean coordinatorWriteOmissionOnReadOnlyEnabled, CoordinatorGroupCommitter groupCommitter) { - super(storage, coordinator, tableMetadataManager, parallelExecutor); + super( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + coordinatorWriteOmissionOnReadOnlyEnabled); checkNotNull(groupCommitter); // The methods of this emitter will be called via GroupCommitter.ready(). @@ -37,6 +44,16 @@ public CommitHandlerWithGroupCommit( this.groupCommitter = groupCommitter; } + @Override + public void commit(Snapshot snapshot, boolean readOnly) + throws CommitException, UnknownTransactionStatusException { + if (!readOnly && !snapshot.hasWritesOrDeletes() && coordinatorWriteOmissionOnReadOnlyEnabled) { + cancelGroupCommitIfNeeded(snapshot.getId()); + } + + super.commit(snapshot, readOnly); + } + @Override protected void onFailureBeforeCommit(Snapshot snapshot) { cancelGroupCommitIfNeeded(snapshot.getId()); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java index c2f6f12797..3a41c11fd9 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java @@ -269,7 +269,7 @@ public void commit() throws CommitException, UnknownTransactionStatusException { CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId()); } - commit.commit(crud.getSnapshot()); + commit.commit(crud.getSnapshot(), crud.isReadOnly()); } @Override @@ -280,7 +280,7 @@ public void rollback() { logger.warn("Failed to close the scanner", e); } - if (groupCommitter != null) { + if (groupCommitter != null && !crud.isReadOnly()) { groupCommitter.remove(crud.getSnapshot().getId()); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java index cff4f4f361..ea61b36c58 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java @@ -33,6 +33,9 @@ public class ConsensusCommitConfig { public static final String ASYNC_COMMIT_ENABLED = PREFIX + "async_commit.enabled"; public static final String ASYNC_ROLLBACK_ENABLED = PREFIX + "async_rollback.enabled"; + public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED = + PREFIX + "coordinator.write_omission_on_read_only.enabled"; + public static final String PARALLEL_IMPLICIT_PRE_READ = PREFIX + "parallel_implicit_pre_read.enabled"; @@ -73,10 +76,12 @@ public class ConsensusCommitConfig { private final boolean asyncCommitEnabled; private final boolean asyncRollbackEnabled; - private final boolean isIncludeMetadataEnabled; + private final boolean coordinatorWriteOmissionOnReadOnlyEnabled; private final boolean parallelImplicitPreReadEnabled; + private final boolean isIncludeMetadataEnabled; + private final boolean coordinatorGroupCommitEnabled; private final int coordinatorGroupCommitSlotCapacity; private final int coordinatorGroupCommitGroupSizeFixTimeoutMillis; @@ -92,7 +97,9 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) { DatabaseConfig.TRANSACTION_MANAGER + " should be '" + TRANSACTION_MANAGER_NAME + "'"); } - if (databaseConfig.getProperties().containsKey("scalar.db.isolation_level")) { + Properties properties = databaseConfig.getProperties(); + + if (properties.containsKey("scalar.db.isolation_level")) { logger.warn( "The property \"scalar.db.isolation_level\" is deprecated and will be removed in 5.0.0. " + "Please use \"" @@ -102,19 +109,17 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) { isolation = Isolation.valueOf( getString( - databaseConfig.getProperties(), + properties, ISOLATION_LEVEL, getString( - databaseConfig.getProperties(), + properties, "scalar.db.isolation_level", // for backward compatibility Isolation.SNAPSHOT.toString())) .toUpperCase(Locale.ROOT)); if (isolation.equals(Isolation.SERIALIZABLE)) { validateCrossPartitionScanConfig(databaseConfig); - if (databaseConfig - .getProperties() - .containsKey("scalar.db.consensus_commit.serializable_strategy")) { + if (properties.containsKey("scalar.db.consensus_commit.serializable_strategy")) { logger.warn( "The property \"scalar.db.consensus_commit.serializable_strategy\" is deprecated and will " + "be removed in 5.0.0. The EXTRA_READ strategy is always used for the SERIALIZABLE " @@ -122,71 +127,60 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) { } } - coordinatorNamespace = getString(databaseConfig.getProperties(), COORDINATOR_NAMESPACE, null); + coordinatorNamespace = getString(properties, COORDINATOR_NAMESPACE, null); parallelExecutorCount = - getInt( - databaseConfig.getProperties(), - PARALLEL_EXECUTOR_COUNT, - DEFAULT_PARALLEL_EXECUTOR_COUNT); - parallelPreparationEnabled = - getBoolean(databaseConfig.getProperties(), PARALLEL_PREPARATION_ENABLED, true); - parallelCommitEnabled = - getBoolean(databaseConfig.getProperties(), PARALLEL_COMMIT_ENABLED, true); + getInt(properties, PARALLEL_EXECUTOR_COUNT, DEFAULT_PARALLEL_EXECUTOR_COUNT); + parallelPreparationEnabled = getBoolean(properties, PARALLEL_PREPARATION_ENABLED, true); + parallelCommitEnabled = getBoolean(properties, PARALLEL_COMMIT_ENABLED, true); // Use the value of parallel commit for parallel validation and parallel rollback as default // value parallelValidationEnabled = - getBoolean( - databaseConfig.getProperties(), PARALLEL_VALIDATION_ENABLED, parallelCommitEnabled); + getBoolean(properties, PARALLEL_VALIDATION_ENABLED, parallelCommitEnabled); parallelRollbackEnabled = - getBoolean( - databaseConfig.getProperties(), PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled); + getBoolean(properties, PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled); - asyncCommitEnabled = getBoolean(databaseConfig.getProperties(), ASYNC_COMMIT_ENABLED, false); + asyncCommitEnabled = getBoolean(properties, ASYNC_COMMIT_ENABLED, false); // Use the value of async commit for async rollback as default value - asyncRollbackEnabled = - getBoolean(databaseConfig.getProperties(), ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled); + asyncRollbackEnabled = getBoolean(properties, ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled); + + coordinatorWriteOmissionOnReadOnlyEnabled = + getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true); - isIncludeMetadataEnabled = - getBoolean(databaseConfig.getProperties(), INCLUDE_METADATA_ENABLED, false); + parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true); - parallelImplicitPreReadEnabled = - getBoolean(databaseConfig.getProperties(), PARALLEL_IMPLICIT_PRE_READ, true); + isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false); - coordinatorGroupCommitEnabled = - getBoolean(databaseConfig.getProperties(), COORDINATOR_GROUP_COMMIT_ENABLED, false); + coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false); coordinatorGroupCommitSlotCapacity = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY, DEFAULT_COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY); coordinatorGroupCommitGroupSizeFixTimeoutMillis = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS, DEFAULT_COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS); coordinatorGroupCommitDelayedSlotMoveTimeoutMillis = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS, DEFAULT_COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS); coordinatorGroupCommitOldGroupAbortTimeoutMillis = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_OLD_GROUP_ABORT_TIMEOUT_MILLIS, DEFAULT_COORDINATOR_GROUP_COMMIT_OLD_GROUP_ABORT_TIMEOUT_MILLIS); coordinatorGroupCommitTimeoutCheckIntervalMillis = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_TIMEOUT_CHECK_INTERVAL_MILLIS, DEFAULT_COORDINATOR_GROUP_COMMIT_TIMEOUT_CHECK_INTERVAL_MILLIS); coordinatorGroupCommitMetricsMonitorLogEnabled = - getBoolean( - databaseConfig.getProperties(), - COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED, - false); + getBoolean(properties, COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED, false); } public Isolation getIsolation() { @@ -225,14 +219,18 @@ public boolean isAsyncRollbackEnabled() { return asyncRollbackEnabled; } - public boolean isIncludeMetadataEnabled() { - return isIncludeMetadataEnabled; + public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() { + return coordinatorWriteOmissionOnReadOnlyEnabled; } public boolean isParallelImplicitPreReadEnabled() { return parallelImplicitPreReadEnabled; } + public boolean isIncludeMetadataEnabled() { + return isIncludeMetadataEnabled; + } + public boolean isCoordinatorGroupCommitEnabled() { return coordinatorGroupCommitEnabled; } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index d9469f4d5c..4b890d0512 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -130,9 +130,19 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) { private CommitHandler createCommitHandler() { if (isGroupCommitEnabled()) { return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter); + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), + groupCommitter); } else { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + return new CommitHandler( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); } } @@ -222,7 +232,7 @@ DistributedTransaction begin( String txId, Isolation isolation, boolean readOnly, boolean oneOperation) { checkArgument(!Strings.isNullOrEmpty(txId)); checkNotNull(isolation); - if (isGroupCommitEnabled()) { + if (!readOnly && isGroupCommitEnabled()) { assert groupCommitter != null; txId = groupCommitter.reserve(txId); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 76b44fb626..66d213c3bc 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -469,6 +469,10 @@ public Snapshot getSnapshot() { return snapshot; } + public boolean isReadOnly() { + return readOnly; + } + private interface ConsensusCommitScanner extends TransactionCrudOperable.Scanner { boolean isClosed(); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index f9691249c2..195cb1bbc4 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -217,6 +217,14 @@ public boolean containsKeyInGetSet(Get get) { return getSet.containsKey(get); } + public boolean hasWritesOrDeletes() { + return !writeSet.isEmpty() || !deleteSet.isEmpty(); + } + + public boolean hasReads() { + return !getSet.isEmpty() || !scanSet.isEmpty() || !scannerSet.isEmpty(); + } + public Optional getResult(Key key) throws CrudException { Optional result = readSet.getOrDefault(key, Optional.empty()); return mergeResult(key, result); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index d606784b2a..eb06445e67 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -74,7 +74,13 @@ public TwoPhaseConsensusCommitManager( coordinator = new Coordinator(storage, config); parallelExecutor = new ParallelExecutor(config); recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager); - commit = new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + commit = + new CommitHandler( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); isIncludeMetadataEnabled = config.isIncludeMetadataEnabled(); mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); } @@ -91,7 +97,13 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) { coordinator = new Coordinator(storage, config); parallelExecutor = new ParallelExecutor(config); recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager); - commit = new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + commit = + new CommitHandler( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); isIncludeMetadataEnabled = config.isIncludeMetadataEnabled(); mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index 9ba11373da..51df00f89e 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -15,6 +15,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.Get; import com.scalar.db.api.Put; import com.scalar.db.api.TransactionState; import com.scalar.db.exception.storage.ExecutionException; @@ -72,14 +73,19 @@ protected void extraInitialize() {} protected void extraCleanup() {} - protected CommitHandler createCommitHandler() { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnReadOnlyEnabled) { + return new CommitHandler( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + coordinatorWriteOmissionOnReadOnlyEnabled); } @BeforeEach void setUp() throws Exception { parallelExecutor = new ParallelExecutor(config); - handler = spy(createCommitHandler()); + handler = spy(createCommitHandler(true)); extraInitialize(); } @@ -118,6 +124,17 @@ private Put preparePut3() { .withValue(ANY_NAME_3, ANY_INT_2); } + private Get prepareGet() { + Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); + Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_3); + return Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build(); + } + private Snapshot prepareSnapshotWithDifferentPartitionPut() { Snapshot snapshot = new Snapshot( @@ -129,6 +146,9 @@ private Snapshot prepareSnapshotWithDifferentPartitionPut() { snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1); snapshot.putIntoWriteSet(new Snapshot.Key(put2), put2); + Get get = prepareGet(); + snapshot.putIntoGetSet(get, Optional.empty()); + return snapshot; } @@ -143,6 +163,34 @@ private Snapshot prepareSnapshotWithSamePartitionPut() { snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1); snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3); + Get get = prepareGet(); + snapshot.putIntoGetSet(get, Optional.empty()); + + return snapshot; + } + + private Snapshot prepareSnapshotWithoutWrites() { + Snapshot snapshot = + new Snapshot( + anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + + Get get = prepareGet(); + snapshot.putIntoGetSet(get, Optional.empty()); + + return snapshot; + } + + private Snapshot prepareSnapshotWithoutReads() { + Snapshot snapshot = + new Snapshot( + anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + + // same partition + Put put1 = preparePut1(); + Put put3 = preparePut3(); + snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1); + snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3); + return snapshot; } @@ -160,19 +208,20 @@ private void setBeforePreparationSnapshotHookIfNeeded(boolean withSnapshotHook) public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectively( boolean withSnapshotHook) throws CommitException, UnknownTransactionStatusException, ExecutionException, - CoordinatorException { + CoordinatorException, ValidationConflictException { // Arrange - Snapshot snapshot = prepareSnapshotWithDifferentPartitionPut(); + Snapshot snapshot = spy(prepareSnapshotWithDifferentPartitionPut()); ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); doNothing().when(storage).mutate(anyList()); doNothingWhenCoordinatorPutState(); setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); // Act - handler.commit(snapshot); + handler.commit(snapshot, false); // Assert verify(storage, times(4)).mutate(anyList()); + verify(snapshot).toSerializable(storage); verifyCoordinatorPutState(TransactionState.COMMITTED); verifySnapshotHook(withSnapshotHook, readWriteSets); verify(handler, never()).onFailureBeforeCommit(any()); @@ -182,19 +231,161 @@ public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectiv @ValueSource(booleans = {false, true}) public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean withSnapshotHook) throws CommitException, UnknownTransactionStatusException, ExecutionException, - CoordinatorException { + CoordinatorException, ValidationConflictException { // Arrange - Snapshot snapshot = prepareSnapshotWithSamePartitionPut(); + Snapshot snapshot = spy(prepareSnapshotWithSamePartitionPut()); ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); doNothing().when(storage).mutate(anyList()); doNothingWhenCoordinatorPutState(); setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); // Act - handler.commit(snapshot); + handler.commit(snapshot, false); // Assert verify(storage, times(2)).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verifyCoordinatorPutState(TransactionState.COMMITTED); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler, never()).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + + // Act + handler.commit(snapshot, true); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verify(coordinator, never()).putState(any()); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler, never()).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void + commit_NoWritesAndDeletesInSnapshot_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + + // Act + handler.commit(snapshot, false); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verify(coordinator, never()).putState(any()); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler, never()).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void + commit_NoWritesAndDeletesInSnapshot_CoordinatorWriteOmissionOnReadOnlyDisabled_ShouldNotPrepareRecordsAndCommitRecordsButShouldCommitState( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + handler = spy(createCommitHandler(false)); + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + + // Act + handler.commit(snapshot, false); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verifyCoordinatorPutState(TransactionState.COMMITTED); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler, never()).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void + commit_NoWritesAndDeletesInSnapshot_ValidationFailed_ShouldNotPrepareRecordsAndAbortStateAndRollbackRecords( + boolean withSnapshotHook) + throws ExecutionException, CoordinatorException, ValidationConflictException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + doThrow(ValidationConflictException.class).when(snapshot).toSerializable(storage); + + // Act Assert + assertThatThrownBy(() -> handler.commit(snapshot, false)) + .isInstanceOf(CommitConflictException.class); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verify(coordinator, never()).putState(any()); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void + commit_NoWritesAndDeletesInSnapshot_ValidationFailed_CoordinatorWriteOmissionOnReadOnlyDisabled_ShouldNotPrepareRecordsAndRollbackRecordsButShouldAbortState( + boolean withSnapshotHook) + throws ExecutionException, CoordinatorException, ValidationConflictException { + // Arrange + handler = spy(createCommitHandler(false)); + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + doThrow(ValidationConflictException.class).when(snapshot).toSerializable(storage); + + // Act Assert + assertThatThrownBy(() -> handler.commit(snapshot, false)) + .isInstanceOf(CommitConflictException.class); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verify(coordinator).putState(any()); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void commit_NoReadsInSnapshot_ShouldNotValidateRecords(boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithoutReads()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + doNothing().when(storage).mutate(anyList()); + doNothingWhenCoordinatorPutState(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + + // Act + handler.commit(snapshot, false); + + // Assert + verify(storage, times(2)).mutate(anyList()); + verify(snapshot, never()).toSerializable(storage); verifyCoordinatorPutState(TransactionState.COMMITTED); verifySnapshotHook(withSnapshotHook, readWriteSets); verify(handler, never()).onFailureBeforeCommit(any()); @@ -210,7 +401,8 @@ public void commit_NoMutationExceptionThrownInPrepareRecords_ShouldThrowCCExcept doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitConflictException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)) + .isInstanceOf(CommitConflictException.class); // Assert @@ -233,7 +425,8 @@ public void commit_RetriableExecutionExceptionThrownInPrepareRecords_ShouldThrow doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitConflictException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)) + .isInstanceOf(CommitConflictException.class); // Assert @@ -256,7 +449,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -285,7 +478,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -312,7 +505,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords doReturn(Optional.empty()).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -340,7 +533,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords doThrow(CoordinatorException.class).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -367,7 +560,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords .putState(new Coordinator.State(anyId(), TransactionState.ABORTED)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -392,7 +585,7 @@ public void commit_ValidationConflictExceptionThrownInValidation_ShouldAbortAndR doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -416,7 +609,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -446,7 +639,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -474,7 +667,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doReturn(Optional.empty()).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -503,7 +696,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doThrow(CoordinatorException.class).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -531,7 +724,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .putState(new Coordinator.State(anyId(), TransactionState.ABORTED)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -560,7 +753,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .getState(anyId()); // Act - handler.commit(snapshot); + handler.commit(snapshot, false); // Assert verify(storage, times(4)).mutate(anyList()); @@ -584,7 +777,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert verify(storage, times(2)).mutate(anyList()); @@ -606,7 +799,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doReturn(Optional.empty()).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -629,7 +822,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doThrow(CoordinatorException.class).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -649,7 +842,7 @@ public void commit_ExceptionThrownInCoordinatorCommit_ShouldThrowUnknown() doThrowExceptionWhenCoordinatorPutState(TransactionState.COMMITTED, CoordinatorException.class); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -683,7 +876,7 @@ public Future handle( // Act Instant start = Instant.now(); - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); Instant end = Instant.now(); @@ -709,7 +902,7 @@ public void commit_FailingSnapshotHookGiven_ShouldThrowCommitException() handler.setBeforePreparationSnapshotHook(beforePreparationSnapshotHook); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert verify(storage, never()).mutate(anyList()); @@ -733,7 +926,7 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException() setBeforePreparationSnapshotHookIfNeeded(true); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert verify(storage, times(2)).mutate(anyList()); @@ -747,7 +940,6 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException() protected void doThrowExceptionWhenCoordinatorPutState( TransactionState targetState, Class exceptionClass) throws CoordinatorException { - doThrow(exceptionClass).when(coordinator).putState(new Coordinator.State(anyId(), targetState)); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java index fdafe6dc7d..fe79a9db69 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java @@ -6,15 +6,24 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import com.scalar.db.api.TransactionState; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import com.scalar.db.exception.transaction.ValidationConflictException; import com.scalar.db.transaction.consensuscommit.CoordinatorGroupCommitter.CoordinatorGroupCommitKeyManipulator; import com.scalar.db.util.groupcommit.GroupCommitConfig; import java.util.List; import java.util.UUID; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -45,15 +54,21 @@ protected void extraCleanup() { private void createGroupCommitterIfNotExists() { if (groupCommitter == null) { - groupCommitter = new CoordinatorGroupCommitter(new GroupCommitConfig(4, 100, 500, 60000, 10)); + groupCommitter = + spy(new CoordinatorGroupCommitter(new GroupCommitConfig(4, 100, 500, 60000, 10))); } } @Override - protected CommitHandler createCommitHandler() { + protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnReadOnlyEnabled) { createGroupCommitterIfNotExists(); return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter); + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + coordinatorWriteOmissionOnReadOnlyEnabled, + groupCommitter); } private String anyGroupCommitParentId() { @@ -92,4 +107,91 @@ protected void verifyCoordinatorPutState(TransactionState expectedTransactionSta assertThat(fullIds.size()).isEqualTo(1); assertThat(fullIds.get(0)).isEqualTo(anyId()); } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectively( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + super.commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectively(withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + super.commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + groupCommitter.remove(anyId()); + clearInvocations(groupCommitter); + + super.commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void + commit_NoWritesAndDeletesInSnapshot_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + + super.commit_NoWritesAndDeletesInSnapshot_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + withSnapshotHook); + + // Assert + verify(groupCommitter).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void + commit_NoWritesAndDeletesInSnapshot_CoordinatorWriteOmissionOnReadOnlyDisabled_ShouldNotPrepareRecordsAndCommitRecordsButShouldCommitState( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + super + .commit_NoWritesAndDeletesInSnapshot_CoordinatorWriteOmissionOnReadOnlyDisabled_ShouldNotPrepareRecordsAndCommitRecordsButShouldCommitState( + withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void commit_NoReadsInSnapshot_ShouldNotValidateRecords(boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + super.commit_NoReadsInSnapshot_ShouldNotValidateRecords(withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java index ff9e41652e..d93fd0ad8a 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java @@ -27,8 +27,9 @@ public void constructor_NoPropertiesGiven_ShouldLoadAsDefaultValues() { assertThat(config.isParallelRollbackEnabled()).isTrue(); assertThat(config.isAsyncCommitEnabled()).isFalse(); assertThat(config.isAsyncRollbackEnabled()).isFalse(); - assertThat(config.isIncludeMetadataEnabled()).isFalse(); assertThat(config.isParallelImplicitPreReadEnabled()).isTrue(); + assertThat(config.isCoordinatorWriteOmissionOnReadOnlyEnabled()).isTrue(); + assertThat(config.isIncludeMetadataEnabled()).isFalse(); } @Test @@ -154,16 +155,18 @@ public void constructor_AsyncExecutionRelatedPropertiesGiven_ShouldLoadProperly( } @Test - public void constructor_PropertiesWithIncludeMetadataEnabledGiven_ShouldLoadProperly() { + public void + constructor_PropertiesWithCoordinatorWriteOmissionOnReadOnlyEnabledGiven_ShouldLoadProperly() { // Arrange Properties props = new Properties(); - props.setProperty(ConsensusCommitConfig.INCLUDE_METADATA_ENABLED, "true"); + props.setProperty( + ConsensusCommitConfig.COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, "false"); // Act ConsensusCommitConfig config = new ConsensusCommitConfig(new DatabaseConfig(props)); // Assert - assertThat(config.isIncludeMetadataEnabled()).isTrue(); + assertThat(config.isCoordinatorWriteOmissionOnReadOnlyEnabled()).isFalse(); } @Test @@ -178,4 +181,17 @@ public void constructor_PropertiesWithParallelImplicitPreReadEnabledGiven_Should // Assert assertThat(config.isParallelImplicitPreReadEnabled()).isFalse(); } + + @Test + public void constructor_PropertiesWithIncludeMetadataEnabledGiven_ShouldLoadProperly() { + // Arrange + Properties props = new Properties(); + props.setProperty(ConsensusCommitConfig.INCLUDE_METADATA_ENABLED, "true"); + + // Act + ConsensusCommitConfig config = new ConsensusCommitConfig(new DatabaseConfig(props)); + + // Assert + assertThat(config.isIncludeMetadataEnabled()).isTrue(); + } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java index a056ac7de1..3cb717a846 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java @@ -3,11 +3,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -29,6 +31,7 @@ import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager; +import com.scalar.db.common.DecoratedDistributedTransaction; import com.scalar.db.common.ReadOnlyDistributedTransaction; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; @@ -144,6 +147,40 @@ public void begin_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolation() { assertThat(snapshot.getIsolation()).isEqualTo(Isolation.SNAPSHOT); } + @Test + public void + beginReadOnly_TxIdGivenWithGroupCommitter_ReturnWithSpecifiedTxIdAndSnapshotIsolation() { + // Arrange + CoordinatorGroupCommitKeyManipulator keyManipulator = + new CoordinatorGroupCommitKeyManipulator(); + CoordinatorGroupCommitter groupCommitter = mock(CoordinatorGroupCommitter.class); + ConsensusCommitManager managerWithGroupCommit = + new ConsensusCommitManager( + storage, + admin, + consensusCommitConfig, + databaseConfig, + coordinator, + parallelExecutor, + recovery, + commit, + groupCommitter); + + // Act + DistributedTransaction transaction = managerWithGroupCommit.beginReadOnly(ANY_TX_ID); + + // Assert + assertThat(transaction.getId()).isEqualTo(ANY_TX_ID); + Snapshot snapshot = + ((ConsensusCommit) ((DecoratedDistributedTransaction) transaction).getOriginalTransaction()) + .getCrudHandler() + .getSnapshot(); + assertThat(snapshot.getId()).isEqualTo(ANY_TX_ID); + assertThat(keyManipulator.isFullKey(transaction.getId())).isFalse(); + verify(groupCommitter, never()).reserve(ANY_TX_ID); + assertThat(snapshot.getIsolation()).isEqualTo(Isolation.SNAPSHOT); + } + @Test public void begin_CalledTwice_ReturnRespectiveConsensusCommitWithSharedCommitAndRecovery() { // Arrange @@ -368,7 +405,7 @@ public void resume_CalledWithBeginAndCommit_CommitExceptionThrown_ReturnSameTran DistributedTransactionManager manager = new ActiveTransactionManagedDistributedTransactionManager(this.manager, -1); - doThrow(CommitException.class).when(commit).commit(any()); + doThrow(CommitException.class).when(commit).commit(any(), anyBoolean()); DistributedTransaction transaction1 = manager.begin(ANY_TX_ID); try { @@ -447,7 +484,7 @@ public void join_CalledWithBeginAndCommit_CommitExceptionThrown_ReturnSameTransa DistributedTransactionManager manager = new ActiveTransactionManagedDistributedTransactionManager(this.manager, -1); - doThrow(CommitException.class).when(commit).commit(any()); + doThrow(CommitException.class).when(commit).commit(any(), anyBoolean()); DistributedTransaction transaction1 = manager.begin(ANY_TX_ID); try { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java index f81a85b70b..99a6f6ae37 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -707,15 +708,32 @@ public void mutate_PutAndDeleteGiven_ShouldCallCrudHandlerPutAndDelete() public void commit_ProcessedCrudGiven_ShouldCommitWithSnapshot() throws CommitException, UnknownTransactionStatusException, CrudException { // Arrange - doNothing().when(commit).commit(any(Snapshot.class)); + doNothing().when(commit).commit(any(Snapshot.class), anyBoolean()); when(crud.getSnapshot()).thenReturn(snapshot); + when(crud.isReadOnly()).thenReturn(false); // Act consensus.commit(); // Assert verify(crud).readIfImplicitPreReadEnabled(); - verify(commit).commit(snapshot); + verify(commit).commit(snapshot, false); + } + + @Test + public void commit_ProcessedCrudGiven_InReadOnlyMode_ShouldCommitWithSnapshot() + throws CommitException, UnknownTransactionStatusException, CrudException { + // Arrange + doNothing().when(commit).commit(any(Snapshot.class), anyBoolean()); + when(crud.getSnapshot()).thenReturn(snapshot); + when(crud.isReadOnly()).thenReturn(true); + + // Act + consensus.commit(); + + // Assert + verify(crud).readIfImplicitPreReadEnabled(); + verify(commit).commit(snapshot, true); } @Test @@ -807,4 +825,27 @@ public void rollback_WithGroupCommitter_ShouldRemoveTxFromGroupCommitter() verify(commit, never()).rollbackRecords(any(Snapshot.class)); verify(commit, never()).abortState(anyString()); } + + @Test + public void rollback_WithGroupCommitter_InReadOnlyMode_ShouldNotRemoveTxFromGroupCommitter() + throws CrudException, UnknownTransactionStatusException { + // Arrange + String txId = "tx-id"; + Snapshot snapshot = mock(Snapshot.class); + doReturn(txId).when(snapshot).getId(); + doReturn(snapshot).when(crud).getSnapshot(); + doReturn(true).when(crud).isReadOnly(); + CoordinatorGroupCommitter groupCommitter = mock(CoordinatorGroupCommitter.class); + ConsensusCommit consensusWithGroupCommit = + new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter); + + // Act + consensusWithGroupCommit.rollback(); + + // Assert + verify(crud).closeScanners(); + verify(groupCommitter, never()).remove(anyString()); + verify(commit, never()).rollbackRecords(any(Snapshot.class)); + verify(commit, never()).abortState(anyString()); + } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java index 1a84d83d41..67c5e4f204 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java @@ -158,9 +158,9 @@ private CommitHandler createCommitHandler( @Nullable CoordinatorGroupCommitter groupCommitter) { if (groupCommitter != null) { return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter); + storage, coordinator, tableMetadataManager, parallelExecutor, true, groupCommitter); } else { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor, true); } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index bccb946f4c..c046a044c3 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -192,9 +192,9 @@ private CommitHandler createCommitHandler( @Nullable CoordinatorGroupCommitter groupCommitter) { if (groupCommitter != null) { return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter); + storage, coordinator, tableMetadataManager, parallelExecutor, true, groupCommitter); } else { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor, true); } }