@@ -640,6 +640,15 @@ private void validateScanResults(

// Compare the records of the original scan results and the latest scan results
if (!originalResultEntry.getKey().equals(key)) {
if (writeSet.containsKey(originalResultEntry.getKey())
|| deleteSet.containsKey(originalResultEntry.getKey())) {
Comment on lines +643 to +644
Collaborator Author
We can also skip the key in writeSet or deleteSet.

// The record is inserted/deleted/updated by this transaction

// Skip the record of the original scan results
originalResultEntry = Iterators.getNext(originalResultIterator, null);
Comment on lines +646 to +648
Copilot AI Jun 20, 2025
[nitpick] The skip logic for own writes/deletes appears twice and uses manual iterator advancement plus continue. Consider extracting this into a helper method or restructuring the loop to avoid manual getNext calls for clearer control flow.

Suggested change
// Skip the record of the original scan results
originalResultEntry = Iterators.getNext(originalResultIterator, null);
SkipResult skipResult = skipOwnWritesDeletes(latestResult, scanner, originalResultEntry, originalResultIterator, null);
originalResultEntry = skipResult.getOriginalResultEntry();
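
For illustration, a minimal sketch of what the extracted helper could look like. Everything here is hypothetical: the method name, the Map.Entry element type, and the assumption that writeSet and deleteSet are fields keyed by Snapshot.Key are inferred from the surrounding diff, not taken from this PR (Iterators is the Guava class the diff already uses):

  // Hypothetical helper (not part of this PR): advances past entries that this
  // transaction itself wrote or deleted, and returns the first entry produced
  // by another transaction, or null if the original results are exhausted.
  private Map.Entry<Snapshot.Key, TransactionResult> advancePastOwnWritesAndDeletes(
      Map.Entry<Snapshot.Key, TransactionResult> entry,
      Iterator<Map.Entry<Snapshot.Key, TransactionResult>> iterator) {
    while (entry != null
        && (writeSet.containsKey(entry.getKey()) || deleteSet.containsKey(entry.getKey()))) {
      entry = Iterators.getNext(iterator, null);
    }
    return entry;
  }

With a helper along these lines, both skip sites reduce to a single call, and the manual getNext-plus-continue pattern drops out of the main validation loop.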

continue;
}

// The record is inserted/deleted by another transaction
throwExceptionDueToAntiDependency();
}
@@ -653,9 +662,21 @@ private void validateScanResults(
originalResultEntry = Iterators.getNext(originalResultIterator, null);
}

if (originalResultEntry != null) {
// Some of the records of the scan results are deleted by another transaction
throwExceptionDueToAntiDependency();
while (originalResultEntry != null) {
if (writeSet.containsKey(originalResultEntry.getKey())
|| deleteSet.containsKey(originalResultEntry.getKey())) {
Comment on lines +666 to +667
Collaborator Author
Ditto. We can also skip the key in writeSet or deleteSet.

// The record is inserted/deleted/updated by this transaction

// Skip the record of the original scan results
originalResultEntry = Iterators.getNext(originalResultIterator, null);
} else {
// The record is inserted/deleted by another transaction
throwExceptionDueToAntiDependency();
}
}
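
Under the same assumptions as the hypothetical helper sketched above, this trailing loop would collapse to:

  originalResultEntry =
      advancePastOwnWritesAndDeletes(originalResultEntry, originalResultIterator);
  if (originalResultEntry != null) {
    // A remaining record was inserted/deleted by another transaction
    throwExceptionDueToAntiDependency();
  }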

if (!latestResult.isPresent()) {
return;
}

if (scan.getLimit() != 0 && results.size() == scan.getLimit()) {
@@ -244,14 +244,19 @@ private Scan prepareCrossPartitionScan(String namespace, String table) {
}

private Put preparePut() {
Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1);
Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2);
return new Put(partitionKey, clusteringKey)
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(ANY_NAMESPACE_NAME)
.forTable(ANY_TABLE_NAME)
.withValue(ANY_NAME_3, ANY_TEXT_3)
.withValue(ANY_NAME_4, ANY_TEXT_4);
return preparePut(ANY_TEXT_1, ANY_TEXT_2);
}

private Put preparePut(String partitionKeyColumnValue, String clusteringKeyColumnValue) {
return Put.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue))
.clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue))
.textValue(ANY_NAME_3, ANY_TEXT_3)
.textValue(ANY_NAME_4, ANY_TEXT_4)
.consistency(Consistency.LINEARIZABLE)
.build();
}

private Put prepareAnotherPut() {
@@ -301,12 +306,17 @@ private Put preparePutForMergeTest() {
}

private Delete prepareDelete() {
Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1);
Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2);
return new Delete(partitionKey, clusteringKey)
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(ANY_NAMESPACE_NAME)
.forTable(ANY_TABLE_NAME);
return prepareDelete(ANY_TEXT_1, ANY_TEXT_2);
}

private Delete prepareDelete(String partitionKeyColumnValue, String clusteringKeyColumnValue) {
return Delete.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue))
.clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue))
.consistency(Consistency.LINEARIZABLE)
.build();
}

private Delete prepareAnotherDelete() {
@@ -1086,7 +1096,7 @@ public void toSerializable_GetSetWithGetWithIndex_ShouldProcessWithoutExceptions

@Test
public void
toSerializable_GetSetWithGetWithIndex_RecordInsertedByMySelf_ShouldProcessWithoutExceptions()
toSerializable_GetSetWithGetWithIndex_RecordInsertedByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1160,7 +1170,7 @@ public void toSerializable_ScanSetUpdated_ShouldThrowValidationConflictException
}

@Test
public void toSerializable_ScanSetUpdatedByMySelf_ShouldProcessWithoutExceptions()
public void toSerializable_ScanSetUpdatedByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1237,7 +1247,7 @@ public void toSerializable_ScanSetExtended_ShouldThrowValidationConflictExceptio
}

@Test
public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutExceptions()
public void toSerializable_ScanSetExtendedByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1261,7 +1271,7 @@ public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutException

@Test
public void
toSerializable_ScanSetWithMultipleRecordsExtendedByMySelf_ShouldProcessWithoutExceptions()
toSerializable_ScanSetWithMultipleRecordsExtendedByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1526,7 +1536,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions

@Test
public void
toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions()
toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Expand Down Expand Up @@ -1589,7 +1599,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions

@Test
public void
toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions()
toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1619,6 +1629,107 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions
verify(storage).scan(scanWithProjectionsWithoutLimit);
}

@Test
public void
toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecords_ShouldThrowValidationConflictException()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Scan scan = prepareScanWithIndex();
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
snapshot.putIntoScanSet(
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));

// Simulate that the first and third records were updated by another transaction
Scanner scanner = mock(Scanner.class);
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());

DistributedStorage storage = mock(DistributedStorage.class);
Scan scanWithProjections =
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
when(storage.scan(scanWithProjections)).thenReturn(scanner);

// Act Assert
assertThatThrownBy(() -> snapshot.toSerializable(storage))
.isInstanceOf(ValidationConflictException.class);

// Assert
verify(storage).scan(scanWithProjections);
}

@Test
public void
toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecordsByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Scan scan = prepareScanWithIndex();
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
snapshot.putIntoScanSet(
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));

// Simulate that the first and third records were updated by myself
snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1));
Copilot AI Jun 20, 2025
The test claims to simulate updates by myself for the first and second records, but it only puts key1 and key3 into the writeSet. You should also add key2 to writeSet (or adjust the comment) so the test accurately reflects the intended scenario.

Suggested change
snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1));
snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1));
snapshot.putIntoWriteSet(key2, preparePut(ANY_TEXT_2, ANY_TEXT_1));

snapshot.putIntoWriteSet(key3, preparePut(ANY_TEXT_3, ANY_TEXT_1));
Scanner scanner = mock(Scanner.class);
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());

DistributedStorage storage = mock(DistributedStorage.class);
Scan scanWithProjections =
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
when(storage.scan(scanWithProjections)).thenReturn(scanner);

// Act Assert
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();

// Assert
verify(storage).scan(scanWithProjections);
}

@Test
public void
toSerializable_ScanWithIndexInScanSet_WhenDeletingRecordsByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Scan scan = prepareScanWithIndex();
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
snapshot.putIntoScanSet(
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));

// Simulate that the first and third records were deleted by myself
snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1));
Copilot AI Jun 20, 2025
The deletion test’s comment says it simulates deletes of the first and second records by myself, but it only deletes key1 and key3. Add key2 to the deleteSet (or update the comment) so the test setup matches its description.

Suggested change
snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1));
snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1));
snapshot.putIntoDeleteSet(key2, prepareDelete(ANY_TEXT_2, ANY_TEXT_1));

snapshot.putIntoDeleteSet(key3, prepareDelete(ANY_TEXT_3, ANY_TEXT_1));
Scanner scanner = mock(Scanner.class);
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());

DistributedStorage storage = mock(DistributedStorage.class);
Scan scanWithProjections =
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
when(storage.scan(scanWithProjections)).thenReturn(scanner);

// Act Assert
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();

// Assert
verify(storage).scan(scanWithProjections);
}

@Test
public void toSerializable_ScannerSetNotChanged_ShouldProcessWithoutExceptions()
throws ExecutionException {