From 19aac3d79d57ceb2fd2bbbf34c385db507394a5e Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Thu, 22 May 2025 15:23:43 +0900 Subject: [PATCH 1/6] Skip commit-state for read-only transactions --- .../consensuscommit/CommitHandler.java | 64 ++++--- .../CommitHandlerWithGroupCommit.java | 11 ++ .../consensuscommit/ConsensusCommit.java | 4 +- .../ConsensusCommitManager.java | 2 +- .../consensuscommit/CrudHandler.java | 4 + .../transaction/consensuscommit/Snapshot.java | 8 + .../consensuscommit/CommitHandlerTest.java | 170 +++++++++++++++--- .../CommitHandlerWithGroupCommitTest.java | 83 ++++++++- .../ConsensusCommitManagerTest.java | 41 ++++- .../consensuscommit/ConsensusCommitTest.java | 45 ++++- 10 files changed, 369 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 814d4779d9..3fb457fe1a 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -106,42 +106,52 @@ private void waitBeforePreparationSnapshotHookFuture( } } - public void commit(Snapshot snapshot) throws CommitException, UnknownTransactionStatusException { + public void commit(Snapshot snapshot, boolean readOnly) + throws CommitException, UnknownTransactionStatusException { + boolean hasNoWritesAndDeletesInSnapshot = readOnly || snapshot.hasNoWritesAndDeletes(); + Optional> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot); - try { - prepare(snapshot); - } catch (PreparationException e) { - safelyCallOnFailureBeforeCommit(snapshot); - abortState(snapshot.getId()); - rollbackRecords(snapshot); - if (e instanceof PreparationConflictException) { - throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + + if (!hasNoWritesAndDeletesInSnapshot) { + try { + prepare(snapshot); + } catch (PreparationException e) { + safelyCallOnFailureBeforeCommit(snapshot); + abortState(snapshot.getId()); + rollbackRecords(snapshot); + if (e instanceof PreparationConflictException) { + throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (Exception e) { + safelyCallOnFailureBeforeCommit(snapshot); + throw e; } - throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); - } catch (Exception e) { - safelyCallOnFailureBeforeCommit(snapshot); - throw e; } - try { - validate(snapshot); - } catch (ValidationException e) { - safelyCallOnFailureBeforeCommit(snapshot); - abortState(snapshot.getId()); - rollbackRecords(snapshot); - if (e instanceof ValidationConflictException) { - throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + if (!snapshot.hasNoReads()) { + try { + validate(snapshot); + } catch (ValidationException e) { + safelyCallOnFailureBeforeCommit(snapshot); + abortState(snapshot.getId()); + rollbackRecords(snapshot); + if (e instanceof ValidationConflictException) { + throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (Exception e) { + safelyCallOnFailureBeforeCommit(snapshot); + throw e; } - throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); - } catch (Exception e) { - safelyCallOnFailureBeforeCommit(snapshot); - throw e; } waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null)); - commitState(snapshot); - commitRecords(snapshot); + if (!hasNoWritesAndDeletesInSnapshot) { + commitState(snapshot); + commitRecords(snapshot); + } } protected void handleCommitConflict(Snapshot snapshot, Exception cause) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index 4f47f73b5f..6decbfdbe0 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -5,6 +5,7 @@ import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.TransactionState; import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CommitException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.transaction.consensuscommit.Coordinator.State; import com.scalar.db.util.groupcommit.Emittable; @@ -37,6 +38,16 @@ public CommitHandlerWithGroupCommit( this.groupCommitter = groupCommitter; } + @Override + public void commit(Snapshot snapshot, boolean readOnly) + throws CommitException, UnknownTransactionStatusException { + if (!readOnly && snapshot.hasNoWritesAndDeletes()) { + cancelGroupCommitIfNeeded(snapshot.getId()); + } + + super.commit(snapshot, readOnly); + } + @Override protected void onFailureBeforeCommit(Snapshot snapshot) { cancelGroupCommitIfNeeded(snapshot.getId()); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java index c2f6f12797..3a41c11fd9 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java @@ -269,7 +269,7 @@ public void commit() throws CommitException, UnknownTransactionStatusException { CoreError.CONSENSUS_COMMIT_EXECUTING_IMPLICIT_PRE_READ_FAILED.buildMessage(), e, getId()); } - commit.commit(crud.getSnapshot()); + commit.commit(crud.getSnapshot(), crud.isReadOnly()); } @Override @@ -280,7 +280,7 @@ public void rollback() { logger.warn("Failed to close the scanner", e); } - if (groupCommitter != null) { + if (groupCommitter != null && !crud.isReadOnly()) { groupCommitter.remove(crud.getSnapshot().getId()); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index 1657611990..4eea4589eb 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -221,7 +221,7 @@ DistributedTransaction beginReadOnly(Isolation isolation) { DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) { checkArgument(!Strings.isNullOrEmpty(txId)); checkNotNull(isolation); - if (isGroupCommitEnabled()) { + if (!readOnly && isGroupCommitEnabled()) { assert groupCommitter != null; txId = groupCommitter.reserve(txId); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 73d420215f..150e19064d 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -420,6 +420,10 @@ public Snapshot getSnapshot() { return snapshot; } + public boolean isReadOnly() { + return readOnly; + } + private interface ConsensusCommitScanner extends TransactionCrudOperable.Scanner { boolean isClosed(); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index ff0ac712f3..8f9ce2abbc 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -217,6 +217,14 @@ public boolean containsKeyInGetSet(Get get) { return getSet.containsKey(get); } + public boolean hasNoWritesAndDeletes() { + return writeSet.isEmpty() && deleteSet.isEmpty(); + } + + public boolean hasNoReads() { + return getSet.isEmpty() && scanSet.isEmpty() && scannerSet.isEmpty(); + } + public Optional getResult(Key key) throws CrudException { Optional result = readSet.getOrDefault(key, Optional.empty()); return mergeResult(key, result); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index 9ba11373da..7d4c8fe636 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -15,6 +15,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.Get; import com.scalar.db.api.Put; import com.scalar.db.api.TransactionState; import com.scalar.db.exception.storage.ExecutionException; @@ -118,6 +119,17 @@ private Put preparePut3() { .withValue(ANY_NAME_3, ANY_INT_2); } + private Get prepareGet() { + Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); + Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_3); + return Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build(); + } + private Snapshot prepareSnapshotWithDifferentPartitionPut() { Snapshot snapshot = new Snapshot( @@ -129,6 +141,9 @@ private Snapshot prepareSnapshotWithDifferentPartitionPut() { snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1); snapshot.putIntoWriteSet(new Snapshot.Key(put2), put2); + Get get = prepareGet(); + snapshot.putIntoGetSet(get, Optional.empty()); + return snapshot; } @@ -143,6 +158,34 @@ private Snapshot prepareSnapshotWithSamePartitionPut() { snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1); snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3); + Get get = prepareGet(); + snapshot.putIntoGetSet(get, Optional.empty()); + + return snapshot; + } + + private Snapshot prepareSnapshotWithoutWrites() { + Snapshot snapshot = + new Snapshot( + anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + + Get get = prepareGet(); + snapshot.putIntoGetSet(get, Optional.empty()); + + return snapshot; + } + + private Snapshot prepareSnapshotWithoutReads() { + Snapshot snapshot = + new Snapshot( + anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + + // same partition + Put put1 = preparePut1(); + Put put3 = preparePut3(); + snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1); + snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3); + return snapshot; } @@ -160,19 +203,20 @@ private void setBeforePreparationSnapshotHookIfNeeded(boolean withSnapshotHook) public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectively( boolean withSnapshotHook) throws CommitException, UnknownTransactionStatusException, ExecutionException, - CoordinatorException { + CoordinatorException, ValidationConflictException { // Arrange - Snapshot snapshot = prepareSnapshotWithDifferentPartitionPut(); + Snapshot snapshot = spy(prepareSnapshotWithDifferentPartitionPut()); ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); doNothing().when(storage).mutate(anyList()); doNothingWhenCoordinatorPutState(); setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); // Act - handler.commit(snapshot); + handler.commit(snapshot, false); // Assert verify(storage, times(4)).mutate(anyList()); + verify(snapshot).toSerializable(storage); verifyCoordinatorPutState(TransactionState.COMMITTED); verifySnapshotHook(withSnapshotHook, readWriteSets); verify(handler, never()).onFailureBeforeCommit(any()); @@ -182,19 +226,88 @@ public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectiv @ValueSource(booleans = {false, true}) public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean withSnapshotHook) throws CommitException, UnknownTransactionStatusException, ExecutionException, - CoordinatorException { + CoordinatorException, ValidationConflictException { // Arrange - Snapshot snapshot = prepareSnapshotWithSamePartitionPut(); + Snapshot snapshot = spy(prepareSnapshotWithSamePartitionPut()); ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); doNothing().when(storage).mutate(anyList()); doNothingWhenCoordinatorPutState(); setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); // Act - handler.commit(snapshot); + handler.commit(snapshot, false); // Assert verify(storage, times(2)).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verifyCoordinatorPutState(TransactionState.COMMITTED); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler, never()).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + + // Act + handler.commit(snapshot, true); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verify(coordinator, never()).putState(any()); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler, never()).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void + commit_NoWritesAndDeletesInSnapshot_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + + // Act + handler.commit(snapshot, false); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verify(coordinator, never()).putState(any()); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler, never()).onFailureBeforeCommit(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void commit_NoReadsInSnapshot_ShouldNotValidateRecords(boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithoutReads()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + doNothing().when(storage).mutate(anyList()); + doNothingWhenCoordinatorPutState(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + + // Act + handler.commit(snapshot, false); + + // Assert + verify(storage, times(2)).mutate(anyList()); + verify(snapshot, never()).toSerializable(storage); verifyCoordinatorPutState(TransactionState.COMMITTED); verifySnapshotHook(withSnapshotHook, readWriteSets); verify(handler, never()).onFailureBeforeCommit(any()); @@ -210,7 +323,8 @@ public void commit_NoMutationExceptionThrownInPrepareRecords_ShouldThrowCCExcept doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitConflictException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)) + .isInstanceOf(CommitConflictException.class); // Assert @@ -233,7 +347,8 @@ public void commit_RetriableExecutionExceptionThrownInPrepareRecords_ShouldThrow doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitConflictException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)) + .isInstanceOf(CommitConflictException.class); // Assert @@ -256,7 +371,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -285,7 +400,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -312,7 +427,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords doReturn(Optional.empty()).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -340,7 +455,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords doThrow(CoordinatorException.class).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -367,7 +482,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords .putState(new Coordinator.State(anyId(), TransactionState.ABORTED)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -392,7 +507,7 @@ public void commit_ValidationConflictExceptionThrownInValidation_ShouldAbortAndR doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -416,7 +531,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -446,7 +561,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doNothing().when(handler).rollbackRecords(any(Snapshot.class)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert @@ -474,7 +589,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doReturn(Optional.empty()).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -503,7 +618,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doThrow(CoordinatorException.class).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -531,7 +646,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .putState(new Coordinator.State(anyId(), TransactionState.ABORTED)); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -560,7 +675,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .getState(anyId()); // Act - handler.commit(snapshot); + handler.commit(snapshot, false); // Assert verify(storage, times(4)).mutate(anyList()); @@ -584,7 +699,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert verify(storage, times(2)).mutate(anyList()); @@ -606,7 +721,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doReturn(Optional.empty()).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -629,7 +744,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() doThrow(CoordinatorException.class).when(coordinator).getState(anyId()); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -649,7 +764,7 @@ public void commit_ExceptionThrownInCoordinatorCommit_ShouldThrowUnknown() doThrowExceptionWhenCoordinatorPutState(TransactionState.COMMITTED, CoordinatorException.class); // Act - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); // Assert @@ -683,7 +798,7 @@ public Future handle( // Act Instant start = Instant.now(); - assertThatThrownBy(() -> handler.commit(snapshot)) + assertThatThrownBy(() -> handler.commit(snapshot, false)) .isInstanceOf(UnknownTransactionStatusException.class); Instant end = Instant.now(); @@ -709,7 +824,7 @@ public void commit_FailingSnapshotHookGiven_ShouldThrowCommitException() handler.setBeforePreparationSnapshotHook(beforePreparationSnapshotHook); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert verify(storage, never()).mutate(anyList()); @@ -733,7 +848,7 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException() setBeforePreparationSnapshotHookIfNeeded(true); // Act - assertThatThrownBy(() -> handler.commit(snapshot)).isInstanceOf(CommitException.class); + assertThatThrownBy(() -> handler.commit(snapshot, false)).isInstanceOf(CommitException.class); // Assert verify(storage, times(2)).mutate(anyList()); @@ -747,7 +862,6 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException() protected void doThrowExceptionWhenCoordinatorPutState( TransactionState targetState, Class exceptionClass) throws CoordinatorException { - doThrow(exceptionClass).when(coordinator).putState(new Coordinator.State(anyId(), targetState)); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java index fdafe6dc7d..fad32b8041 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java @@ -6,15 +6,24 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import com.scalar.db.api.TransactionState; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import com.scalar.db.exception.transaction.ValidationConflictException; import com.scalar.db.transaction.consensuscommit.CoordinatorGroupCommitter.CoordinatorGroupCommitKeyManipulator; import com.scalar.db.util.groupcommit.GroupCommitConfig; import java.util.List; import java.util.UUID; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -45,7 +54,8 @@ protected void extraCleanup() { private void createGroupCommitterIfNotExists() { if (groupCommitter == null) { - groupCommitter = new CoordinatorGroupCommitter(new GroupCommitConfig(4, 100, 500, 60000, 10)); + groupCommitter = + spy(new CoordinatorGroupCommitter(new GroupCommitConfig(4, 100, 500, 60000, 10))); } } @@ -92,4 +102,75 @@ protected void verifyCoordinatorPutState(TransactionState expectedTransactionSta assertThat(fullIds.size()).isEqualTo(1); assertThat(fullIds.get(0)).isEqualTo(anyId()); } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectively( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + super.commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectively(withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + super.commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + groupCommitter.remove(anyId()); + clearInvocations(groupCommitter); + + super.commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void + commit_NoWritesAndDeletesInSnapshot_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + + super.commit_NoWritesAndDeletesInSnapshot_ShouldNotPrepareRecordsAndCommitStateAndCommitRecords( + withSnapshotHook); + + // Assert + verify(groupCommitter).remove(anyId()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void commit_NoReadsInSnapshot_ShouldNotValidateRecords(boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + super.commit_NoReadsInSnapshot_ShouldNotValidateRecords(withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java index e1d61e8253..ce0a14e1c8 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java @@ -3,11 +3,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -29,6 +31,7 @@ import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager; +import com.scalar.db.common.DecoratedDistributedTransaction; import com.scalar.db.common.ReadOnlyDistributedTransaction; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; @@ -144,6 +147,40 @@ public void begin_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolation() { assertThat(snapshot.getIsolation()).isEqualTo(Isolation.SNAPSHOT); } + @Test + public void + beginReadOnly_TxIdGivenWithGroupCommitter_ReturnWithSpecifiedTxIdAndSnapshotIsolation() { + // Arrange + CoordinatorGroupCommitKeyManipulator keyManipulator = + new CoordinatorGroupCommitKeyManipulator(); + CoordinatorGroupCommitter groupCommitter = mock(CoordinatorGroupCommitter.class); + ConsensusCommitManager managerWithGroupCommit = + new ConsensusCommitManager( + storage, + admin, + consensusCommitConfig, + databaseConfig, + coordinator, + parallelExecutor, + recovery, + commit, + groupCommitter); + + // Act + DistributedTransaction transaction = managerWithGroupCommit.beginReadOnly(ANY_TX_ID); + + // Assert + assertThat(transaction.getId()).isEqualTo(ANY_TX_ID); + Snapshot snapshot = + ((ConsensusCommit) ((DecoratedDistributedTransaction) transaction).getOriginalTransaction()) + .getCrudHandler() + .getSnapshot(); + assertThat(snapshot.getId()).isEqualTo(ANY_TX_ID); + assertThat(keyManipulator.isFullKey(transaction.getId())).isFalse(); + verify(groupCommitter, never()).reserve(ANY_TX_ID); + assertThat(snapshot.getIsolation()).isEqualTo(Isolation.SNAPSHOT); + } + @Test public void begin_CalledTwice_ReturnRespectiveConsensusCommitWithSharedCommitAndRecovery() { // Arrange @@ -368,7 +405,7 @@ public void resume_CalledWithBeginAndCommit_CommitExceptionThrown_ReturnSameTran DistributedTransactionManager manager = new ActiveTransactionManagedDistributedTransactionManager(this.manager, -1); - doThrow(CommitException.class).when(commit).commit(any()); + doThrow(CommitException.class).when(commit).commit(any(), anyBoolean()); DistributedTransaction transaction1 = manager.begin(ANY_TX_ID); try { @@ -447,7 +484,7 @@ public void join_CalledWithBeginAndCommit_CommitExceptionThrown_ReturnSameTransa DistributedTransactionManager manager = new ActiveTransactionManagedDistributedTransactionManager(this.manager, -1); - doThrow(CommitException.class).when(commit).commit(any()); + doThrow(CommitException.class).when(commit).commit(any(), anyBoolean()); DistributedTransaction transaction1 = manager.begin(ANY_TX_ID); try { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java index f81a85b70b..99a6f6ae37 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -707,15 +708,32 @@ public void mutate_PutAndDeleteGiven_ShouldCallCrudHandlerPutAndDelete() public void commit_ProcessedCrudGiven_ShouldCommitWithSnapshot() throws CommitException, UnknownTransactionStatusException, CrudException { // Arrange - doNothing().when(commit).commit(any(Snapshot.class)); + doNothing().when(commit).commit(any(Snapshot.class), anyBoolean()); when(crud.getSnapshot()).thenReturn(snapshot); + when(crud.isReadOnly()).thenReturn(false); // Act consensus.commit(); // Assert verify(crud).readIfImplicitPreReadEnabled(); - verify(commit).commit(snapshot); + verify(commit).commit(snapshot, false); + } + + @Test + public void commit_ProcessedCrudGiven_InReadOnlyMode_ShouldCommitWithSnapshot() + throws CommitException, UnknownTransactionStatusException, CrudException { + // Arrange + doNothing().when(commit).commit(any(Snapshot.class), anyBoolean()); + when(crud.getSnapshot()).thenReturn(snapshot); + when(crud.isReadOnly()).thenReturn(true); + + // Act + consensus.commit(); + + // Assert + verify(crud).readIfImplicitPreReadEnabled(); + verify(commit).commit(snapshot, true); } @Test @@ -807,4 +825,27 @@ public void rollback_WithGroupCommitter_ShouldRemoveTxFromGroupCommitter() verify(commit, never()).rollbackRecords(any(Snapshot.class)); verify(commit, never()).abortState(anyString()); } + + @Test + public void rollback_WithGroupCommitter_InReadOnlyMode_ShouldNotRemoveTxFromGroupCommitter() + throws CrudException, UnknownTransactionStatusException { + // Arrange + String txId = "tx-id"; + Snapshot snapshot = mock(Snapshot.class); + doReturn(txId).when(snapshot).getId(); + doReturn(snapshot).when(crud).getSnapshot(); + doReturn(true).when(crud).isReadOnly(); + CoordinatorGroupCommitter groupCommitter = mock(CoordinatorGroupCommitter.class); + ConsensusCommit consensusWithGroupCommit = + new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter); + + // Act + consensusWithGroupCommit.rollback(); + + // Assert + verify(crud).closeScanners(); + verify(groupCommitter, never()).remove(anyString()); + verify(commit, never()).rollbackRecords(any(Snapshot.class)); + verify(commit, never()).abortState(anyString()); + } } From e7cefba1f6ee150d744ec273f6037d8d72022c40 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Sat, 14 Jun 2025 10:21:04 +0900 Subject: [PATCH 2/6] Fix --- .../consensuscommit/CommitHandler.java | 10 ++++++-- .../consensuscommit/CommitHandlerTest.java | 24 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 3fb457fe1a..59ff1ff09d 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -134,8 +134,14 @@ public void commit(Snapshot snapshot, boolean readOnly) validate(snapshot); } catch (ValidationException e) { safelyCallOnFailureBeforeCommit(snapshot); - abortState(snapshot.getId()); - rollbackRecords(snapshot); + + // If the transaction has no writes and deletes, we don't need to abort-state and + // rollback-records since there are no changes to be made. + if (!hasNoWritesAndDeletesInSnapshot) { + abortState(snapshot.getId()); + rollbackRecords(snapshot); + } + if (e instanceof ValidationConflictException) { throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index 7d4c8fe636..0c4f13b97f 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -290,6 +290,30 @@ public void commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommit verify(handler, never()).onFailureBeforeCommit(any()); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void + commit_NoWritesAndDeletesInSnapshot_ValidationFailed_ShouldNotPrepareRecordsAndAbortStateAndRollbackRecords( + boolean withSnapshotHook) + throws ExecutionException, CoordinatorException, ValidationConflictException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + doThrow(ValidationConflictException.class).when(snapshot).toSerializable(storage); + + // Act Assert + assertThatThrownBy(() -> handler.commit(snapshot, false)) + .isInstanceOf(CommitConflictException.class); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verify(coordinator, never()).putState(any()); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler).onFailureBeforeCommit(any()); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void commit_NoReadsInSnapshot_ShouldNotValidateRecords(boolean withSnapshotHook) From 6c6299ddb11bc046d23766ef6b544dcbd7a3dc0c Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Mon, 16 Jun 2025 14:51:18 +0900 Subject: [PATCH 3/6] Fix based on feedback --- .../db/transaction/consensuscommit/CommitHandler.java | 10 +++++----- .../consensuscommit/CommitHandlerWithGroupCommit.java | 2 +- .../db/transaction/consensuscommit/Snapshot.java | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 59ff1ff09d..c56731172f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -108,11 +108,11 @@ private void waitBeforePreparationSnapshotHookFuture( public void commit(Snapshot snapshot, boolean readOnly) throws CommitException, UnknownTransactionStatusException { - boolean hasNoWritesAndDeletesInSnapshot = readOnly || snapshot.hasNoWritesAndDeletes(); + boolean hasSomeWritesOrDeletesInSnapshot = !readOnly && snapshot.hasSomeWritesOrDeletes(); Optional> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot); - if (!hasNoWritesAndDeletesInSnapshot) { + if (hasSomeWritesOrDeletesInSnapshot) { try { prepare(snapshot); } catch (PreparationException e) { @@ -129,7 +129,7 @@ public void commit(Snapshot snapshot, boolean readOnly) } } - if (!snapshot.hasNoReads()) { + if (snapshot.hasSomeReads()) { try { validate(snapshot); } catch (ValidationException e) { @@ -137,7 +137,7 @@ public void commit(Snapshot snapshot, boolean readOnly) // If the transaction has no writes and deletes, we don't need to abort-state and // rollback-records since there are no changes to be made. - if (!hasNoWritesAndDeletesInSnapshot) { + if (hasSomeWritesOrDeletesInSnapshot) { abortState(snapshot.getId()); rollbackRecords(snapshot); } @@ -154,7 +154,7 @@ public void commit(Snapshot snapshot, boolean readOnly) waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null)); - if (!hasNoWritesAndDeletesInSnapshot) { + if (hasSomeWritesOrDeletesInSnapshot) { commitState(snapshot); commitRecords(snapshot); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index 6decbfdbe0..652b45954a 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -41,7 +41,7 @@ public CommitHandlerWithGroupCommit( @Override public void commit(Snapshot snapshot, boolean readOnly) throws CommitException, UnknownTransactionStatusException { - if (!readOnly && snapshot.hasNoWritesAndDeletes()) { + if (!readOnly && !snapshot.hasSomeWritesOrDeletes()) { cancelGroupCommitIfNeeded(snapshot.getId()); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index 8f9ce2abbc..4507575932 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -217,12 +217,12 @@ public boolean containsKeyInGetSet(Get get) { return getSet.containsKey(get); } - public boolean hasNoWritesAndDeletes() { - return writeSet.isEmpty() && deleteSet.isEmpty(); + public boolean hasSomeWritesOrDeletes() { + return !writeSet.isEmpty() || !deleteSet.isEmpty(); } - public boolean hasNoReads() { - return getSet.isEmpty() && scanSet.isEmpty() && scannerSet.isEmpty(); + public boolean hasSomeReads() { + return !getSet.isEmpty() || !scanSet.isEmpty() || !scannerSet.isEmpty(); } public Optional getResult(Key key) throws CrudException { From a40322c3760b6e0dfb8a5f273fc0fecd6bda056c Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Tue, 17 Jun 2025 15:05:04 +0900 Subject: [PATCH 4/6] Add a property `scalar.db.consensus_commit.coordinator.write_omission_on_read_only.enabled` --- .../consensuscommit/CommitHandler.java | 13 +++- .../CommitHandlerWithGroupCommit.java | 12 ++- .../ConsensusCommitConfig.java | 78 +++++++++---------- .../ConsensusCommitManager.java | 14 +++- .../TwoPhaseConsensusCommitManager.java | 16 +++- .../consensuscommit/CommitHandlerTest.java | 60 +++++++++++++- .../CommitHandlerWithGroupCommitTest.java | 25 +++++- .../ConsensusCommitConfigTest.java | 24 +++++- ...CommitNullMetadataIntegrationTestBase.java | 4 +- ...nsusCommitSpecificIntegrationTestBase.java | 4 +- 10 files changed, 188 insertions(+), 62 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index c56731172f..6859687fe7 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -36,6 +36,7 @@ public class CommitHandler { protected final Coordinator coordinator; private final TransactionTableMetadataManager tableMetadataManager; private final ParallelExecutor parallelExecutor; + protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled; @LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook; @@ -44,11 +45,13 @@ public CommitHandler( DistributedStorage storage, Coordinator coordinator, TransactionTableMetadataManager tableMetadataManager, - ParallelExecutor parallelExecutor) { + ParallelExecutor parallelExecutor, + boolean coordinatorWriteOmissionOnReadOnlyEnabled) { this.storage = checkNotNull(storage); this.coordinator = checkNotNull(coordinator); this.tableMetadataManager = checkNotNull(tableMetadataManager); this.parallelExecutor = checkNotNull(parallelExecutor); + this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled; } /** @@ -137,8 +140,10 @@ public void commit(Snapshot snapshot, boolean readOnly) // If the transaction has no writes and deletes, we don't need to abort-state and // rollback-records since there are no changes to be made. - if (hasSomeWritesOrDeletesInSnapshot) { + if (hasSomeWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) { abortState(snapshot.getId()); + } + if (hasSomeWritesOrDeletesInSnapshot) { rollbackRecords(snapshot); } @@ -154,8 +159,10 @@ public void commit(Snapshot snapshot, boolean readOnly) waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null)); - if (hasSomeWritesOrDeletesInSnapshot) { + if (hasSomeWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) { commitState(snapshot); + } + if (hasSomeWritesOrDeletesInSnapshot) { commitRecords(snapshot); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index 652b45954a..2c749519f7 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -29,8 +29,14 @@ public CommitHandlerWithGroupCommit( Coordinator coordinator, TransactionTableMetadataManager tableMetadataManager, ParallelExecutor parallelExecutor, + boolean coordinatorWriteOmissionOnReadOnlyEnabled, CoordinatorGroupCommitter groupCommitter) { - super(storage, coordinator, tableMetadataManager, parallelExecutor); + super( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + coordinatorWriteOmissionOnReadOnlyEnabled); checkNotNull(groupCommitter); // The methods of this emitter will be called via GroupCommitter.ready(). @@ -41,7 +47,9 @@ public CommitHandlerWithGroupCommit( @Override public void commit(Snapshot snapshot, boolean readOnly) throws CommitException, UnknownTransactionStatusException { - if (!readOnly && !snapshot.hasSomeWritesOrDeletes()) { + if (!readOnly + && !snapshot.hasSomeWritesOrDeletes() + && coordinatorWriteOmissionOnReadOnlyEnabled) { cancelGroupCommitIfNeeded(snapshot.getId()); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java index cff4f4f361..ea61b36c58 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java @@ -33,6 +33,9 @@ public class ConsensusCommitConfig { public static final String ASYNC_COMMIT_ENABLED = PREFIX + "async_commit.enabled"; public static final String ASYNC_ROLLBACK_ENABLED = PREFIX + "async_rollback.enabled"; + public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED = + PREFIX + "coordinator.write_omission_on_read_only.enabled"; + public static final String PARALLEL_IMPLICIT_PRE_READ = PREFIX + "parallel_implicit_pre_read.enabled"; @@ -73,10 +76,12 @@ public class ConsensusCommitConfig { private final boolean asyncCommitEnabled; private final boolean asyncRollbackEnabled; - private final boolean isIncludeMetadataEnabled; + private final boolean coordinatorWriteOmissionOnReadOnlyEnabled; private final boolean parallelImplicitPreReadEnabled; + private final boolean isIncludeMetadataEnabled; + private final boolean coordinatorGroupCommitEnabled; private final int coordinatorGroupCommitSlotCapacity; private final int coordinatorGroupCommitGroupSizeFixTimeoutMillis; @@ -92,7 +97,9 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) { DatabaseConfig.TRANSACTION_MANAGER + " should be '" + TRANSACTION_MANAGER_NAME + "'"); } - if (databaseConfig.getProperties().containsKey("scalar.db.isolation_level")) { + Properties properties = databaseConfig.getProperties(); + + if (properties.containsKey("scalar.db.isolation_level")) { logger.warn( "The property \"scalar.db.isolation_level\" is deprecated and will be removed in 5.0.0. " + "Please use \"" @@ -102,19 +109,17 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) { isolation = Isolation.valueOf( getString( - databaseConfig.getProperties(), + properties, ISOLATION_LEVEL, getString( - databaseConfig.getProperties(), + properties, "scalar.db.isolation_level", // for backward compatibility Isolation.SNAPSHOT.toString())) .toUpperCase(Locale.ROOT)); if (isolation.equals(Isolation.SERIALIZABLE)) { validateCrossPartitionScanConfig(databaseConfig); - if (databaseConfig - .getProperties() - .containsKey("scalar.db.consensus_commit.serializable_strategy")) { + if (properties.containsKey("scalar.db.consensus_commit.serializable_strategy")) { logger.warn( "The property \"scalar.db.consensus_commit.serializable_strategy\" is deprecated and will " + "be removed in 5.0.0. The EXTRA_READ strategy is always used for the SERIALIZABLE " @@ -122,71 +127,60 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) { } } - coordinatorNamespace = getString(databaseConfig.getProperties(), COORDINATOR_NAMESPACE, null); + coordinatorNamespace = getString(properties, COORDINATOR_NAMESPACE, null); parallelExecutorCount = - getInt( - databaseConfig.getProperties(), - PARALLEL_EXECUTOR_COUNT, - DEFAULT_PARALLEL_EXECUTOR_COUNT); - parallelPreparationEnabled = - getBoolean(databaseConfig.getProperties(), PARALLEL_PREPARATION_ENABLED, true); - parallelCommitEnabled = - getBoolean(databaseConfig.getProperties(), PARALLEL_COMMIT_ENABLED, true); + getInt(properties, PARALLEL_EXECUTOR_COUNT, DEFAULT_PARALLEL_EXECUTOR_COUNT); + parallelPreparationEnabled = getBoolean(properties, PARALLEL_PREPARATION_ENABLED, true); + parallelCommitEnabled = getBoolean(properties, PARALLEL_COMMIT_ENABLED, true); // Use the value of parallel commit for parallel validation and parallel rollback as default // value parallelValidationEnabled = - getBoolean( - databaseConfig.getProperties(), PARALLEL_VALIDATION_ENABLED, parallelCommitEnabled); + getBoolean(properties, PARALLEL_VALIDATION_ENABLED, parallelCommitEnabled); parallelRollbackEnabled = - getBoolean( - databaseConfig.getProperties(), PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled); + getBoolean(properties, PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled); - asyncCommitEnabled = getBoolean(databaseConfig.getProperties(), ASYNC_COMMIT_ENABLED, false); + asyncCommitEnabled = getBoolean(properties, ASYNC_COMMIT_ENABLED, false); // Use the value of async commit for async rollback as default value - asyncRollbackEnabled = - getBoolean(databaseConfig.getProperties(), ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled); + asyncRollbackEnabled = getBoolean(properties, ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled); + + coordinatorWriteOmissionOnReadOnlyEnabled = + getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true); - isIncludeMetadataEnabled = - getBoolean(databaseConfig.getProperties(), INCLUDE_METADATA_ENABLED, false); + parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true); - parallelImplicitPreReadEnabled = - getBoolean(databaseConfig.getProperties(), PARALLEL_IMPLICIT_PRE_READ, true); + isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false); - coordinatorGroupCommitEnabled = - getBoolean(databaseConfig.getProperties(), COORDINATOR_GROUP_COMMIT_ENABLED, false); + coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false); coordinatorGroupCommitSlotCapacity = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY, DEFAULT_COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY); coordinatorGroupCommitGroupSizeFixTimeoutMillis = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS, DEFAULT_COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS); coordinatorGroupCommitDelayedSlotMoveTimeoutMillis = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS, DEFAULT_COORDINATOR_GROUP_COMMIT_DELAYED_SLOT_MOVE_TIMEOUT_MILLIS); coordinatorGroupCommitOldGroupAbortTimeoutMillis = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_OLD_GROUP_ABORT_TIMEOUT_MILLIS, DEFAULT_COORDINATOR_GROUP_COMMIT_OLD_GROUP_ABORT_TIMEOUT_MILLIS); coordinatorGroupCommitTimeoutCheckIntervalMillis = getInt( - databaseConfig.getProperties(), + properties, COORDINATOR_GROUP_COMMIT_TIMEOUT_CHECK_INTERVAL_MILLIS, DEFAULT_COORDINATOR_GROUP_COMMIT_TIMEOUT_CHECK_INTERVAL_MILLIS); coordinatorGroupCommitMetricsMonitorLogEnabled = - getBoolean( - databaseConfig.getProperties(), - COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED, - false); + getBoolean(properties, COORDINATOR_GROUP_COMMIT_METRICS_MONITOR_LOG_ENABLED, false); } public Isolation getIsolation() { @@ -225,14 +219,18 @@ public boolean isAsyncRollbackEnabled() { return asyncRollbackEnabled; } - public boolean isIncludeMetadataEnabled() { - return isIncludeMetadataEnabled; + public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() { + return coordinatorWriteOmissionOnReadOnlyEnabled; } public boolean isParallelImplicitPreReadEnabled() { return parallelImplicitPreReadEnabled; } + public boolean isIncludeMetadataEnabled() { + return isIncludeMetadataEnabled; + } + public boolean isCoordinatorGroupCommitEnabled() { return coordinatorGroupCommitEnabled; } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index 4eea4589eb..2913f11e91 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -130,9 +130,19 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) { private CommitHandler createCommitHandler() { if (isGroupCommitEnabled()) { return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter); + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), + groupCommitter); } else { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + return new CommitHandler( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 35d8d74aa9..6d718f2510 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -74,7 +74,13 @@ public TwoPhaseConsensusCommitManager( coordinator = new Coordinator(storage, config); parallelExecutor = new ParallelExecutor(config); recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager); - commit = new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + commit = + new CommitHandler( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); isIncludeMetadataEnabled = config.isIncludeMetadataEnabled(); mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); } @@ -91,7 +97,13 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) { coordinator = new Coordinator(storage, config); parallelExecutor = new ParallelExecutor(config); recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager); - commit = new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + commit = + new CommitHandler( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); isIncludeMetadataEnabled = config.isIncludeMetadataEnabled(); mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index 0c4f13b97f..51df00f89e 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -73,14 +73,19 @@ protected void extraInitialize() {} protected void extraCleanup() {} - protected CommitHandler createCommitHandler() { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnReadOnlyEnabled) { + return new CommitHandler( + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + coordinatorWriteOmissionOnReadOnlyEnabled); } @BeforeEach void setUp() throws Exception { parallelExecutor = new ParallelExecutor(config); - handler = spy(createCommitHandler()); + handler = spy(createCommitHandler(true)); extraInitialize(); } @@ -290,6 +295,30 @@ public void commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommit verify(handler, never()).onFailureBeforeCommit(any()); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void + commit_NoWritesAndDeletesInSnapshot_CoordinatorWriteOmissionOnReadOnlyDisabled_ShouldNotPrepareRecordsAndCommitRecordsButShouldCommitState( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + // Arrange + handler = spy(createCommitHandler(false)); + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + + // Act + handler.commit(snapshot, false); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verifyCoordinatorPutState(TransactionState.COMMITTED); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler, never()).onFailureBeforeCommit(any()); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void @@ -314,6 +343,31 @@ public void commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommit verify(handler).onFailureBeforeCommit(any()); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void + commit_NoWritesAndDeletesInSnapshot_ValidationFailed_CoordinatorWriteOmissionOnReadOnlyDisabled_ShouldNotPrepareRecordsAndRollbackRecordsButShouldAbortState( + boolean withSnapshotHook) + throws ExecutionException, CoordinatorException, ValidationConflictException { + // Arrange + handler = spy(createCommitHandler(false)); + Snapshot snapshot = spy(prepareSnapshotWithoutWrites()); + ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); + setBeforePreparationSnapshotHookIfNeeded(withSnapshotHook); + doThrow(ValidationConflictException.class).when(snapshot).toSerializable(storage); + + // Act Assert + assertThatThrownBy(() -> handler.commit(snapshot, false)) + .isInstanceOf(CommitConflictException.class); + + // Assert + verify(storage, never()).mutate(anyList()); + verify(snapshot).toSerializable(storage); + verify(coordinator).putState(any()); + verifySnapshotHook(withSnapshotHook, readWriteSets); + verify(handler).onFailureBeforeCommit(any()); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void commit_NoReadsInSnapshot_ShouldNotValidateRecords(boolean withSnapshotHook) diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java index fad32b8041..fe79a9db69 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java @@ -60,10 +60,15 @@ private void createGroupCommitterIfNotExists() { } @Override - protected CommitHandler createCommitHandler() { + protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnReadOnlyEnabled) { createGroupCommitterIfNotExists(); return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter); + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + coordinatorWriteOmissionOnReadOnlyEnabled, + groupCommitter); } private String anyGroupCommitParentId() { @@ -162,6 +167,22 @@ public void commit_InReadOnlyMode_ShouldNotPrepareRecordsAndCommitStateAndCommit verify(groupCommitter).remove(anyId()); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + @Override + public void + commit_NoWritesAndDeletesInSnapshot_CoordinatorWriteOmissionOnReadOnlyDisabled_ShouldNotPrepareRecordsAndCommitRecordsButShouldCommitState( + boolean withSnapshotHook) + throws CommitException, UnknownTransactionStatusException, ExecutionException, + CoordinatorException, ValidationConflictException { + super + .commit_NoWritesAndDeletesInSnapshot_CoordinatorWriteOmissionOnReadOnlyDisabled_ShouldNotPrepareRecordsAndCommitRecordsButShouldCommitState( + withSnapshotHook); + + // Assert + verify(groupCommitter, never()).remove(anyId()); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) @Override diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java index ff9e41652e..d93fd0ad8a 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java @@ -27,8 +27,9 @@ public void constructor_NoPropertiesGiven_ShouldLoadAsDefaultValues() { assertThat(config.isParallelRollbackEnabled()).isTrue(); assertThat(config.isAsyncCommitEnabled()).isFalse(); assertThat(config.isAsyncRollbackEnabled()).isFalse(); - assertThat(config.isIncludeMetadataEnabled()).isFalse(); assertThat(config.isParallelImplicitPreReadEnabled()).isTrue(); + assertThat(config.isCoordinatorWriteOmissionOnReadOnlyEnabled()).isTrue(); + assertThat(config.isIncludeMetadataEnabled()).isFalse(); } @Test @@ -154,16 +155,18 @@ public void constructor_AsyncExecutionRelatedPropertiesGiven_ShouldLoadProperly( } @Test - public void constructor_PropertiesWithIncludeMetadataEnabledGiven_ShouldLoadProperly() { + public void + constructor_PropertiesWithCoordinatorWriteOmissionOnReadOnlyEnabledGiven_ShouldLoadProperly() { // Arrange Properties props = new Properties(); - props.setProperty(ConsensusCommitConfig.INCLUDE_METADATA_ENABLED, "true"); + props.setProperty( + ConsensusCommitConfig.COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, "false"); // Act ConsensusCommitConfig config = new ConsensusCommitConfig(new DatabaseConfig(props)); // Assert - assertThat(config.isIncludeMetadataEnabled()).isTrue(); + assertThat(config.isCoordinatorWriteOmissionOnReadOnlyEnabled()).isFalse(); } @Test @@ -178,4 +181,17 @@ public void constructor_PropertiesWithParallelImplicitPreReadEnabledGiven_Should // Assert assertThat(config.isParallelImplicitPreReadEnabled()).isFalse(); } + + @Test + public void constructor_PropertiesWithIncludeMetadataEnabledGiven_ShouldLoadProperly() { + // Arrange + Properties props = new Properties(); + props.setProperty(ConsensusCommitConfig.INCLUDE_METADATA_ENABLED, "true"); + + // Act + ConsensusCommitConfig config = new ConsensusCommitConfig(new DatabaseConfig(props)); + + // Assert + assertThat(config.isIncludeMetadataEnabled()).isTrue(); + } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java index 1a84d83d41..67c5e4f204 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java @@ -158,9 +158,9 @@ private CommitHandler createCommitHandler( @Nullable CoordinatorGroupCommitter groupCommitter) { if (groupCommitter != null) { return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter); + storage, coordinator, tableMetadataManager, parallelExecutor, true, groupCommitter); } else { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor, true); } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index 16c797a5fd..159644098a 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -187,9 +187,9 @@ private CommitHandler createCommitHandler( @Nullable CoordinatorGroupCommitter groupCommitter) { if (groupCommitter != null) { return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter); + storage, coordinator, tableMetadataManager, parallelExecutor, true, groupCommitter); } else { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor); + return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor, true); } } From e783b136e64f5fafe88a183c8e88b754fcba100c Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Tue, 17 Jun 2025 19:45:16 +0900 Subject: [PATCH 5/6] Fix based on feedback --- .../scalar/db/transaction/consensuscommit/CommitHandler.java | 4 ++-- .../consensuscommit/CommitHandlerWithGroupCommit.java | 4 +--- .../com/scalar/db/transaction/consensuscommit/Snapshot.java | 4 ++-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 6859687fe7..de21b4b500 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -111,7 +111,7 @@ private void waitBeforePreparationSnapshotHookFuture( public void commit(Snapshot snapshot, boolean readOnly) throws CommitException, UnknownTransactionStatusException { - boolean hasSomeWritesOrDeletesInSnapshot = !readOnly && snapshot.hasSomeWritesOrDeletes(); + boolean hasSomeWritesOrDeletesInSnapshot = !readOnly && snapshot.hasWritesOrDeletes(); Optional> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot); @@ -132,7 +132,7 @@ public void commit(Snapshot snapshot, boolean readOnly) } } - if (snapshot.hasSomeReads()) { + if (snapshot.hasReads()) { try { validate(snapshot); } catch (ValidationException e) { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index 2c749519f7..ca7fcdb936 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -47,9 +47,7 @@ public CommitHandlerWithGroupCommit( @Override public void commit(Snapshot snapshot, boolean readOnly) throws CommitException, UnknownTransactionStatusException { - if (!readOnly - && !snapshot.hasSomeWritesOrDeletes() - && coordinatorWriteOmissionOnReadOnlyEnabled) { + if (!readOnly && !snapshot.hasWritesOrDeletes() && coordinatorWriteOmissionOnReadOnlyEnabled) { cancelGroupCommitIfNeeded(snapshot.getId()); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index 4507575932..152ce2a1c5 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -217,11 +217,11 @@ public boolean containsKeyInGetSet(Get get) { return getSet.containsKey(get); } - public boolean hasSomeWritesOrDeletes() { + public boolean hasWritesOrDeletes() { return !writeSet.isEmpty() || !deleteSet.isEmpty(); } - public boolean hasSomeReads() { + public boolean hasReads() { return !getSet.isEmpty() || !scanSet.isEmpty() || !scannerSet.isEmpty(); } From 2c13b1cc50509e8850b506c50fc98883ebafe6ab Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Wed, 18 Jun 2025 11:40:43 +0900 Subject: [PATCH 6/6] [skip ci] Fix based on feedback --- .../transaction/consensuscommit/CommitHandler.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index de21b4b500..9268905d53 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -111,11 +111,11 @@ private void waitBeforePreparationSnapshotHookFuture( public void commit(Snapshot snapshot, boolean readOnly) throws CommitException, UnknownTransactionStatusException { - boolean hasSomeWritesOrDeletesInSnapshot = !readOnly && snapshot.hasWritesOrDeletes(); + boolean hasWritesOrDeletesInSnapshot = !readOnly && snapshot.hasWritesOrDeletes(); Optional> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot); - if (hasSomeWritesOrDeletesInSnapshot) { + if (hasWritesOrDeletesInSnapshot) { try { prepare(snapshot); } catch (PreparationException e) { @@ -140,10 +140,10 @@ public void commit(Snapshot snapshot, boolean readOnly) // If the transaction has no writes and deletes, we don't need to abort-state and // rollback-records since there are no changes to be made. - if (hasSomeWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) { + if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) { abortState(snapshot.getId()); } - if (hasSomeWritesOrDeletesInSnapshot) { + if (hasWritesOrDeletesInSnapshot) { rollbackRecords(snapshot); } @@ -159,10 +159,10 @@ public void commit(Snapshot snapshot, boolean readOnly) waitBeforePreparationSnapshotHookFuture(snapshot, snapshotHookFuture.orElse(null)); - if (hasSomeWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) { + if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) { commitState(snapshot); } - if (hasSomeWritesOrDeletesInSnapshot) { + if (hasWritesOrDeletesInSnapshot) { commitRecords(snapshot); } }