From 59ae1f45aaa9d583f82b8a13d1821c2daa378d8b Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Sun, 15 Jun 2025 03:31:59 +0900 Subject: [PATCH 1/2] Handle Scan with Index correctly during validation logic --- .../transaction/consensuscommit/Snapshot.java | 27 +- .../consensuscommit/SnapshotTest.java | 151 ++++- ...nsusCommitSpecificIntegrationTestBase.java | 515 +++++++++++++++++- 3 files changed, 655 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index f01b5397d3..9f72ce41b8 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -640,6 +640,15 @@ private void validateScanResults( // Compare the records of the original scan results and the latest scan results if (!originalResultEntry.getKey().equals(key)) { + if (writeSet.containsKey(originalResultEntry.getKey()) + || deleteSet.containsKey(originalResultEntry.getKey())) { + // The record is inserted/deleted/updated by this transaction + + // Skip the record of the original scan results + originalResultEntry = Iterators.getNext(originalResultIterator, null); + continue; + } + // The record is inserted/deleted by another transaction throwExceptionDueToAntiDependency(); } @@ -653,9 +662,21 @@ private void validateScanResults( originalResultEntry = Iterators.getNext(originalResultIterator, null); } - if (originalResultEntry != null) { - // Some of the records of the scan results are deleted by another transaction - throwExceptionDueToAntiDependency(); + while (originalResultEntry != null) { + if (writeSet.containsKey(originalResultEntry.getKey()) + || deleteSet.containsKey(originalResultEntry.getKey())) { + // The record is inserted/deleted/updated by this transaction + + // Skip the record of the original scan results + originalResultEntry = Iterators.getNext(originalResultIterator, null); + } else { + // The record is inserted/deleted by another transaction + throwExceptionDueToAntiDependency(); + } + } + + if (!latestResult.isPresent()) { + return; } if (scan.getLimit() != 0 && results.size() == scan.getLimit()) { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java index 06b5d0aa1c..a0f4765ecd 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java @@ -244,14 +244,19 @@ private Scan prepareCrossPartitionScan(String namespace, String table) { } private Put preparePut() { - Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); - Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); - return new Put(partitionKey, clusteringKey) - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(ANY_NAMESPACE_NAME) - .forTable(ANY_TABLE_NAME) - .withValue(ANY_NAME_3, ANY_TEXT_3) - .withValue(ANY_NAME_4, ANY_TEXT_4); + return preparePut(ANY_TEXT_1, ANY_TEXT_2); + } + + private Put preparePut(String partitionKeyColumnValue, String clusteringKeyColumnValue) { + return Put.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue)) + .clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue)) + .textValue(ANY_NAME_3, ANY_TEXT_3) + .textValue(ANY_NAME_4, ANY_TEXT_4) + .consistency(Consistency.LINEARIZABLE) + .build(); } private Put prepareAnotherPut() { @@ -301,12 +306,17 @@ private Put preparePutForMergeTest() { } private Delete prepareDelete() { - Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); - Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); - return new Delete(partitionKey, clusteringKey) - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(ANY_NAMESPACE_NAME) - .forTable(ANY_TABLE_NAME); + return prepareDelete(ANY_TEXT_1, ANY_TEXT_2); + } + + private Delete prepareDelete(String partitionKeyColumnValue, String clusteringKeyColumnValue) { + return Delete.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue)) + .clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue)) + .consistency(Consistency.LINEARIZABLE) + .build(); } private Delete prepareAnotherDelete() { @@ -1086,7 +1096,7 @@ public void toSerializable_GetSetWithGetWithIndex_ShouldProcessWithoutExceptions @Test public void - toSerializable_GetSetWithGetWithIndex_RecordInsertedByMySelf_ShouldProcessWithoutExceptions() + toSerializable_GetSetWithGetWithIndex_RecordInsertedByMyself_ShouldProcessWithoutExceptions() throws ExecutionException { // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE); @@ -1160,7 +1170,7 @@ public void toSerializable_ScanSetUpdated_ShouldThrowValidationConflictException } @Test - public void toSerializable_ScanSetUpdatedByMySelf_ShouldProcessWithoutExceptions() + public void toSerializable_ScanSetUpdatedByMyself_ShouldProcessWithoutExceptions() throws ExecutionException { // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE); @@ -1237,7 +1247,7 @@ public void toSerializable_ScanSetExtended_ShouldThrowValidationConflictExceptio } @Test - public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutExceptions() + public void toSerializable_ScanSetExtendedByMyself_ShouldProcessWithoutExceptions() throws ExecutionException { // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE); @@ -1261,7 +1271,7 @@ public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutException @Test public void - toSerializable_ScanSetWithMultipleRecordsExtendedByMySelf_ShouldProcessWithoutExceptions() + toSerializable_ScanSetWithMultipleRecordsExtendedByMyself_ShouldProcessWithoutExceptions() throws ExecutionException { // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE); @@ -1526,7 +1536,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions @Test public void - toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions() + toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions() throws ExecutionException { // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE); @@ -1589,7 +1599,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions @Test public void - toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions() + toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions() throws ExecutionException { // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE); @@ -1619,6 +1629,107 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions verify(storage).scan(scanWithProjectionsWithoutLimit); } + @Test + public void + toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecords_ShouldThrowValidationConflictException() + throws ExecutionException { + // Arrange + snapshot = prepareSnapshot(Isolation.SERIALIZABLE); + Scan scan = prepareScanWithIndex(); + TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1); + TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1); + TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1); + Snapshot.Key key1 = new Snapshot.Key(scan, result1); + Snapshot.Key key2 = new Snapshot.Key(scan, result2); + Snapshot.Key key3 = new Snapshot.Key(scan, result3); + snapshot.putIntoScanSet( + scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3))); + + // Simulate that the first and second records were updated by another transaction + Scanner scanner = mock(Scanner.class); + when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty()); + + DistributedStorage storage = mock(DistributedStorage.class); + Scan scanWithProjections = + Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build(); + when(storage.scan(scanWithProjections)).thenReturn(scanner); + + // Act Assert + assertThatThrownBy(() -> snapshot.toSerializable(storage)) + .isInstanceOf(ValidationConflictException.class); + + // Assert + verify(storage).scan(scanWithProjections); + } + + @Test + public void + toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecordsByMyself_ShouldProcessWithoutExceptions() + throws ExecutionException { + // Arrange + snapshot = prepareSnapshot(Isolation.SERIALIZABLE); + Scan scan = prepareScanWithIndex(); + TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1); + TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1); + TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1); + Snapshot.Key key1 = new Snapshot.Key(scan, result1); + Snapshot.Key key2 = new Snapshot.Key(scan, result2); + Snapshot.Key key3 = new Snapshot.Key(scan, result3); + snapshot.putIntoScanSet( + scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3))); + + // Simulate that the first and second records were updated by myself + snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1)); + snapshot.putIntoWriteSet(key3, preparePut(ANY_TEXT_3, ANY_TEXT_1)); + Scanner scanner = mock(Scanner.class); + when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty()); + + DistributedStorage storage = mock(DistributedStorage.class); + Scan scanWithProjections = + Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build(); + when(storage.scan(scanWithProjections)).thenReturn(scanner); + + // Act Assert + assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException(); + + // Assert + verify(storage).scan(scanWithProjections); + } + + @Test + public void + toSerializable_ScanWithIndexInScanSet_WhenDeletingRecordsByMyself_ShouldProcessWithoutExceptions() + throws ExecutionException { + // Arrange + snapshot = prepareSnapshot(Isolation.SERIALIZABLE); + Scan scan = prepareScanWithIndex(); + TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1); + TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1); + TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1); + Snapshot.Key key1 = new Snapshot.Key(scan, result1); + Snapshot.Key key2 = new Snapshot.Key(scan, result2); + Snapshot.Key key3 = new Snapshot.Key(scan, result3); + snapshot.putIntoScanSet( + scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3))); + + // Simulate that the first and second records were deleted by myself + snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1)); + snapshot.putIntoDeleteSet(key3, prepareDelete(ANY_TEXT_3, ANY_TEXT_1)); + Scanner scanner = mock(Scanner.class); + when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty()); + + DistributedStorage storage = mock(DistributedStorage.class); + Scan scanWithProjections = + Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build(); + when(storage.scan(scanWithProjections)).thenReturn(scanner); + + // Act Assert + assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException(); + + // Assert + verify(storage).scan(scanWithProjections); + } + @Test public void toSerializable_ScannerSetNotChanged_ShouldProcessWithoutExceptions() throws ExecutionException { diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index de122da131..17f2c99908 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -73,7 +73,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.condition.EnabledIf; @@ -4628,7 +4627,7 @@ void scan_RecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitCo } @Test - void scan_RecordUpdatedByMySelf_WithSerializable_ShouldNotThrowAnyException() + void scan_RecordUpdatedByMyself_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -4723,7 +4722,7 @@ void scan_RecordUpdatedByMySelf_WithSerializable_ShouldNotThrowAnyException() } @Test - void scan_FirstRecordInsertedByMySelf_WithSerializable_ShouldNotThrowAnyException() + void scan_FirstRecordInsertedByMyself_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -4818,7 +4817,7 @@ void scan_FirstRecordInsertedByMySelf_WithSerializable_ShouldNotThrowAnyExceptio } @Test - void scan_LastRecordInsertedByMySelf_WithSerializable_ShouldNotThrowAnyException() + void scan_LastRecordInsertedByMyself_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -4912,7 +4911,7 @@ void scan_LastRecordInsertedByMySelf_WithSerializable_ShouldNotThrowAnyException } @Test - void scan_FirstRecordDeletedByMySelf_WithSerializable_ShouldNotThrowAnyException() + void scan_FirstRecordDeletedByMyself_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -5111,7 +5110,7 @@ void scan_ScanWithLimitGiven_WithSerializable_ShouldNotThrowAnyException() @Test void - scan_ScanWithLimitGiven_FirstRecordInsertedByMySelf_WithSerializable_ShouldNotThrowAnyException() + scan_ScanWithLimitGiven_FirstRecordInsertedByMyself_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -5219,7 +5218,7 @@ void scan_ScanWithLimitGiven_WithSerializable_ShouldNotThrowAnyException() @Test void - scan_ScanWithLimitGiven_LastRecordInsertedByMySelf_WithSerializable_ShouldNotThrowAnyException() + scan_ScanWithLimitGiven_LastRecordInsertedByMyself_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -5458,6 +5457,308 @@ void scan_ScanWithIndexGiven_WithSerializable_ShouldNotThrowAnyException() assertThatCode(transaction::commit).doesNotThrowAnyException(); } + @Test + void + scan_ScanWithIndexGiven_RecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + List results = + transaction.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(results).hasSize(5); + + Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); + for (Result result : results) { + expectedIds.remove(result.getInt(ACCOUNT_ID)); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + assertThat(expectedIds).isEmpty(); + + // The record is updated by another transaction + manager.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build()); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + void scan_ScanWithIndexGiven_RecordUpdatedByMyself_WithSerializable_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + List results = + transaction.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(results).hasSize(5); + + Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); + for (Result result : results) { + expectedIds.remove(result.getInt(ACCOUNT_ID)); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + assertThat(expectedIds).isEmpty(); + + transaction.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build()); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + void + scan_ScanWithIndexGiven_RecordDeletedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + List results = + transaction.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(results).hasSize(5); + + Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); + for (Result result : results) { + expectedIds.remove(result.getInt(ACCOUNT_ID)); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + assertThat(expectedIds).isEmpty(); + + // The record is deleted by another transaction + manager.delete( + Delete.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + void scan_ScanWithIndexGiven_RecordDeletedByMyself_WithSerializable_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + List results = + transaction.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(results).hasSize(5); + + Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); + for (Result result : results) { + expectedIds.remove(result.getInt(ACCOUNT_ID)); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + assertThat(expectedIds).isEmpty(); + + transaction.delete( + Delete.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + @Test void scan_ScanWithIndexWithLimitGiven_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { @@ -5557,6 +5858,172 @@ void get_GetWithIndexGiven_WithSerializable_ShouldNotThrowAnyException() assertThatCode(transaction::commit).doesNotThrowAnyException(); } + @Test + void + get_GetWithIndexGiven_RecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + Optional actual = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(actual).isPresent(); + assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + // The record is updated by another transaction + manager.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build()); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + void get_GetWithIndexGiven_RecordUpdatedByMyself_WithSerializable_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + Optional actual = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(actual).isPresent(); + assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + transaction.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build()); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + void + get_GetWithIndexGiven_RecordDeletedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + Optional actual = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(actual).isPresent(); + assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + // The record is deleted by another transaction + manager.delete( + Delete.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + void get_GetWithIndexGiven_RecordDeletedByMyself_WithSerializable_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + Optional actual = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(actual).isPresent(); + assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + transaction.delete( + Delete.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + @Test void getScanner_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange @@ -5797,7 +6264,7 @@ void get_GetWithIndexGiven_NoRecordsInIndexRange_WithSerializable_ShouldNotThrow @Test void - get_GetWithIndexGiven_RecordInsertedIntoIndexRangeByMySelf_WithSerializable_ShouldNotThrowAnyException() + get_GetWithIndexGiven_RecordInsertedIntoIndexRangeByMyself_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -5883,7 +6350,7 @@ void get_GetWithIndexGiven_NoRecordsInIndexRange_WithSerializable_ShouldNotThrow @Test void - get_GetWithIndexGiven_NoRecordsInIndexRange_RecordInsertedIntoIndexRangeByMySelf_WithSerializable_ShouldNotThrowAnyException() + get_GetWithIndexGiven_NoRecordsInIndexRange_RecordInsertedIntoIndexRangeByMyself_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -5945,7 +6412,6 @@ void get_GetWithIndexGiven_NoRecordsInIndexRange_WithSerializable_ShouldNotThrow assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); } - @Disabled("Fix later") @ParameterizedTest @EnumSource(Isolation.class) void getAndUpdate_GetWithIndexGiven_ShouldUpdate(Isolation isolation) @@ -6001,7 +6467,6 @@ void getAndUpdate_GetWithIndexGiven_ShouldUpdate(Isolation isolation) assertThat(actual.get().getInt(BALANCE)).isEqualTo(1); } - @Disabled("Fix later") @ParameterizedTest @EnumSource(Isolation.class) void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate(Isolation isolation) @@ -6023,6 +6488,13 @@ void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate(Isolation isolation) .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) .build())); // Act Assert @@ -6035,8 +6507,8 @@ void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate(Isolation isolation) .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) .build()); - assertThat(results).hasSize(2); - Set expectedTypes = Sets.newHashSet(0, 1); + assertThat(results).hasSize(3); + Set expectedTypes = Sets.newHashSet(0, 1, 2); for (Result result : results) { assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); expectedTypes.remove(result.getInt(ACCOUNT_TYPE)); @@ -6057,7 +6529,7 @@ void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate(Isolation isolation) .namespace(namespace1) .table(TABLE_1) .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) .intValue(BALANCE, 2) .build()); @@ -6080,6 +6552,14 @@ void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate(Isolation isolation) .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) .build()); + Optional actual3 = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .build()); transaction.commit(); assertThat(actual1).isPresent(); @@ -6090,7 +6570,12 @@ void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate(Isolation isolation) assertThat(actual2).isPresent(); assertThat(actual2.get().getInt(ACCOUNT_ID)).isEqualTo(0); assertThat(actual2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); - assertThat(actual2.get().getInt(BALANCE)).isEqualTo(2); + assertThat(actual2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + assertThat(actual3).isPresent(); + assertThat(actual3.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual3.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(actual3.get().getInt(BALANCE)).isEqualTo(2); } @ParameterizedTest From 3b82c928131edef04353c8e66353b193e45a06c0 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Fri, 20 Jun 2025 15:45:40 +0900 Subject: [PATCH 2/2] [skip ci] Fix --- .../scalar/db/transaction/consensuscommit/SnapshotTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java index a0f4765ecd..b0fc3896e9 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java @@ -1645,7 +1645,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions snapshot.putIntoScanSet( scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3))); - // Simulate that the first and second records were updated by another transaction + // Simulate that the first and third records were updated by another transaction Scanner scanner = mock(Scanner.class); when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty()); @@ -1678,7 +1678,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions snapshot.putIntoScanSet( scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3))); - // Simulate that the first and second records were updated by myself + // Simulate that the first and third records were updated by myself snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1)); snapshot.putIntoWriteSet(key3, preparePut(ANY_TEXT_3, ANY_TEXT_1)); Scanner scanner = mock(Scanner.class); @@ -1712,7 +1712,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions snapshot.putIntoScanSet( scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3))); - // Simulate that the first and second records were deleted by myself + // Simulate that the first and third records were deleted by myself snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1)); snapshot.putIntoDeleteSet(key3, prepareDelete(ANY_TEXT_3, ANY_TEXT_1)); Scanner scanner = mock(Scanner.class);