-
Notifications
You must be signed in to change notification settings - Fork 40
Handle Scan with Index correctly during validation logic in Consensus Commit #2808
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -640,6 +640,15 @@ private void validateScanResults( | |||||||||||
|
|
||||||||||||
| // Compare the records of the original scan results and the latest scan results | ||||||||||||
| if (!originalResultEntry.getKey().equals(key)) { | ||||||||||||
| if (writeSet.containsKey(originalResultEntry.getKey()) | ||||||||||||
| || deleteSet.containsKey(originalResultEntry.getKey())) { | ||||||||||||
| // The record is inserted/deleted/updated by this transaction | ||||||||||||
|
|
||||||||||||
| // Skip the record of the original scan results | ||||||||||||
| originalResultEntry = Iterators.getNext(originalResultIterator, null); | ||||||||||||
|
Comment on lines
+646
to
+648
|
||||||||||||
| // Skip the record of the original scan results | |
| originalResultEntry = Iterators.getNext(originalResultIterator, null); | |
| SkipResult skipResult = skipOwnWritesDeletes(latestResult, scanner, originalResultEntry, originalResultIterator, null); | |
| originalResultEntry = skipResult.getOriginalResultEntry(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto. We can also skip the key in writeSet or deleteSet.
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -244,14 +244,19 @@ private Scan prepareCrossPartitionScan(String namespace, String table) { | |||||||
| } | ||||||||
|
|
||||||||
| private Put preparePut() { | ||||||||
| Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); | ||||||||
| Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); | ||||||||
| return new Put(partitionKey, clusteringKey) | ||||||||
| .withConsistency(Consistency.LINEARIZABLE) | ||||||||
| .forNamespace(ANY_NAMESPACE_NAME) | ||||||||
| .forTable(ANY_TABLE_NAME) | ||||||||
| .withValue(ANY_NAME_3, ANY_TEXT_3) | ||||||||
| .withValue(ANY_NAME_4, ANY_TEXT_4); | ||||||||
| return preparePut(ANY_TEXT_1, ANY_TEXT_2); | ||||||||
| } | ||||||||
|
|
||||||||
| private Put preparePut(String partitionKeyColumnValue, String clusteringKeyColumnValue) { | ||||||||
| return Put.newBuilder() | ||||||||
| .namespace(ANY_NAMESPACE_NAME) | ||||||||
| .table(ANY_TABLE_NAME) | ||||||||
| .partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue)) | ||||||||
| .clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue)) | ||||||||
| .textValue(ANY_NAME_3, ANY_TEXT_3) | ||||||||
| .textValue(ANY_NAME_4, ANY_TEXT_4) | ||||||||
| .consistency(Consistency.LINEARIZABLE) | ||||||||
| .build(); | ||||||||
| } | ||||||||
|
|
||||||||
| private Put prepareAnotherPut() { | ||||||||
|
|
@@ -301,12 +306,17 @@ private Put preparePutForMergeTest() { | |||||||
| } | ||||||||
|
|
||||||||
| private Delete prepareDelete() { | ||||||||
| Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); | ||||||||
| Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); | ||||||||
| return new Delete(partitionKey, clusteringKey) | ||||||||
| .withConsistency(Consistency.LINEARIZABLE) | ||||||||
| .forNamespace(ANY_NAMESPACE_NAME) | ||||||||
| .forTable(ANY_TABLE_NAME); | ||||||||
| return prepareDelete(ANY_TEXT_1, ANY_TEXT_2); | ||||||||
| } | ||||||||
|
|
||||||||
| private Delete prepareDelete(String partitionKeyColumnValue, String clusteringKeyColumnValue) { | ||||||||
| return Delete.newBuilder() | ||||||||
| .namespace(ANY_NAMESPACE_NAME) | ||||||||
| .table(ANY_TABLE_NAME) | ||||||||
| .partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue)) | ||||||||
| .clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue)) | ||||||||
| .consistency(Consistency.LINEARIZABLE) | ||||||||
| .build(); | ||||||||
| } | ||||||||
|
|
||||||||
| private Delete prepareAnotherDelete() { | ||||||||
|
|
@@ -1086,7 +1096,7 @@ public void toSerializable_GetSetWithGetWithIndex_ShouldProcessWithoutExceptions | |||||||
|
|
||||||||
| @Test | ||||||||
| public void | ||||||||
| toSerializable_GetSetWithGetWithIndex_RecordInsertedByMySelf_ShouldProcessWithoutExceptions() | ||||||||
| toSerializable_GetSetWithGetWithIndex_RecordInsertedByMyself_ShouldProcessWithoutExceptions() | ||||||||
| throws ExecutionException { | ||||||||
| // Arrange | ||||||||
| snapshot = prepareSnapshot(Isolation.SERIALIZABLE); | ||||||||
|
|
@@ -1160,7 +1170,7 @@ public void toSerializable_ScanSetUpdated_ShouldThrowValidationConflictException | |||||||
| } | ||||||||
|
|
||||||||
| @Test | ||||||||
| public void toSerializable_ScanSetUpdatedByMySelf_ShouldProcessWithoutExceptions() | ||||||||
| public void toSerializable_ScanSetUpdatedByMyself_ShouldProcessWithoutExceptions() | ||||||||
| throws ExecutionException { | ||||||||
| // Arrange | ||||||||
| snapshot = prepareSnapshot(Isolation.SERIALIZABLE); | ||||||||
|
|
@@ -1237,7 +1247,7 @@ public void toSerializable_ScanSetExtended_ShouldThrowValidationConflictExceptio | |||||||
| } | ||||||||
|
|
||||||||
| @Test | ||||||||
| public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutExceptions() | ||||||||
| public void toSerializable_ScanSetExtendedByMyself_ShouldProcessWithoutExceptions() | ||||||||
| throws ExecutionException { | ||||||||
| // Arrange | ||||||||
| snapshot = prepareSnapshot(Isolation.SERIALIZABLE); | ||||||||
|
|
@@ -1261,7 +1271,7 @@ public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutException | |||||||
|
|
||||||||
| @Test | ||||||||
| public void | ||||||||
| toSerializable_ScanSetWithMultipleRecordsExtendedByMySelf_ShouldProcessWithoutExceptions() | ||||||||
| toSerializable_ScanSetWithMultipleRecordsExtendedByMyself_ShouldProcessWithoutExceptions() | ||||||||
| throws ExecutionException { | ||||||||
| // Arrange | ||||||||
| snapshot = prepareSnapshot(Isolation.SERIALIZABLE); | ||||||||
|
|
@@ -1526,7 +1536,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions | |||||||
|
|
||||||||
| @Test | ||||||||
| public void | ||||||||
| toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions() | ||||||||
| toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions() | ||||||||
| throws ExecutionException { | ||||||||
| // Arrange | ||||||||
| snapshot = prepareSnapshot(Isolation.SERIALIZABLE); | ||||||||
|
|
@@ -1589,7 +1599,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions | |||||||
|
|
||||||||
| @Test | ||||||||
| public void | ||||||||
| toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions() | ||||||||
| toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions() | ||||||||
| throws ExecutionException { | ||||||||
| // Arrange | ||||||||
| snapshot = prepareSnapshot(Isolation.SERIALIZABLE); | ||||||||
|
|
@@ -1619,6 +1629,107 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions | |||||||
| verify(storage).scan(scanWithProjectionsWithoutLimit); | ||||||||
| } | ||||||||
|
|
||||||||
| @Test | ||||||||
| public void | ||||||||
| toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecords_ShouldThrowValidationConflictException() | ||||||||
| throws ExecutionException { | ||||||||
| // Arrange | ||||||||
| snapshot = prepareSnapshot(Isolation.SERIALIZABLE); | ||||||||
| Scan scan = prepareScanWithIndex(); | ||||||||
| TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1); | ||||||||
| TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1); | ||||||||
| TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1); | ||||||||
| Snapshot.Key key1 = new Snapshot.Key(scan, result1); | ||||||||
| Snapshot.Key key2 = new Snapshot.Key(scan, result2); | ||||||||
| Snapshot.Key key3 = new Snapshot.Key(scan, result3); | ||||||||
| snapshot.putIntoScanSet( | ||||||||
| scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3))); | ||||||||
|
|
||||||||
| // Simulate that the first and third records were updated by another transaction | ||||||||
| Scanner scanner = mock(Scanner.class); | ||||||||
| when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty()); | ||||||||
|
|
||||||||
| DistributedStorage storage = mock(DistributedStorage.class); | ||||||||
| Scan scanWithProjections = | ||||||||
| Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build(); | ||||||||
| when(storage.scan(scanWithProjections)).thenReturn(scanner); | ||||||||
|
|
||||||||
| // Act Assert | ||||||||
| assertThatThrownBy(() -> snapshot.toSerializable(storage)) | ||||||||
| .isInstanceOf(ValidationConflictException.class); | ||||||||
|
|
||||||||
| // Assert | ||||||||
| verify(storage).scan(scanWithProjections); | ||||||||
| } | ||||||||
|
|
||||||||
| @Test | ||||||||
| public void | ||||||||
| toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecordsByMyself_ShouldProcessWithoutExceptions() | ||||||||
| throws ExecutionException { | ||||||||
| // Arrange | ||||||||
| snapshot = prepareSnapshot(Isolation.SERIALIZABLE); | ||||||||
| Scan scan = prepareScanWithIndex(); | ||||||||
| TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1); | ||||||||
| TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1); | ||||||||
| TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1); | ||||||||
| Snapshot.Key key1 = new Snapshot.Key(scan, result1); | ||||||||
| Snapshot.Key key2 = new Snapshot.Key(scan, result2); | ||||||||
| Snapshot.Key key3 = new Snapshot.Key(scan, result3); | ||||||||
| snapshot.putIntoScanSet( | ||||||||
| scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3))); | ||||||||
|
|
||||||||
| // Simulate that the first and third records were updated by myself | ||||||||
| snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1)); | ||||||||
|
||||||||
| snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1)); | |
| snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1)); | |
| snapshot.putIntoWriteSet(key2, preparePut(ANY_TEXT_2, ANY_TEXT_1)); |
Copilot
AI
Jun 20, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deletion test’s comment says it simulates deletes of the first and second records by myself, but it only deletes key1 and key3. Add key2 to the deleteSet (or update the comment) so the test setup matches its description.
| snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1)); | |
| snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1)); | |
| snapshot.putIntoDeleteSet(key2, prepareDelete(ANY_TEXT_2, ANY_TEXT_1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also skip the key in writeSet or deleteSet.