Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,17 +77,27 @@ public CrudHandler(
public Optional<Result> get(Get originalGet) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalGet.getProjections());
Get get = (Get) prepareStorageSelection(originalGet);
Snapshot.Key key = new Snapshot.Key(get);
readUnread(key, get);

TableMetadata metadata = getTableMetadata(get);

Snapshot.Key key;
if (ScalarDbUtils.isSecondaryIndexSpecified(get, metadata)) {
// In case of a Get with index, we don't know the key until we read the record
key = null;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Get with index, set key to null at this point.

} else {
key = new Snapshot.Key(get);
}

readUnread(key, get);

return snapshot
.getResult(key, get)
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
}

// Only for a Get with index, the argument `key` is null
@VisibleForTesting
void readUnread(Snapshot.Key key, Get get) throws CrudException {
void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
if (!snapshot.containsKeyInGetSet(get)) {
read(key, get);
}
Expand All @@ -95,7 +106,7 @@ void readUnread(Snapshot.Key key, Get get) throws CrudException {
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
// concurrently in the implicit pre-read
@VisibleForTesting
void read(Snapshot.Key key, Get get) throws CrudException {
void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getFromStorage(get);
if (!result.isPresent() || result.get().isCommitted()) {
if (result.isPresent() || get.getConjunctions().isEmpty()) {
Expand All @@ -104,7 +115,18 @@ void read(Snapshot.Key key, Get get) throws CrudException {
// transaction read it first. However, we update it only if a get operation has no
// conjunction or the result exists. This is because we don’t know whether the record
// actually exists or not due to the conjunction.
snapshot.putIntoReadSet(key, result);
if (key != null) {
snapshot.putIntoReadSet(key, result);
} else {
// Only for a Get with index, the argument `key` is null

if (result.isPresent()) {
// Only when we can get the record with the Get with index, we can put it into the read
// set
key = new Snapshot.Key(get, result.get());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a Snapshot.Key instance from the result of the Get with index.

snapshot.putIntoReadSet(key, result);
}
}
}
snapshot.putIntoGetSet(get, result); // for re-read and validation
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,13 @@ public Key(Get get) {
this((Operation) get);
}

Copy link

Copilot AI May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Add a JavaDoc comment to this constructor clarifying that it builds a snapshot key from an index-based Get and its corresponding Result, detailing which fields are used.

Suggested change
/**
* Constructs a snapshot key from an index-based {@link Get} operation and its corresponding {@link Result}.
*
* <p>The following fields are used:
* <ul>
* <li>{@code namespace} and {@code table} are extracted from the {@link Get} operation.</li>
* <li>{@code partitionKey} and {@code clusteringKey} are extracted from the {@link Result}.</li>
* </ul>
*
* @param get the {@link Get} operation representing the index-based query
* @param result the {@link Result} containing the actual data for the key
*/

Copilot uses AI. Check for mistakes.
public Key(Get get, Result result) {
this.namespace = get.forNamespace().get();
this.table = get.forTable().get();
this.partitionKey = result.getPartitionKey().get();
this.clusteringKey = result.getClusteringKey();
}

public Key(Put put) {
this((Operation) put);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class CrudHandlerTest {
private static final String ANY_ID_2 = "id2";
private static final String ANY_NAME_1 = "name1";
private static final String ANY_NAME_2 = "name2";
private static final String ANY_NAME_3 = "name3";
private static final String ANY_TEXT_1 = "text1";
private static final String ANY_TEXT_2 = "text2";
private static final String ANY_TEXT_3 = "text3";
Expand All @@ -66,8 +67,10 @@ public class CrudHandlerTest {
TableMetadata.newBuilder()
.addColumn(ANY_NAME_1, DataType.TEXT)
.addColumn(ANY_NAME_2, DataType.TEXT)
.addColumn(ANY_NAME_3, DataType.TEXT)
.addPartitionKey(ANY_NAME_1)
.addClusteringKey(ANY_NAME_2)
.addSecondaryIndex(ANY_NAME_3)
.build());
private static final TransactionTableMetadata TRANSACTION_TABLE_METADATA =
new TransactionTableMetadata(TABLE_METADATA);
Expand Down Expand Up @@ -928,6 +931,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()

// Assert
verify(storage, never()).get(any());
verify(snapshot, never()).putIntoReadSet(any(Snapshot.Key.class), any(Optional.class));
verify(snapshot, never()).putIntoGetSet(any(Get.class), any(Optional.class));
}

Expand Down Expand Up @@ -1014,6 +1018,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
// Assert
verify(storage).get(any());
verify(snapshot).putIntoReadSet(key, Optional.of(new TransactionResult(result)));
verify(snapshot).putIntoGetSet(getForKey, Optional.of(new TransactionResult(result)));
}

@Test
Expand Down Expand Up @@ -1050,6 +1055,88 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
});
}

@Test
public void
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_EmptyResultReturnedByStorage_ShouldCallAppropriateMethods()
throws CrudException, ExecutionException {
// Arrange
Get getWithIndex =
Get.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
.build();
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);
when(storage.get(any())).thenReturn(Optional.empty());

// Act
handler.readUnread(null, getWithIndex);

// Assert
verify(storage).get(any());
verify(snapshot, never()).putIntoReadSet(any(), any());
verify(snapshot).putIntoGetSet(getWithIndex, Optional.empty());
}

@Test
public void
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods()
throws CrudException, ExecutionException {
// Arrange
Result result = mock(Result.class);
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get());
when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1)));
when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2)));
when(storage.get(any())).thenReturn(Optional.of(result));

