Skip to content

Commit a45f7a9

Browse files
committed
Handle get with index correctly in CrudHandler
1 parent 63520eb commit a45f7a9

File tree

4 files changed

+264
-5
lines changed

4 files changed

+264
-5
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.List;
2929
import java.util.Optional;
3030
import java.util.stream.Collectors;
31+
import javax.annotation.Nullable;
3132
import javax.annotation.concurrent.NotThreadSafe;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
@@ -76,17 +77,27 @@ public CrudHandler(
7677
public Optional<Result> get(Get originalGet) throws CrudException {
7778
List<String> originalProjections = new ArrayList<>(originalGet.getProjections());
7879
Get get = (Get) prepareStorageSelection(originalGet);
79-
Snapshot.Key key = new Snapshot.Key(get);
80-
readUnread(key, get);
8180

8281
TableMetadata metadata = getTableMetadata(get);
82+
83+
Snapshot.Key key;
84+
if (ScalarDbUtils.isSecondaryIndexSpecified(get, metadata)) {
85+
// In case of a Get with index, we don't know the key until we read the record
86+
key = null;
87+
} else {
88+
key = new Snapshot.Key(get);
89+
}
90+
91+
readUnread(key, get);
92+
8393
return snapshot
8494
.getResult(key, get)
8595
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
8696
}
8797

98+
// Only for a Get with index, the argument `key` is null
8899
@VisibleForTesting
89-
void readUnread(Snapshot.Key key, Get get) throws CrudException {
100+
void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
90101
if (!snapshot.containsKeyInGetSet(get)) {
91102
read(key, get);
92103
}
@@ -95,7 +106,7 @@ void readUnread(Snapshot.Key key, Get get) throws CrudException {
95106
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
96107
// concurrently in the implicit pre-read
97108
@VisibleForTesting
98-
void read(Snapshot.Key key, Get get) throws CrudException {
109+
void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
99110
Optional<TransactionResult> result = getFromStorage(get);
100111
if (!result.isPresent() || result.get().isCommitted()) {
101112
if (result.isPresent() || get.getConjunctions().isEmpty()) {
@@ -104,7 +115,18 @@ void read(Snapshot.Key key, Get get) throws CrudException {
104115
// transaction read it first. However, we update it only if a get operation has no
105116
// conjunction or the result exists. This is because we don’t know whether the record
106117
// actually exists or not due to the conjunction.
107-
snapshot.putIntoReadSet(key, result);
118+
if (key != null) {
119+
snapshot.putIntoReadSet(key, result);
120+
} else {
121+
// Only for a Get with index, the argument `key` is null
122+
123+
if (result.isPresent()) {
124+
// Only when we can get the record with the Get with index, we can put it into the read
125+
// set
126+
key = new Snapshot.Key(get, result.get());
127+
snapshot.putIntoReadSet(key, result);
128+
}
129+
}
108130
}
109131
snapshot.putIntoGetSet(get, result); // for re-read and validation
110132
return;

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,13 @@ public Key(Get get) {
720720
this((Operation) get);
721721
}
722722

723+
public Key(Get get, Result result) {
724+
this.namespace = get.forNamespace().get();
725+
this.table = get.forTable().get();
726+
this.partitionKey = result.getPartitionKey().get();
727+
this.clusteringKey = result.getClusteringKey();
728+
}
729+
723730
public Key(Put put) {
724731
this((Operation) put);
725732
}

core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class CrudHandlerTest {
5656
private static final String ANY_ID_2 = "id2";
5757
private static final String ANY_NAME_1 = "name1";
5858
private static final String ANY_NAME_2 = "name2";
59+
private static final String ANY_NAME_3 = "name3";
5960
private static final String ANY_TEXT_1 = "text1";
6061
private static final String ANY_TEXT_2 = "text2";
6162
private static final String ANY_TEXT_3 = "text3";
@@ -66,8 +67,10 @@ public class CrudHandlerTest {
6667
TableMetadata.newBuilder()
6768
.addColumn(ANY_NAME_1, DataType.TEXT)
6869
.addColumn(ANY_NAME_2, DataType.TEXT)
70+
.addColumn(ANY_NAME_3, DataType.TEXT)
6971
.addPartitionKey(ANY_NAME_1)
7072
.addClusteringKey(ANY_NAME_2)
73+
.addSecondaryIndex(ANY_NAME_3)
7174
.build());
7275
private static final TransactionTableMetadata TRANSACTION_TABLE_METADATA =
7376
new TransactionTableMetadata(TABLE_METADATA);
@@ -928,6 +931,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
928931

929932
// Assert
930933
verify(storage, never()).get(any());
934+
verify(snapshot, never()).putIntoReadSet(any(Snapshot.Key.class), any(Optional.class));
931935
verify(snapshot, never()).putIntoGetSet(any(Get.class), any(Optional.class));
932936
}
933937

@@ -1014,6 +1018,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
10141018
// Assert
10151019
verify(storage).get(any());
10161020
verify(snapshot).putIntoReadSet(key, Optional.of(new TransactionResult(result)));
1021+
verify(snapshot).putIntoGetSet(getForKey, Optional.of(new TransactionResult(result)));
10171022
}
10181023

10191024
@Test
@@ -1050,6 +1055,88 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
10501055
});
10511056
}
10521057

1058+
@Test
1059+
public void
1060+
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_EmptyResultReturnedByStorage_ShouldCallAppropriateMethods()
1061+
throws CrudException, ExecutionException {
1062+
// Arrange
1063+
Get getWithIndex =
1064+
Get.newBuilder()
1065+
.namespace(ANY_NAMESPACE_NAME)
1066+
.table(ANY_TABLE_NAME)
1067+
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
1068+
.build();
1069+
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);
1070+
when(storage.get(any())).thenReturn(Optional.empty());
1071+
1072+
// Act
1073+
handler.readUnread(null, getWithIndex);
1074+
1075+
// Assert
1076+
verify(storage).get(any());
1077+
verify(snapshot, never()).putIntoReadSet(any(), any());
1078+
verify(snapshot).putIntoGetSet(getWithIndex, Optional.empty());
1079+
}
1080+
1081+
@Test
1082+
public void
1083+
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods()
1084+
throws CrudException, ExecutionException {
1085+
// Arrange
1086+
Result result = mock(Result.class);
1087+
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get());
1088+
when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1)));
1089+
when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2)));
1090+
when(storage.get(any())).thenReturn(Optional.of(result));
1091+
1092+
Get getWithIndex =
1093+
Get.newBuilder()
1094+
.namespace(ANY_NAMESPACE_NAME)
1095+
.table(ANY_TABLE_NAME)
1096+
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
1097+
.build();
1098+
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);
1099+
1100+
// Act
1101+
handler.readUnread(null, getWithIndex);
1102+
1103+
// Assert
1104+
verify(storage).get(any());
1105+
verify(snapshot)
1106+
.putIntoReadSet(
1107+
new Snapshot.Key(getWithIndex, result), Optional.of(new TransactionResult(result)));
1108+
verify(snapshot).putIntoGetSet(getWithIndex, Optional.of(new TransactionResult(result)));
1109+
}
1110+
1111+
@Test
1112+
public void
1113+
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException()
1114+
throws ExecutionException {
1115+
// Arrange
1116+
Result result = mock(Result.class);
1117+
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get());
1118+
when(storage.get(any())).thenReturn(Optional.of(result));
1119+
1120+
Get getWithIndex =
1121+
Get.newBuilder()
1122+
.namespace(ANY_NAMESPACE_NAME)
1123+
.table(ANY_TABLE_NAME)
1124+
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
1125+
.build();
1126+
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);
1127+
1128+
// Act Assert
1129+
assertThatThrownBy(() -> handler.readUnread(null, getWithIndex))
1130+
.isInstanceOf(UncommittedRecordException.class)
1131+
.satisfies(
1132+
e -> {
1133+
UncommittedRecordException exception = (UncommittedRecordException) e;
1134+
assertThat(exception.getSelection()).isEqualTo(getWithIndex);
1135+
assertThat(exception.getResults().size()).isEqualTo(1);
1136+
assertThat(exception.getResults().get(0)).isEqualTo(result);
1137+
});
1138+
}
1139+
10531140
@Test
10541141
public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws CrudException {
10551142
// Arrange

integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4351,6 +4351,149 @@ public void get_GetWithIndexGiven_WithSerializable_ShouldNotThrowAnyException()
43514351
assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class);
43524352
}
43534353

