-
Notifications
You must be signed in to change notification settings - Fork 40
Prevent overwriting read set in Consensus Commit #2797
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -164,11 +164,10 @@ Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws Cru | |
| } | ||
|
|
||
| if (result.isPresent() || get.getConjunctions().isEmpty()) { | ||
| // Keep the read set latest to create before image by using the latest record (result) | ||
| // because another conflicting transaction might have updated the record after this | ||
| // transaction read it first. However, we update it only if a get operation has no | ||
| // conjunction or the result exists. This is because we don’t know whether the record | ||
| // actually exists or not due to the conjunction. | ||
| // We put the result into the read set only if a get operation has no conjunction or the | ||
| // result exists. This is because we don’t know whether the record actually exists or not | ||
| // due to the conjunction. | ||
|
|
||
| if (key != null) { | ||
| putIntoReadSetInSnapshot(key, result); | ||
| } else { | ||
|
|
@@ -277,9 +276,6 @@ private Optional<TransactionResult> processScanResult( | |
| } | ||
|
|
||
| if (ret.isPresent()) { | ||
| // We always update the read set to create before image by using the latest record (result) | ||
| // because another conflicting transaction might have updated the record after this | ||
| // transaction read it first. | ||
| putIntoReadSetInSnapshot(key, ret); | ||
| } | ||
|
|
||
|
|
@@ -319,7 +315,7 @@ public void closeScanners() throws CrudException { | |
|
|
||
| private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResult> result) { | ||
| // In read-only mode, we don't need to put the result into the read set | ||
| if (!readOnly) { | ||
| if (!readOnly && !snapshot.containsKeyInReadSet(key)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, we can use the latest result if read-modify-write hasn't occurred on the key, like as follows? if (!readOnly && !(snapshot.containsKeyInReadSet(key) && snapshot.containsKeyInWriteOrDeleteSet(key)) {This question is mainly for confirmation and the current code looks good to me, though.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, what do you mean by
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for the unclear comment. My question was as follows:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. We might be able to rescue such a transaction, but I'm really not sure if it's good for our snapshot definition. One approach is to make it a safer side (like the current PR) for now, and then improve it after the deep discussion and confirmation.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @komamitsu Indeed, we might be able to rescue such a transaction. However, since we don’t have enough time to test it, let’s make it the safer side for now as @jnmt suggested. Thanks! |
||
| snapshot.putIntoReadSet(key, result); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package com.scalar.db.transaction.consensuscommit; | ||
|
|
||
| import static com.scalar.db.api.ConditionBuilder.column; | ||
| import static com.scalar.db.api.ConditionBuilder.updateIf; | ||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.assertj.core.api.Assertions.assertThatCode; | ||
| import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
|
|
@@ -45,6 +46,7 @@ | |
| import com.scalar.db.exception.transaction.CrudConflictException; | ||
| import com.scalar.db.exception.transaction.CrudException; | ||
| import com.scalar.db.exception.transaction.PreparationConflictException; | ||
| import com.scalar.db.exception.transaction.RollbackException; | ||
| import com.scalar.db.exception.transaction.TransactionException; | ||
| import com.scalar.db.exception.transaction.UnknownTransactionStatusException; | ||
| import com.scalar.db.io.DataType; | ||
|
|
@@ -5893,6 +5895,91 @@ public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyExceptio | |
| assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); | ||
| } | ||
|
|
||
| @Test | ||
| public void | ||
| commit_ConflictingExternalUpdate_DifferentGetButSameRecordReturned_ShouldThrowCommitConflictExceptionAndPreserveExternalChanges() | ||
| throws UnknownTransactionStatusException, CrudException, RollbackException { | ||
| // Arrange | ||
| manager.insert( | ||
| Insert.newBuilder() | ||
| .namespace(namespace1) | ||
| .table(TABLE_1) | ||
| .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) | ||
| .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) | ||
| .intValue(BALANCE, INITIAL_BALANCE) | ||
| .build()); | ||
|
|
||
| // Act Assert | ||
| DistributedTransaction transaction = manager.begin(); | ||
|
|
||
| // Retrieve the record | ||
| Optional<Result> result = | ||
| transaction.get( | ||
| Get.newBuilder() | ||
| .namespace(namespace1) | ||
| .table(TABLE_1) | ||
| .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) | ||
| .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) | ||
| .build()); | ||
|
|
||
| assertThat(result).isPresent(); | ||
| assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0); | ||
| assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); | ||
| assertThat(result.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); | ||
|
|
||
| // Update the balance of the record | ||
| transaction.update( | ||
| Update.newBuilder() | ||
| .namespace(namespace1) | ||
| .table(TABLE_1) | ||
| .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) | ||
| .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) | ||
| .condition(updateIf(column(BALANCE).isEqualToInt(INITIAL_BALANCE)).build()) | ||
| .intValue(BALANCE, 100) | ||
| .build()); | ||
|
|
||
| // Update the balance of the record by another transaction | ||
| manager.update( | ||
| Update.newBuilder() | ||
| .namespace(namespace1) | ||
| .table(TABLE_1) | ||
| .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) | ||
| .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) | ||
| .intValue(BALANCE, 200) | ||
| .build()); | ||
|
|
||
| // Retrieve the record again, but use a different Get object (with a where clause) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| result = | ||
| transaction.get( | ||
| Get.newBuilder() | ||
| .namespace(namespace1) | ||
| .table(TABLE_1) | ||
| .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) | ||
| .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) | ||
| .where(column(BALANCE).isEqualToInt(200)) | ||
| .build()); | ||
|
|
||
| assertThat(result).isNotPresent(); | ||
|
|
||
| assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); | ||
| transaction.rollback(); | ||
|
|
||
| // Assert | ||
| result = | ||
| manager.get( | ||
| Get.newBuilder() | ||
| .namespace(namespace1) | ||
| .table(TABLE_1) | ||
| .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) | ||
| .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) | ||
| .build()); | ||
|
|
||
| assertThat(result).isPresent(); | ||
| assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0); | ||
| assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); | ||
| assertThat(result.get().getInt(BALANCE)).isEqualTo(200); | ||
| } | ||
|
|
||
| @Test | ||
| public void manager_get_GetGivenForCommittedRecord_WithSerializable_ShouldReturnRecord() | ||
| throws TransactionException { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, my bad. This part was a completely wrong idea. Probably, I was trying to avoid aborts as much as possible, but paid little attention to the case for reading a record multiple times. I need to think about how we can avoid these kinds of concurrency bugs…