Skip to content

Commit 977c6af

Browse files
authored
Merge branch 'master' into fix/data-loader/logger-rename
2 parents 570ab38 + 63520eb commit 977c6af

File tree

5 files changed

+373
-32
lines changed

5 files changed

+373
-32
lines changed

.github/dependabot.yml

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ updates:
99
directory: "/"
1010
schedule:
1111
interval: "weekly"
12-
reviewers:
13-
- "scalar-labs/scalardb"
1412
ignore:
1513
- dependency-name: "*"
1614
update-types: [ "version-update:semver-major" ]
@@ -26,8 +24,6 @@ updates:
2624
directory: "/"
2725
schedule:
2826
interval: "weekly"
29-
reviewers:
30-
- "scalar-labs/scalardb"
3127

3228
- package-ecosystem: "github-actions"
3329
target-branch: "3"
@@ -38,8 +34,6 @@ updates:
3834
directory: "/"
3935
schedule:
4036
interval: "weekly"
41-
reviewers:
42-
- "scalar-labs/scalardb"
4337

4438
- package-ecosystem: "github-actions"
4539
target-branch: "3.15"
@@ -50,8 +44,6 @@ updates:
5044
directory: "/"
5145
schedule:
5246
interval: "weekly"
53-
reviewers:
54-
- "scalar-labs/scalardb"
5547

5648
- package-ecosystem: "github-actions"
5749
target-branch: "3.14"
@@ -62,8 +54,6 @@ updates:
6254
directory: "/"
6355
schedule:
6456
interval: "weekly"
65-
reviewers:
66-
- "scalar-labs/scalardb"
6757

6858
- package-ecosystem: "github-actions"
6959
target-branch: "3.13"
@@ -74,8 +64,6 @@ updates:
7464
directory: "/"
7565
schedule:
7666
interval: "weekly"
77-
reviewers:
78-
- "scalar-labs/scalardb"
7967

8068
- package-ecosystem: "github-actions"
8169
target-branch: "3.12"
@@ -86,8 +74,6 @@ updates:
8674
directory: "/"
8775
schedule:
8876
interval: "weekly"
89-
reviewers:
90-
- "scalar-labs/scalardb"
9177

9278
- package-ecosystem: "github-actions"
9379
target-branch: "3.11"
@@ -98,5 +84,3 @@ updates:
9884
directory: "/"
9985
schedule:
10086
interval: "weekly"
101-
reviewers:
102-
- "scalar-labs/scalardb"
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
name: Assign reviewer to dependabot PRs
2+
# This action assigns the ScalarDB team as reviewers when Dependabot opens a pull request.
3+
4+
on:
5+
pull_request:
6+
types: [opened]
7+
8+
jobs:
9+
assign-reviewer:
10+
runs-on: ubuntu-latest
11+
if: github.actor == 'dependabot[bot]'
12+
13+
permissions:
14+
pull-requests: write
15+
16+
steps:
17+
- name: Assign ScalarDB team as reviewers for Dependabot PRs
18+
env:
19+
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
20+
PR_NUMBER: ${{ github.event.pull_request.number }}
21+
REPOSITORY: ${{ github.repository }}
22+
run: gh pr edit $PR_NUMBER --repo $REPOSITORY --add-reviewer scalar-labs/scalardb
23+

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.common.base.MoreObjects;
88
import com.google.common.collect.ComparisonChain;
99
import com.google.common.collect.Iterators;
10+
import com.scalar.db.api.ConditionSetBuilder;
1011
import com.scalar.db.api.Delete;
1112
import com.scalar.db.api.DistributedStorage;
1213
import com.scalar.db.api.Get;
@@ -43,6 +44,7 @@
4344
import java.util.Set;
4445
import java.util.concurrent.ConcurrentHashMap;
4546
import java.util.concurrent.ConcurrentMap;
47+
import java.util.stream.Collectors;
4648
import javax.annotation.Nonnull;
4749
import javax.annotation.concurrent.Immutable;
4850
import javax.annotation.concurrent.NotThreadSafe;
@@ -489,24 +491,20 @@ void toSerializable(DistributedStorage storage)
489491
// Get set is re-validated to check if there is no anti-dependency
490492
for (Map.Entry<Get, Optional<TransactionResult>> entry : getSet.entrySet()) {
491493
Get get = entry.getKey();
492-
Key key = new Key(get);
493-
if (writeSet.containsKey(key) || deleteSet.containsKey(key)) {
494-
continue;
495-
}
496494

497-
tasks.add(
498-
() -> {
499-
Optional<TransactionResult> originalResult = getSet.get(get);
500-
// Only get the tx_id column because we use only them to compare
501-
get.clearProjections();
502-
get.withProjection(Attribute.ID);
495+
if (ScalarDbUtils.isSecondaryIndexSpecified(get, getTableMetadata(get))) {
496+
// For Get with index
497+
tasks.add(() -> validateGetWithIndexResult(storage, get, entry.getValue()));
498+
} else {
499+
// For other Get
500+
501+
Key key = new Key(get);
502+
if (writeSet.containsKey(key) || deleteSet.containsKey(key)) {
503+
continue;
504+
}
503505

504-
// Check if a read record is not changed
505-
Optional<TransactionResult> latestResult = storage.get(get).map(TransactionResult::new);
506-
if (isChanged(latestResult, originalResult)) {
507-
throwExceptionDueToAntiDependency();
508-
}
509-
});
506+
tasks.add(() -> validateGetResult(storage, get, entry.getValue()));
507+
}
510508
}
511509