4354+
@Test
4355+
public void getAndUpdate_GetWithIndexGiven_ShouldUpdate() throws TransactionException {
4356+
// Arrange
4357+
manager.insert(
4358+
Insert.newBuilder()
4359+
.namespace(namespace1)
4360+
.table(TABLE_1)
4361+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4362+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
4363+
.intValue(BALANCE, INITIAL_BALANCE)
4364+
.build());
4365+
4366+
// Act Assert
4367+
DistributedTransaction transaction = manager.begin();
4368+
Optional<Result> actual =
4369+
transaction.get(
4370+
Get.newBuilder()
4371+
.namespace(namespace1)
4372+
.table(TABLE_1)
4373+
.indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE))
4374+
.build());
4375+
4376+
assertThat(actual).isPresent();
4377+
assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0);
4378+
assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
4379+
assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
4380+
4381+
transaction.update(
4382+
Update.newBuilder()
4383+
.namespace(namespace1)
4384+
.table(TABLE_1)
4385+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4386+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
4387+
.intValue(BALANCE, 1)
4388+
.build());
4389+
4390+
transaction.commit();
4391+
4392+
transaction = manager.begin();
4393+
actual =
4394+
transaction.get(
4395+
Get.newBuilder()
4396+
.namespace(namespace1)
4397+
.table(TABLE_1)
4398+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4399+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
4400+
.build());
4401+
transaction.commit();
4402+
4403+
assertThat(actual).isPresent();
4404+
assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0);
4405+
assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
4406+
assertThat(actual.get().getInt(BALANCE)).isEqualTo(1);
4407+
}
4408+
4409+
@Test
4410+
public void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate() throws TransactionException {
4411+
// Arrange
4412+
manager.mutate(
4413+
Arrays.asList(
4414+
Insert.newBuilder()
4415+
.namespace(namespace1)
4416+
.table(TABLE_1)
4417+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4418+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
4419+
.intValue(BALANCE, INITIAL_BALANCE)
4420+
.build(),
4421+
Insert.newBuilder()
4422+
.namespace(namespace1)
4423+
.table(TABLE_1)
4424+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4425+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
4426+
.intValue(BALANCE, INITIAL_BALANCE)
4427+
.build()));
4428+
4429+
// Act Assert
4430+
DistributedTransaction transaction = manager.begin();
4431+
List<Result> actualResults =
4432+
transaction.scan(
4433+
Scan.newBuilder()
4434+
.namespace(namespace1)
4435+
.table(TABLE_1)
4436+
.indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE))
4437+
.build());
4438+
4439+
assertThat(actualResults).hasSize(2);
4440+
Set<Integer> expectedTypes = Sets.newHashSet(0, 1);
4441+
for (Result result : actualResults) {
4442+
assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0);
4443+
expectedTypes.remove(result.getInt(ACCOUNT_TYPE));
4444+
assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
4445+
}
4446+
assertThat(expectedTypes).isEmpty();
4447+
4448+
transaction.update(
4449+
Update.newBuilder()
4450+
.namespace(namespace1)
4451+
.table(TABLE_1)
4452+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4453+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
4454+
.intValue(BALANCE, 1)
4455+
.build());
4456+
transaction.update(
4457+
Update.newBuilder()
4458+
.namespace(namespace1)
4459+
.table(TABLE_1)
4460+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4461+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
4462+
.intValue(BALANCE, 2)
4463+
.build());
4464+
4465+
transaction.commit();
4466+
4467+
transaction = manager.begin();
4468+
Optional<Result> actual1 =
4469+
transaction.get(
4470+
Get.newBuilder()
4471+
.namespace(namespace1)
4472+
.table(TABLE_1)
4473+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4474+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
4475+
.build());
4476+
Optional<Result> actual2 =
4477+
transaction.get(
4478+
Get.newBuilder()
4479+
.namespace(namespace1)
4480+
.table(TABLE_1)
4481+
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
4482+
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
4483+
.build());
4484+
transaction.commit();
4485+
4486+
assertThat(actual1).isPresent();
4487+
assertThat(actual1.get().getInt(ACCOUNT_ID)).isEqualTo(0);
4488+
assertThat(actual1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
4489+
assertThat(actual1.get().getInt(BALANCE)).isEqualTo(1);
4490+
4491+
assertThat(actual2).isPresent();
4492+
assertThat(actual2.get().getInt(ACCOUNT_ID)).isEqualTo(0);
4493+
assertThat(actual2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1);
4494+
assertThat(actual2.get().getInt(BALANCE)).isEqualTo(2);
4495+
}
4496+
43544497
private DistributedTransaction prepareTransfer(
43554498
int fromId,
43564499
String fromNamespace,

0 commit comments

Comments
 (0)