Skip to content

Commit 1c2dadb

Browse files
Backport to branch(3) : Prevent overwriting read set in Consensus Commit (#2799)
Co-authored-by: Toshihiro Suzuki <[email protected]>
1 parent 58e1044 commit 1c2dadb

File tree

3 files changed

+131
-9
lines changed

3 files changed

+131
-9
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,10 @@ Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws Cru
164164
}
165165

166166
if (result.isPresent() || get.getConjunctions().isEmpty()) {
167-
// Keep the read set latest to create before image by using the latest record (result)
168-
// because another conflicting transaction might have updated the record after this
169-
// transaction read it first. However, we update it only if a get operation has no
170-
// conjunction or the result exists. This is because we don’t know whether the record
171-
// actually exists or not due to the conjunction.
167+
// We put the result into the read set only if a get operation has no conjunction or the
168+
// result exists. This is because we don’t know whether the record actually exists or not
169+
// due to the conjunction.
170+
172171
if (key != null) {
173172
putIntoReadSetInSnapshot(key, result);
174173
} else {
@@ -277,9 +276,6 @@ private Optional<TransactionResult> processScanResult(
277276
}
278277

279278
if (ret.isPresent()) {
280-
// We always update the read set to create before image by using the latest record (result)
281-
// because another conflicting transaction might have updated the record after this
282-
// transaction read it first.
283279
putIntoReadSetInSnapshot(key, ret);
284280
}
285281

@@ -319,7 +315,7 @@ public void closeScanners() throws CrudException {
319315

320316
private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResult> result) {
321317
// In read-only mode, we don't need to put the result into the read set
322-
if (!readOnly) {
318+
if (!readOnly && !snapshot.containsKeyInReadSet(key)) {
323319
snapshot.putIntoReadSet(key, result);
324320
}
325321
}

core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,45 @@ public void get_ForNonExistingTable_ShouldThrowIllegalArgumentException()
541541
assertThatThrownBy(() -> handler.get(get)).isInstanceOf(IllegalArgumentException.class);
542542
}
543543

544+
@Test
545+
public void get_DifferentGetButSameRecordReturned_ShouldNotOverwriteReadSet()
546+
throws ExecutionException, CrudException {
547+
// Arrange
548+
Get get1 = prepareGet();
549+
Get get2 = Get.newBuilder(get1).where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)).build();
550+
Get getForStorage1 = toGetForStorageFrom(get1);
551+
Get getForStorage2 =
552+
Get.newBuilder(get2)
553+
.clearProjections()
554+
.projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
555+
.clearConditions()
556+
.where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
557+
.or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
558+
.consistency(Consistency.LINEARIZABLE)
559+
.build();
560+
Result result = prepareResult(TransactionState.COMMITTED);
561+
Optional<TransactionResult> expected = Optional.of(new TransactionResult(result));
562+
Snapshot.Key key = new Snapshot.Key(getForStorage1);
563+
when(snapshot.getResult(any(), any())).thenReturn(expected).thenReturn(expected);
564+
when(snapshot.containsKeyInReadSet(key)).thenReturn(false).thenReturn(true);
565+
when(storage.get(any())).thenReturn(Optional.of(result));
566+
567+
// Act
568+
Optional<Result> results1 = handler.get(get1);
569+
Optional<Result> results2 = handler.get(get2);
570+
571+
// Assert
572+
assertThat(results1)
573+
.isEqualTo(
574+
Optional.of(
575+
new FilteredResult(
576+
expected.get(), Collections.emptyList(), TABLE_METADATA, false)));
577+
assertThat(results2).isEqualTo(results1);
578+
verify(storage).get(getForStorage1);
579+
verify(storage).get(getForStorage2);
580+
verify(snapshot).putIntoReadSet(key, expected);
581+
}
582+
544583
@ParameterizedTest
545584
@EnumSource(ScanType.class)
546585
void scanOrGetScanner_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn(ScanType scanType)

integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.transaction.consensuscommit;
22

33
import static com.scalar.db.api.ConditionBuilder.column;
4+
import static com.scalar.db.api.ConditionBuilder.updateIf;
45
import static org.assertj.core.api.Assertions.assertThat;
56
import static org.assertj.core.api.Assertions.assertThatCode;
67
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -45,6 +46,7 @@
4546
import com.scalar.db.exception.transaction.CrudConflictException;
4647
import com.scalar.db.exception.transaction.CrudException;
4748
import com.scalar.db.exception.transaction.PreparationConflictException;
49+
import com.scalar.db.exception.transaction.RollbackException;
4850
import com.scalar.db.exception.transaction.TransactionException;
4951
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
5052
import com.scalar.db.io.DataType;
@@ -6073,6 +6075,91 @@ public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyExceptio
60736075
assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
60746076
}
60756077

6078+
@Test
6079+
public void
6080+
commit_ConflictingExternalUpdate_DifferentGetButSameRecordReturned_ShouldThrowCommitConflictExceptionAndPreserveExternalChanges()
6081+
throws UnknownTransactionStatusException, CrudException, RollbackException {
6082+
// Arrange
6083+
manager.insert(
6084+
Insert.newBuilder()
6085+
.namespace(namespace1)
6086+
.table(TABLE_1)
6087+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
6088+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
6089+
.intValue(BALANCE, INITIAL_BALANCE)
6090+
.build());
6091+
6092+
// Act Assert
6093+
DistributedTransaction transaction = manager.begin();
6094+
6095+
// Retrieve the record
6096+
Optional<Result> result =
6097+
transaction.get(
6098+
Get.newBuilder()
6099+
.namespace(namespace1)
6100+
.table(TABLE_1)
6101+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
6102+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
6103+
.build());
6104+
6105+
assertThat(result).isPresent();
6106+
assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0);
6107+
assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
6108+
assertThat(result.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
6109+
6110+
// Update the balance of the record
6111+
transaction.update(
6112+
Update.newBuilder()
6113+
.namespace(namespace1)
6114+
.table(TABLE_1)
6115+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
6116+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
6117+
.condition(updateIf(column(BALANCE).isEqualToInt(INITIAL_BALANCE)).build())
6118+
.intValue(BALANCE, 100)
6119+
.build());
6120+
6121+
// Update the balance of the record by another transaction
6122+
manager.update(
6123+
Update.newBuilder()
6124+
.namespace(namespace1)
6125+
.table(TABLE_1)
6126+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
6127+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
6128+
.intValue(BALANCE, 200)
6129+
.build());
6130+
6131+
// Retrieve the record again, but use a different Get object (with a where clause)
6132+
result =
6133+
transaction.get(
6134+
Get.newBuilder()
6135+
.namespace(namespace1)
6136+
.table(TABLE_1)
6137+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
6138+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
6139+
.where(column(BALANCE).isEqualToInt(200))
6140+
.build());
6141+
6142+
assertThat(result).isNotPresent();
6143+
6144+
assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class);
6145+
transaction.rollback();
6146+
6147+
// Assert
6148+
result =
6149+
manager.get(
6150+
Get.newBuilder()
6151+
.namespace(namespace1)
6152+
.table(TABLE_1)
6153+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
6154+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
6155+
.build());
6156+
6157+
assertThat(result).isPresent();
6158+
assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0);
6159+
assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
6160+
assertThat(result.get().getInt(BALANCE)).isEqualTo(200);
6161+
}
6162+
60766163
@Test
60776164
public void manager_get_GetGivenForCommittedRecord_WithSerializable_ShouldReturnRecord()
60786165
throws TransactionException {

0 commit comments

Comments
 (0)