Get getWithIndex =
Get.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
.build();
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);

// Act
handler.readUnread(null, getWithIndex);

// Assert
verify(storage).get(any());
verify(snapshot)
.putIntoReadSet(
new Snapshot.Key(getWithIndex, result), Optional.of(new TransactionResult(result)));
verify(snapshot).putIntoGetSet(getWithIndex, Optional.of(new TransactionResult(result)));
}

@Test
public void
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException()
throws ExecutionException {
// Arrange
Result result = mock(Result.class);
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get());
when(storage.get(any())).thenReturn(Optional.of(result));

Get getWithIndex =
Get.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
.build();
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);

// Act Assert
assertThatThrownBy(() -> handler.readUnread(null, getWithIndex))
.isInstanceOf(UncommittedRecordException.class)
.satisfies(
e -> {
UncommittedRecordException exception = (UncommittedRecordException) e;
assertThat(exception.getSelection()).isEqualTo(getWithIndex);
assertThat(exception.getResults().size()).isEqualTo(1);
assertThat(exception.getResults().get(0)).isEqualTo(result);
});
}

@Test
public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws CrudException {
// Arrange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4351,6 +4351,149 @@ public void get_GetWithIndexGiven_WithSerializable_ShouldNotThrowAnyException()
assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class);
}

@Test
public void getAndUpdate_GetWithIndexGiven_ShouldUpdate() throws TransactionException {
// Arrange
manager.insert(
Insert.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, INITIAL_BALANCE)
.build());

// Act Assert
DistributedTransaction transaction = manager.begin();
Optional<Result> actual =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE))
.build());

assertThat(actual).isPresent();
assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);

transaction.update(
Update.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, 1)
.build());

transaction.commit();

transaction = manager.begin();
actual =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.build());
transaction.commit();

assertThat(actual).isPresent();
assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
assertThat(actual.get().getInt(BALANCE)).isEqualTo(1);
}

@Test
public void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate() throws TransactionException {
// Arrange
manager.mutate(
Arrays.asList(
Copy link

Copilot AI May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Since this is Java 9+, consider using List.of(...) instead of Arrays.asList(...) for brevity and to get an immutable list by default.

Suggested change
Arrays.asList(
List.of(

Copilot uses AI. Check for mistakes.
Insert.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, INITIAL_BALANCE)
.build(),
Insert.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
.intValue(BALANCE, INITIAL_BALANCE)
.build()));

// Act Assert
DistributedTransaction transaction = manager.begin();
List<Result> actualResults =
transaction.scan(
Scan.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE))
.build());

assertThat(actualResults).hasSize(2);
Set<Integer> expectedTypes = Sets.newHashSet(0, 1);
for (Result result : actualResults) {
assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0);
expectedTypes.remove(result.getInt(ACCOUNT_TYPE));
assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
}
assertThat(expectedTypes).isEmpty();

transaction.update(
Update.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, 1)
.build());
transaction.update(
Update.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
.intValue(BALANCE, 2)
.build());

transaction.commit();

transaction = manager.begin();
Optional<Result> actual1 =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.build());
Optional<Result> actual2 =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
.build());
transaction.commit();

assertThat(actual1).isPresent();
assertThat(actual1.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(actual1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
assertThat(actual1.get().getInt(BALANCE)).isEqualTo(1);

assertThat(actual2).isPresent();
assertThat(actual2.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(actual2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1);
assertThat(actual2.get().getInt(BALANCE)).isEqualTo(2);
}

private DistributedTransaction prepareTransfer(
int fromId,
String fromNamespace,
Expand Down