From fad32da447fd30ebaf162fb43b4c75af5a81751a Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Tue, 3 Jun 2025 11:30:16 +0900 Subject: [PATCH] Prevent overwriting read set in Consensus Commit --- .../consensuscommit/CrudHandler.java | 14 ++- .../consensuscommit/CrudHandlerTest.java | 39 +++++++++ ...nsusCommitSpecificIntegrationTestBase.java | 87 +++++++++++++++++++ 3 files changed, 131 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 9ce5ec289c..edaee36e69 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -164,11 +164,10 @@ Optional read(@Nullable Snapshot.Key key, Get get) throws Cru } if (result.isPresent() || get.getConjunctions().isEmpty()) { - // Keep the read set latest to create before image by using the latest record (result) - // because another conflicting transaction might have updated the record after this - // transaction read it first. However, we update it only if a get operation has no - // conjunction or the result exists. This is because we don’t know whether the record - // actually exists or not due to the conjunction. + // We put the result into the read set only if a get operation has no conjunction or the + // result exists. This is because we don’t know whether the record actually exists or not + // due to the conjunction. + if (key != null) { putIntoReadSetInSnapshot(key, result); } else { @@ -277,9 +276,6 @@ private Optional processScanResult( } if (ret.isPresent()) { - // We always update the read set to create before image by using the latest record (result) - // because another conflicting transaction might have updated the record after this - // transaction read it first. putIntoReadSetInSnapshot(key, ret); } @@ -319,7 +315,7 @@ public void closeScanners() throws CrudException { private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional result) { // In read-only mode, we don't need to put the result into the read set - if (!readOnly) { + if (!readOnly && !snapshot.containsKeyInReadSet(key)) { snapshot.putIntoReadSet(key, result); } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 5fbf1bf9a5..5098c1d93e 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -541,6 +541,45 @@ public void get_ForNonExistingTable_ShouldThrowIllegalArgumentException() assertThatThrownBy(() -> handler.get(get)).isInstanceOf(IllegalArgumentException.class); } + @Test + public void get_DifferentGetButSameRecordReturned_ShouldNotOverwriteReadSet() + throws ExecutionException, CrudException { + // Arrange + Get get1 = prepareGet(); + Get get2 = Get.newBuilder(get1).where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)).build(); + Get getForStorage1 = toGetForStorageFrom(get1); + Get getForStorage2 = + Get.newBuilder(get2) + .clearProjections() + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .clearConditions() + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .consistency(Consistency.LINEARIZABLE) + .build(); + Result result = prepareResult(TransactionState.COMMITTED); + Optional expected = Optional.of(new TransactionResult(result)); + Snapshot.Key key = new Snapshot.Key(getForStorage1); + when(snapshot.getResult(any(), any())).thenReturn(expected).thenReturn(expected); + when(snapshot.containsKeyInReadSet(key)).thenReturn(false).thenReturn(true); + when(storage.get(any())).thenReturn(Optional.of(result)); + + // Act + Optional results1 = handler.get(get1); + Optional results2 = handler.get(get2); + + // Assert + assertThat(results1) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + assertThat(results2).isEqualTo(results1); + verify(storage).get(getForStorage1); + verify(storage).get(getForStorage2); + verify(snapshot).putIntoReadSet(key, expected); + } + @ParameterizedTest @EnumSource(ScanType.class) void scanOrGetScanner_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn(ScanType scanType) diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index 13c2073c26..7230dc6c0e 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -1,6 +1,7 @@ package com.scalar.db.transaction.consensuscommit; import static com.scalar.db.api.ConditionBuilder.column; +import static com.scalar.db.api.ConditionBuilder.updateIf; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -45,6 +46,7 @@ import com.scalar.db.exception.transaction.CrudConflictException; import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.exception.transaction.PreparationConflictException; +import com.scalar.db.exception.transaction.RollbackException; import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.io.DataType; @@ -5893,6 +5895,91 @@ public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyExceptio assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); } + @Test + public void + commit_ConflictingExternalUpdate_DifferentGetButSameRecordReturned_ShouldThrowCommitConflictExceptionAndPreserveExternalChanges() + throws UnknownTransactionStatusException, CrudException, RollbackException { + // Arrange + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + + // Retrieve the record + Optional result = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + + assertThat(result).isPresent(); + assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + // Update the balance of the record + transaction.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .condition(updateIf(column(BALANCE).isEqualToInt(INITIAL_BALANCE)).build()) + .intValue(BALANCE, 100) + .build()); + + // Update the balance of the record by another transaction + manager.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 200) + .build()); + + // Retrieve the record again, but use a different Get object (with a where clause) + result = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .where(column(BALANCE).isEqualToInt(200)) + .build()); + + assertThat(result).isNotPresent(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + transaction.rollback(); + + // Assert + result = + manager.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + + assertThat(result).isPresent(); + assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.get().getInt(BALANCE)).isEqualTo(200); + } + @Test public void manager_get_GetGivenForCommittedRecord_WithSerializable_ShouldReturnRecord() throws TransactionException {