512510
parallelExecutor.validate(tasks, getId());
@@ -631,6 +629,48 @@ private void validateScanResults(
631629
}
632630
}
633631

632+
private void validateGetWithIndexResult(
633+
DistributedStorage storage, Get get, Optional<TransactionResult> originalResult)
634+
throws ExecutionException, ValidationConflictException {
635+
assert get.forNamespace().isPresent() && get.forTable().isPresent();
636+
637+
// If this transaction or another transaction inserts records into the index range,
638+
// the Get with index operation may retrieve multiple records, which would result in
639+
// an IllegalArgumentException. Therefore, we use Scan with index instead.
640+
Scan scanWithIndex =
641+
Scan.newBuilder()
642+
.namespace(get.forNamespace().get())
643+
.table(get.forTable().get())
644+
.indexKey(get.getPartitionKey())
645+
.whereOr(
646+
get.getConjunctions().stream()
647+
.map(c -> ConditionSetBuilder.andConditionSet(c.getConditions()).build())
648+
.collect(Collectors.toSet()))
649+
.consistency(get.getConsistency())
650+
.attributes(get.getAttributes())
651+
.build();
652+
653+
LinkedHashMap<Key, TransactionResult> results = new LinkedHashMap<>(1);
654+
originalResult.ifPresent(r -> results.put(new Snapshot.Key(scanWithIndex, r), r));
655+
656+
// Validate the result to check if there is no anti-dependency
657+
validateScanResults(storage, scanWithIndex, results);
658+
}
659+
660+
private void validateGetResult(
661+
DistributedStorage storage, Get get, Optional<TransactionResult> originalResult)
662+
throws ExecutionException, ValidationConflictException {
663+
// Only get the tx_id column because we use only them to compare
664+
get.clearProjections();
665+
get.withProjection(Attribute.ID);
666+
667+
// Check if a read record is not changed
668+
Optional<TransactionResult> latestResult = storage.get(get).map(TransactionResult::new);
669+
if (isChanged(latestResult, originalResult)) {
670+
throwExceptionDueToAntiDependency();
671+
}
672+
}
673+
634674
private TableMetadata getTableMetadata(Operation operation) throws ExecutionException {
635675
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(operation);
636676
if (metadata == null) {

core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class SnapshotTest {
8989
.addColumn(ANY_NAME_4, DataType.TEXT)
9090
.addPartitionKey(ANY_NAME_1)
9191
.addClusteringKey(ANY_NAME_2)
92+
.addSecondaryIndex(ANY_NAME_4)
9293
.build());
9394

9495
private Snapshot snapshot;
@@ -186,6 +187,15 @@ private Get prepareAnotherGet() {
186187
.forTable(ANY_TABLE_NAME);
187188
}
188189

190+
private Get prepareGetWithIndex() {
191+
Key indexKey = new Key(ANY_NAME_4, ANY_TEXT_1);
192+
return Get.newBuilder()
193+
.namespace(ANY_NAMESPACE_NAME)
194+
.table(ANY_TABLE_NAME)
195+
.indexKey(indexKey)
196+
.build();
197+
}
198+
189199
private Scan prepareScan() {
190200
Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1);
191201
Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2);
@@ -206,6 +216,15 @@ private Scan prepareScanWithLimit(int limit) {
206216
.build();
207217
}
208218

219+
private Scan prepareScanWithIndex() {
220+
Key indexKey = new Key(ANY_NAME_4, ANY_TEXT_1);
221+
return Scan.newBuilder()
222+
.namespace(ANY_NAMESPACE_NAME)
223+
.table(ANY_TABLE_NAME)
224+
.indexKey(indexKey)
225+
.build();
226+
}
227+
209228
private Scan prepareCrossPartitionScan() {
210229
return prepareCrossPartitionScan(ANY_NAMESPACE_NAME, ANY_TABLE_NAME);
211230
}
@@ -1010,6 +1029,83 @@ public void toSerializable_ReadSetExtended_ShouldThrowValidationConflictExceptio
10101029
verify(storage).get(getWithProjections);
10111030
}
10121031

