From a45f7a9db3d5904699c0cf85c5fab9a1acbc9023 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Tue, 27 May 2025 22:44:50 +0900 Subject: [PATCH] Handle get with index correctly in CrudHandler --- .../consensuscommit/CrudHandler.java | 32 +++- .../transaction/consensuscommit/Snapshot.java | 7 + .../consensuscommit/CrudHandlerTest.java | 87 +++++++++++ ...nsusCommitSpecificIntegrationTestBase.java | 143 ++++++++++++++++++ 4 files changed, 264 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 6a8a75024d..c66595ca81 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,17 +77,27 @@ public CrudHandler( public Optional get(Get originalGet) throws CrudException { List originalProjections = new ArrayList<>(originalGet.getProjections()); Get get = (Get) prepareStorageSelection(originalGet); - Snapshot.Key key = new Snapshot.Key(get); - readUnread(key, get); TableMetadata metadata = getTableMetadata(get); + + Snapshot.Key key; + if (ScalarDbUtils.isSecondaryIndexSpecified(get, metadata)) { + // In case of a Get with index, we don't know the key until we read the record + key = null; + } else { + key = new Snapshot.Key(get); + } + + readUnread(key, get); + return snapshot .getResult(key, get) .map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled)); } + // Only for a Get with index, the argument `key` is null @VisibleForTesting - void readUnread(Snapshot.Key key, Get get) throws CrudException { + void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException { if (!snapshot.containsKeyInGetSet(get)) { read(key, get); } @@ -95,7 +106,7 @@ void readUnread(Snapshot.Key key, Get get) throws CrudException { // Although this class is not thread-safe, this method is actually thread-safe, so we call it // concurrently in the implicit pre-read @VisibleForTesting - void read(Snapshot.Key key, Get get) throws CrudException { + void read(@Nullable Snapshot.Key key, Get get) throws CrudException { Optional result = getFromStorage(get); if (!result.isPresent() || result.get().isCommitted()) { if (result.isPresent() || get.getConjunctions().isEmpty()) { @@ -104,7 +115,18 @@ void read(Snapshot.Key key, Get get) throws CrudException { // transaction read it first. However, we update it only if a get operation has no // conjunction or the result exists. This is because we don’t know whether the record // actually exists or not due to the conjunction. - snapshot.putIntoReadSet(key, result); + if (key != null) { + snapshot.putIntoReadSet(key, result); + } else { + // Only for a Get with index, the argument `key` is null + + if (result.isPresent()) { + // Only when we can get the record with the Get with index, we can put it into the read + // set + key = new Snapshot.Key(get, result.get()); + snapshot.putIntoReadSet(key, result); + } + } } snapshot.putIntoGetSet(get, result); // for re-read and validation return; diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index d656080a24..cf535281ab 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -720,6 +720,13 @@ public Key(Get get) { this((Operation) get); } + public Key(Get get, Result result) { + this.namespace = get.forNamespace().get(); + this.table = get.forTable().get(); + this.partitionKey = result.getPartitionKey().get(); + this.clusteringKey = result.getClusteringKey(); + } + public Key(Put put) { this((Operation) put); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 59bf47af65..933d40b5d6 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -56,6 +56,7 @@ public class CrudHandlerTest { private static final String ANY_ID_2 = "id2"; private static final String ANY_NAME_1 = "name1"; private static final String ANY_NAME_2 = "name2"; + private static final String ANY_NAME_3 = "name3"; private static final String ANY_TEXT_1 = "text1"; private static final String ANY_TEXT_2 = "text2"; private static final String ANY_TEXT_3 = "text3"; @@ -66,8 +67,10 @@ public class CrudHandlerTest { TableMetadata.newBuilder() .addColumn(ANY_NAME_1, DataType.TEXT) .addColumn(ANY_NAME_2, DataType.TEXT) + .addColumn(ANY_NAME_3, DataType.TEXT) .addPartitionKey(ANY_NAME_1) .addClusteringKey(ANY_NAME_2) + .addSecondaryIndex(ANY_NAME_3) .build()); private static final TransactionTableMetadata TRANSACTION_TABLE_METADATA = new TransactionTableMetadata(TABLE_METADATA); @@ -928,6 +931,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() // Assert verify(storage, never()).get(any()); + verify(snapshot, never()).putIntoReadSet(any(Snapshot.Key.class), any(Optional.class)); verify(snapshot, never()).putIntoGetSet(any(Get.class), any(Optional.class)); } @@ -1014,6 +1018,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() // Assert verify(storage).get(any()); verify(snapshot).putIntoReadSet(key, Optional.of(new TransactionResult(result))); + verify(snapshot).putIntoGetSet(getForKey, Optional.of(new TransactionResult(result))); } @Test @@ -1050,6 +1055,88 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() }); } + @Test + public void + readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_EmptyResultReturnedByStorage_ShouldCallAppropriateMethods() + throws CrudException, ExecutionException { + // Arrange + Get getWithIndex = + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1)) + .build(); + when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false); + when(storage.get(any())).thenReturn(Optional.empty()); + + // Act + handler.readUnread(null, getWithIndex); + + // Assert + verify(storage).get(any()); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot).putIntoGetSet(getWithIndex, Optional.empty()); + } + + @Test + public void + readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods() + throws CrudException, ExecutionException { + // Arrange + Result result = mock(Result.class); + when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); + when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1))); + when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2))); + when(storage.get(any())).thenReturn(Optional.of(result)); + + Get getWithIndex = + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1)) + .build(); + when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false); + + // Act + handler.readUnread(null, getWithIndex); + + // Assert + verify(storage).get(any()); + verify(snapshot) + .putIntoReadSet( + new Snapshot.Key(getWithIndex, result), Optional.of(new TransactionResult(result))); + verify(snapshot).putIntoGetSet(getWithIndex, Optional.of(new TransactionResult(result))); + } + + @Test + public void + readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException() + throws ExecutionException { + // Arrange + Result result = mock(Result.class); + when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get()); + when(storage.get(any())).thenReturn(Optional.of(result)); + + Get getWithIndex = + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1)) + .build(); + when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false); + + // Act Assert + assertThatThrownBy(() -> handler.readUnread(null, getWithIndex)) + .isInstanceOf(UncommittedRecordException.class) + .satisfies( + e -> { + UncommittedRecordException exception = (UncommittedRecordException) e; + assertThat(exception.getSelection()).isEqualTo(getWithIndex); + assertThat(exception.getResults().size()).isEqualTo(1); + assertThat(exception.getResults().get(0)).isEqualTo(result); + }); + } + @Test public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws CrudException { // Arrange diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index 43618bf434..d552d704e3 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -4351,6 +4351,149 @@ public void get_GetWithIndexGiven_WithSerializable_ShouldNotThrowAnyException() assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); } + @Test + public void getAndUpdate_GetWithIndexGiven_ShouldUpdate() throws TransactionException { + // Arrange + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + Optional actual = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(actual).isPresent(); + assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + transaction.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + + transaction.commit(); + + transaction = manager.begin(); + actual = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + transaction.commit(); + + assertThat(actual).isPresent(); + assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(actual.get().getInt(BALANCE)).isEqualTo(1); + } + + @Test + public void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate() throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Act Assert + DistributedTransaction transaction = manager.begin(); + List actualResults = + transaction.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build()); + + assertThat(actualResults).hasSize(2); + Set expectedTypes = Sets.newHashSet(0, 1); + for (Result result : actualResults) { + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + expectedTypes.remove(result.getInt(ACCOUNT_TYPE)); + assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + assertThat(expectedTypes).isEmpty(); + + transaction.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + transaction.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, 2) + .build()); + + transaction.commit(); + + transaction = manager.begin(); + Optional actual1 = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + Optional actual2 = + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .build()); + transaction.commit(); + + assertThat(actual1).isPresent(); + assertThat(actual1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(actual1.get().getInt(BALANCE)).isEqualTo(1); + + assertThat(actual2).isPresent(); + assertThat(actual2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(actual2.get().getInt(BALANCE)).isEqualTo(2); + } + private DistributedTransaction prepareTransfer( int fromId, String fromNamespace,