Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,15 @@ private void validateScanResults(

// Compare the records of the original scan results and the latest scan results
if (!originalResultEntry.getKey().equals(key)) {
if (writeSet.containsKey(originalResultEntry.getKey())
|| deleteSet.containsKey(originalResultEntry.getKey())) {
// The record is inserted/deleted/updated by this transaction

// Skip the record of the original scan results
originalResultEntry = Iterators.getNext(originalResultIterator, null);
continue;
}

// The record is inserted/deleted by another transaction
throwExceptionDueToAntiDependency();
}
Expand All @@ -653,9 +662,21 @@ private void validateScanResults(
originalResultEntry = Iterators.getNext(originalResultIterator, null);
}

if (originalResultEntry != null) {
// Some of the records of the scan results are deleted by another transaction
throwExceptionDueToAntiDependency();
while (originalResultEntry != null) {
if (writeSet.containsKey(originalResultEntry.getKey())
|| deleteSet.containsKey(originalResultEntry.getKey())) {
// The record is inserted/deleted/updated by this transaction

// Skip the record of the original scan results
originalResultEntry = Iterators.getNext(originalResultIterator, null);
} else {
// The record is inserted/deleted by another transaction
throwExceptionDueToAntiDependency();
}
}

if (!latestResult.isPresent()) {
return;
}

if (scan.getLimit() != 0 && results.size() == scan.getLimit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,19 @@ private Scan prepareCrossPartitionScan(String namespace, String table) {
}

private Put preparePut() {
Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1);
Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2);
return new Put(partitionKey, clusteringKey)
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(ANY_NAMESPACE_NAME)
.forTable(ANY_TABLE_NAME)
.withValue(ANY_NAME_3, ANY_TEXT_3)
.withValue(ANY_NAME_4, ANY_TEXT_4);
return preparePut(ANY_TEXT_1, ANY_TEXT_2);
}

private Put preparePut(String partitionKeyColumnValue, String clusteringKeyColumnValue) {
return Put.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue))
.clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue))
.textValue(ANY_NAME_3, ANY_TEXT_3)
.textValue(ANY_NAME_4, ANY_TEXT_4)
.consistency(Consistency.LINEARIZABLE)
.build();
}

private Put prepareAnotherPut() {
Expand Down Expand Up @@ -301,12 +306,17 @@ private Put preparePutForMergeTest() {
}

private Delete prepareDelete() {
Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1);
Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2);
return new Delete(partitionKey, clusteringKey)
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(ANY_NAMESPACE_NAME)
.forTable(ANY_TABLE_NAME);
return prepareDelete(ANY_TEXT_1, ANY_TEXT_2);
}

private Delete prepareDelete(String partitionKeyColumnValue, String clusteringKeyColumnValue) {
return Delete.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue))
.clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue))
.consistency(Consistency.LINEARIZABLE)
.build();
}

private Delete prepareAnotherDelete() {
Expand Down Expand Up @@ -1086,7 +1096,7 @@ public void toSerializable_GetSetWithGetWithIndex_ShouldProcessWithoutExceptions

@Test
public void
toSerializable_GetSetWithGetWithIndex_RecordInsertedByMySelf_ShouldProcessWithoutExceptions()
toSerializable_GetSetWithGetWithIndex_RecordInsertedByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Expand Down Expand Up @@ -1160,7 +1170,7 @@ public void toSerializable_ScanSetUpdated_ShouldThrowValidationConflictException
}

@Test
public void toSerializable_ScanSetUpdatedByMySelf_ShouldProcessWithoutExceptions()
public void toSerializable_ScanSetUpdatedByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Expand Down Expand Up @@ -1237,7 +1247,7 @@ public void toSerializable_ScanSetExtended_ShouldThrowValidationConflictExceptio
}

@Test
public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutExceptions()
public void toSerializable_ScanSetExtendedByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Expand All @@ -1261,7 +1271,7 @@ public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutException

@Test
public void
toSerializable_ScanSetWithMultipleRecordsExtendedByMySelf_ShouldProcessWithoutExceptions()
toSerializable_ScanSetWithMultipleRecordsExtendedByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Expand Down Expand Up @@ -1526,7 +1536,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions

@Test
public void
toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions()
toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Expand Down Expand Up @@ -1589,7 +1599,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions

@Test
public void
toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions()
toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Expand Down Expand Up @@ -1619,6 +1629,107 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions
verify(storage).scan(scanWithProjectionsWithoutLimit);
}

@Test
public void
toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecords_ShouldThrowValidationConflictException()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Scan scan = prepareScanWithIndex();
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
snapshot.putIntoScanSet(
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));

// Simulate that the first and third records were updated by another transaction
Scanner scanner = mock(Scanner.class);
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());

DistributedStorage storage = mock(DistributedStorage.class);
Scan scanWithProjections =
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
when(storage.scan(scanWithProjections)).thenReturn(scanner);

// Act Assert
assertThatThrownBy(() -> snapshot.toSerializable(storage))
.isInstanceOf(ValidationConflictException.class);

// Assert
verify(storage).scan(scanWithProjections);
}

@Test
public void
toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecordsByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Scan scan = prepareScanWithIndex();
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
snapshot.putIntoScanSet(
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));

// Simulate that the first and third records were updated by myself
snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1));
snapshot.putIntoWriteSet(key3, preparePut(ANY_TEXT_3, ANY_TEXT_1));
Scanner scanner = mock(Scanner.class);
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());

DistributedStorage storage = mock(DistributedStorage.class);
Scan scanWithProjections =
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
when(storage.scan(scanWithProjections)).thenReturn(scanner);

// Act Assert
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();

// Assert
verify(storage).scan(scanWithProjections);
}

@Test
public void
toSerializable_ScanWithIndexInScanSet_WhenDeletingRecordsByMyself_ShouldProcessWithoutExceptions()
throws ExecutionException {
// Arrange
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
Scan scan = prepareScanWithIndex();
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
snapshot.putIntoScanSet(
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));

// Simulate that the first and third records were deleted by myself
snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1));
snapshot.putIntoDeleteSet(key3, prepareDelete(ANY_TEXT_3, ANY_TEXT_1));
Scanner scanner = mock(Scanner.class);
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());

DistributedStorage storage = mock(DistributedStorage.class);
Scan scanWithProjections =
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
when(storage.scan(scanWithProjections)).thenReturn(scanner);

// Act Assert
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();

// Assert
verify(storage).scan(scanWithProjections);
}

@Test
public void toSerializable_ScannerSetNotChanged_ShouldProcessWithoutExceptions()
throws ExecutionException {
Expand Down
Loading