1032+
@Test
1033+
public void toSerializable_GetSetWithGetWithIndex_ShouldProcessWithoutExceptions()
1034+
throws ExecutionException {
1035+
// Arrange
1036+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1037+
Get getWithIndex = prepareGetWithIndex();
1038+
TransactionResult txResult = prepareResult(ANY_ID + "x");
1039+
snapshot.putIntoGetSet(getWithIndex, Optional.of(txResult));
1040+
DistributedStorage storage = mock(DistributedStorage.class);
1041+
Scan scanWithIndex =
1042+
prepareScanWithIndex().withProjections(Arrays.asList(Attribute.ID, ANY_NAME_1, ANY_NAME_2));
1043+
Scanner scanner = mock(Scanner.class);
1044+
when(scanner.one()).thenReturn(Optional.of(txResult)).thenReturn(Optional.empty());
1045+
when(storage.scan(scanWithIndex)).thenReturn(scanner);
1046+
1047+
// Act Assert
1048+
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();
1049+
1050+
// Assert
1051+
verify(storage).scan(scanWithIndex);
1052+
}
1053+
1054+
@Test
1055+
public void
1056+
toSerializable_GetSetWithGetWithIndex_RecordInserted_ShouldThrowValidationConflictException()
1057+
throws ExecutionException {
1058+
// Arrange
1059+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1060+
Get getWithIndex = prepareGetWithIndex();
1061+
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_2);
1062+
TransactionResult result2 = prepareResult(ANY_ID + "xx", ANY_TEXT_1, ANY_TEXT_3);
1063+
snapshot.putIntoGetSet(getWithIndex, Optional.of(result1));
1064+
DistributedStorage storage = mock(DistributedStorage.class);
1065+
Scan scanWithIndex =
1066+
prepareScanWithIndex().withProjections(Arrays.asList(Attribute.ID, ANY_NAME_1, ANY_NAME_2));
1067+
Scanner scanner = mock(Scanner.class);
1068+
when(scanner.one())
1069+
.thenReturn(Optional.of(result1))
1070+
.thenReturn(Optional.of(result2))
1071+
.thenReturn(Optional.empty());
1072+
when(storage.scan(scanWithIndex)).thenReturn(scanner);
1073+
1074+
// Act Assert
1075+
assertThatThrownBy(() -> snapshot.toSerializable(storage))
1076+
.isInstanceOf(ValidationConflictException.class);
1077+
1078+
// Assert
1079+
verify(storage).scan(scanWithIndex);
1080+
}
1081+
1082+
@Test
1083+
public void
1084+
toSerializable_GetSetWithGetWithIndex_RecordInsertedByMySelf_ShouldProcessWithoutExceptions()
1085+
throws ExecutionException {
1086+
// Arrange
1087+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1088+
Get getWithIndex = prepareGetWithIndex();
1089+
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_2);
1090+
TransactionResult result2 = prepareResult(ANY_ID, ANY_TEXT_1, ANY_TEXT_3);
1091+
snapshot.putIntoGetSet(getWithIndex, Optional.of(result1));
1092+
DistributedStorage storage = mock(DistributedStorage.class);
1093+
Scan scanWithIndex =
1094+
prepareScanWithIndex().withProjections(Arrays.asList(Attribute.ID, ANY_NAME_1, ANY_NAME_2));
1095+
Scanner scanner = mock(Scanner.class);
1096+
when(scanner.one())
1097+
.thenReturn(Optional.of(result1))
1098+
.thenReturn(Optional.of(result2))
1099+
.thenReturn(Optional.empty());
1100+
when(storage.scan(scanWithIndex)).thenReturn(scanner);
1101+
1102+
// Act Assert
1103+
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();
1104+
1105+
// Assert
1106+
verify(storage).scan(scanWithIndex);
1107+
}
1108+
10131109
@Test
10141110
public void toSerializable_ScanSetNotChanged_ShouldProcessWithoutExceptions()
10151111
throws ExecutionException {

0 commit comments

Comments
 (0)