Skip to content

Commit 669501a

Browse files
committed
Apply suggestions
1 parent 7228c1a commit 669501a

File tree

9 files changed

+123
-185
lines changed

9 files changed

+123
-185
lines changed

core/src/main/java/com/scalar/db/common/CoreError.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,12 @@ public enum CoreError implements ScalarDbError {
901901
"The value of the column %s in the primary key contains an illegal character.",
902902
"",
903903
""),
904+
OBJECT_STORAGE_CONDITION_NOT_SATISFIED(
905+
Category.USER_ERROR,
906+
"0258",
907+
"The specified condition is not satisfied. Conditional expression: '%s', Targeting column(s): '%s'",
908+
"",
909+
""),
904910

905911
//
906912
// Errors for the concurrency error category

core/src/main/java/com/scalar/db/storage/objectstorage/MutateStatementHandler.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,8 @@
88
import com.scalar.db.common.TableMetadataManager;
99
import com.scalar.db.exception.storage.ExecutionException;
1010
import com.scalar.db.exception.storage.RetriableExecutionException;
11-
import java.util.ArrayList;
1211
import java.util.Collections;
13-
import java.util.HashMap;
1412
import java.util.List;
15-
import java.util.Map;
1613

1714
public class MutateStatementHandler extends StatementHandler {
1815
public MutateStatementHandler(
@@ -33,23 +30,19 @@ public void handle(Mutation mutation) throws ExecutionException {
3330
}
3431

3532
public void handle(List<? extends Mutation> mutations) throws ExecutionException {
36-
Map<String, List<Mutation>> mutationPerPartition = new HashMap<>();
37-
for (Mutation mutation : mutations) {
38-
TableMetadata tableMetadata = metadataManager.getTableMetadata(mutation);
39-
ObjectStorageMutation objectStorageMutation =
40-
new ObjectStorageMutation(mutation, tableMetadata);
41-
String partitionKey = objectStorageMutation.getConcatenatedPartitionKey();
42-
String objectKey =
43-
ObjectStoragePartition.getObjectKey(
44-
getNamespace(mutation), getTable(mutation), partitionKey);
45-
mutationPerPartition.computeIfAbsent(objectKey, k -> new ArrayList<>()).add(mutation);
46-
}
47-
for (Map.Entry<String, List<Mutation>> entry : mutationPerPartition.entrySet()) {
48-
mutate(entry.getKey(), entry.getValue());
49-
}
33+
// mutations assumed to be for the same partition
34+
TableMetadata tableMetadata = metadataManager.getTableMetadata(mutations.get(0));
35+
ObjectStorageMutation objectStorageMutation =
36+
new ObjectStorageMutation(mutations.get(0), tableMetadata);
37+
String partitionKey = objectStorageMutation.getConcatenatedPartitionKey();
38+
String objectKey =
39+
ObjectStoragePartition.getObjectKey(
40+
getNamespace(mutations.get(0)), getTable(mutations.get(0)), partitionKey);
41+
mutate(objectKey, mutations);
5042
}
5143

52-
private void mutate(String objectKey, List<Mutation> mutations) throws ExecutionException {
44+
private void mutate(String objectKey, List<? extends Mutation> mutations)
45+
throws ExecutionException {
5346
ObjectStoragePartitionSnapshot snapshot = getPartition(objectKey);
5447
for (Mutation mutation : mutations) {
5548
TableMetadata tableMetadata = metadataManager.getTableMetadata(mutation);

core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageMutation.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.scalar.db.storage.objectstorage;
22

3-
import com.scalar.db.api.Delete;
43
import com.scalar.db.api.Mutation;
54
import com.scalar.db.api.Put;
65
import com.scalar.db.api.TableMetadata;
@@ -22,9 +21,7 @@ public class ObjectStorageMutation extends ObjectStorageOperation {
2221
public ObjectStorageRecord makeRecord() {
2322
Mutation mutation = (Mutation) getOperation();
2423

25-
if (mutation instanceof Delete) {
26-
throw new IllegalStateException("Delete mutation should not make a new record.");
27-
}
24+
assert mutation instanceof Put;
2825
Put put = (Put) getOperation();
2926

3027
return ObjectStorageRecord.newBuilder()
@@ -40,9 +37,7 @@ public ObjectStorageRecord makeRecord() {
4037
public ObjectStorageRecord makeRecord(ObjectStorageRecord existingRecord) {
4138
Mutation mutation = (Mutation) getOperation();
4239

43-
if (mutation instanceof Delete) {
44-
throw new IllegalStateException("Delete mutation should not make a new record.");
45-
}
40+
assert mutation instanceof Put;
4641
Put put = (Put) mutation;
4742

4843
Map<String, Object> newValues = new HashMap<>(existingRecord.getValues());

core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageOperation.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.scalar.db.storage.objectstorage;
22

3-
import com.google.common.base.Joiner;
43
import com.scalar.db.api.Operation;
54
import com.scalar.db.api.TableMetadata;
65
import com.scalar.db.io.Column;
@@ -58,20 +57,4 @@ public String getRecordId() {
5857
}
5958
return getConcatenatedPartitionKey();
6059
}
61-
62-
@SafeVarargs
63-
public final void checkArgument(Class<? extends Operation>... expected) {
64-
for (Class<? extends Operation> e : expected) {
65-
if (e.isInstance(operation)) {
66-
return;
67-
}
68-
}
69-
throw new IllegalArgumentException(
70-
Joiner.on(" ")
71-
.join(
72-
new String[] {
73-
operation.getClass().toString(), "is passed where something like",
74-
expected[0].toString(), "is expected"
75-
}));
76-
}
7760
}

core/src/main/java/com/scalar/db/storage/objectstorage/SelectStatementHandler.java

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
import com.scalar.db.util.ScalarDbUtils;
1717
import java.util.ArrayList;
1818
import java.util.Collections;
19-
import java.util.HashSet;
19+
import java.util.Comparator;
2020
import java.util.Iterator;
2121
import java.util.List;
2222
import java.util.Optional;
23-
import java.util.Set;
2423
import java.util.stream.Collectors;
2524
import javax.annotation.Nonnull;
2625
import javax.annotation.concurrent.ThreadSafe;
@@ -35,19 +34,15 @@ public SelectStatementHandler(
3534
@Nonnull
3635
public Scanner handle(Selection selection) throws ExecutionException {
3736
TableMetadata tableMetadata = metadataManager.getTableMetadata(selection);
37+
if (ScalarDbUtils.isSecondaryIndexSpecified(selection, tableMetadata)) {
38+
throw new UnsupportedOperationException(
39+
CoreError.OBJECT_STORAGE_INDEX_NOT_SUPPORTED.buildMessage());
40+
}
3841
if (selection instanceof Get) {
39-
if (ScalarDbUtils.isSecondaryIndexSpecified(selection, tableMetadata)) {
40-
throw new UnsupportedOperationException(
41-
CoreError.OBJECT_STORAGE_INDEX_NOT_SUPPORTED.buildMessage());
42-
} else {
43-
return executeGet((Get) selection, tableMetadata);
44-
}
42+
return executeGet((Get) selection, tableMetadata);
4543
} else {
4644
if (selection instanceof ScanAll) {
4745
return executeScanAll((ScanAll) selection, tableMetadata);
48-
} else if (ScalarDbUtils.isSecondaryIndexSpecified(selection, tableMetadata)) {
49-
throw new UnsupportedOperationException(
50-
CoreError.OBJECT_STORAGE_INDEX_NOT_SUPPORTED.buildMessage());
5146
} else {
5247
return executeScan((Scan) selection, tableMetadata);
5348
}
@@ -56,7 +51,6 @@ public Scanner handle(Selection selection) throws ExecutionException {
5651

5752
private Scanner executeGet(Get get, TableMetadata metadata) throws ExecutionException {
5853
ObjectStorageOperation operation = new ObjectStorageOperation(get, metadata);
59-
operation.checkArgument(Get.class);
6054
ObjectStoragePartition partition =
6155
getPartition(getNamespace(get), getTable(get), operation.getConcatenatedPartitionKey());
6256
if (!partition.getRecord(operation.getRecordId()).isPresent()) {
@@ -71,18 +65,17 @@ private Scanner executeGet(Get get, TableMetadata metadata) throws ExecutionExce
7165

7266
private Scanner executeScan(Scan scan, TableMetadata metadata) throws ExecutionException {
7367
ObjectStorageOperation operation = new ObjectStorageOperation(scan, metadata);
74-
operation.checkArgument(Scan.class);
7568
ObjectStoragePartition partition =
7669
getPartition(getNamespace(scan), getTable(scan), operation.getConcatenatedPartitionKey());
7770
List<ObjectStorageRecord> records = new ArrayList<>(partition.getRecords().values());
7871

79-
records.sort(
80-
(o1, o2) ->
81-
new ClusteringKeyComparator(metadata)
82-
.compare(o1.getClusteringKey(), o2.getClusteringKey()));
72+
ClusteringKeyComparator clusteringKeyComparator = new ClusteringKeyComparator(metadata);
73+
Comparator<ObjectStorageRecord> cmp =
74+
Comparator.comparing(ObjectStorageRecord::getClusteringKey, clusteringKeyComparator);
8375
if (isReverseOrder(scan, metadata)) {
84-
Collections.reverse(records);
76+
cmp = cmp.reversed();
8577
}
78+
records.sort(cmp);
8679

8780
// If the scan is for DESC clustering order, use the end clustering key as a start key and the
8881
// start clustering key as an end key
@@ -118,16 +111,16 @@ private Scanner executeScan(Scan scan, TableMetadata metadata) throws ExecutionE
118111
}
119112

120113
private Scanner executeScanAll(ScanAll scan, TableMetadata metadata) throws ExecutionException {
121-
ObjectStorageOperation operation = new ObjectStorageOperation(scan, metadata);
122-
operation.checkArgument(ScanAll.class);
123-
Set<ObjectStorageRecord> records = getRecordsInTable(getNamespace(scan), getTable(scan));
124-
if (scan.getLimit() > 0) {
125-
records = records.stream().limit(scan.getLimit()).collect(Collectors.toSet());
114+
try {
115+
List<String> partitionKeys = getPartitionKeysInTable(getNamespace(scan), getTable(scan));
116+
StreamingRecordIterator iterator =
117+
new StreamingRecordIterator(wrapper, getNamespace(scan), getTable(scan), partitionKeys);
118+
return new ScannerImpl(
119+
iterator, new ResultInterpreter(scan.getProjections(), metadata), scan.getLimit());
120+
} catch (Exception e) {
121+
throw new ExecutionException(
122+
CoreError.OBJECT_STORAGE_ERROR_OCCURRED_IN_SELECTION.buildMessage(e.getMessage()), e);
126123
}
127-
return new ScannerImpl(
128-
records.iterator(),
129-
new ResultInterpreter(scan.getProjections(), metadata),
130-
scan.getLimit());
131124
}
132125

133126
private ObjectStoragePartition getPartition(
@@ -145,22 +138,13 @@ private ObjectStoragePartition getPartition(
145138
}
146139
}
147140

148-
private Set<ObjectStorageRecord> getRecordsInTable(String namespaceName, String tableName)
141+
private List<String> getPartitionKeysInTable(String namespaceName, String tableName)
149142
throws ExecutionException {
150143
try {
151-
Set<String> partitionKeys =
152-
wrapper.getKeys(ObjectStorageUtils.getObjectKey(namespaceName, tableName, "")).stream()
153-
.map(
154-
key ->
155-
key.substring(key.lastIndexOf(ObjectStorageUtils.OBJECT_KEY_DELIMITER) + 1))
156-
.filter(partition -> !partition.isEmpty())
157-
.collect(Collectors.toSet());
158-
Set<ObjectStorageRecord> records = new HashSet<>();
159-
for (String partitionKey : partitionKeys) {
160-
ObjectStoragePartition partition = getPartition(namespaceName, tableName, partitionKey);
161-
records.addAll(partition.getRecords().values());
162-
}
163-
return records;
144+
return wrapper.getKeys(ObjectStorageUtils.getObjectKey(namespaceName, tableName, "")).stream()
145+
.map(key -> key.substring(key.lastIndexOf(ObjectStorageUtils.OBJECT_KEY_DELIMITER) + 1))
146+
.filter(partition -> !partition.isEmpty())
147+
.collect(Collectors.toList());
164148
} catch (Exception e) {
165149
throw new ExecutionException(
166150
CoreError.OBJECT_STORAGE_ERROR_OCCURRED_IN_SELECTION.buildMessage(e.getMessage()), e);

core/src/main/java/com/scalar/db/storage/objectstorage/StatementHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.scalar.db.api.ConditionalExpression;
77
import com.scalar.db.api.Operation;
88
import com.scalar.db.api.TableMetadata;
9+
import com.scalar.db.common.CoreError;
910
import com.scalar.db.common.TableMetadataManager;
1011
import com.scalar.db.exception.storage.ExecutionException;
1112
import com.scalar.db.io.Column;
@@ -126,8 +127,7 @@ protected static void validateConditions(
126127
}
127128
if (validationFailed) {
128129
throw new ExecutionException(
129-
String.format(
130-
"A condition failed. ConditionalExpression: %s, Column: %s",
130+
CoreError.OBJECT_STORAGE_CONDITION_NOT_SATISFIED.buildMessage(
131131
expression, actualColumn));
132132
}
133133
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.scalar.db.storage.objectstorage;
2+
3+
import com.scalar.db.common.CoreError;
4+
import com.scalar.db.exception.storage.ExecutionException;
5+
import java.util.Collections;
6+
import java.util.Iterator;
7+
import java.util.List;
8+
import java.util.Optional;
9+
10+
/**
11+
* Iterator that streams records from partitions in a lazy manner, loading partitions on-demand
12+
* instead of loading all records into memory at once.
13+
*/
14+
public class StreamingRecordIterator implements Iterator<ObjectStorageRecord> {
15+
private final ObjectStorageWrapper wrapper;
16+
private final String namespaceName;
17+
private final String tableName;
18+
private final Iterator<String> partitionKeyIterator;
19+
private Iterator<ObjectStorageRecord> partitionRecordIterator;
20+
21+
public StreamingRecordIterator(
22+
ObjectStorageWrapper wrapper,
23+
String namespaceName,
24+
String tableName,
25+
List<String> partitionKeys) {
26+
this.wrapper = wrapper;
27+
this.namespaceName = namespaceName;
28+
this.tableName = tableName;
29+
this.partitionKeyIterator = partitionKeys.iterator();
30+
this.partitionRecordIterator = Collections.emptyIterator();
31+
}
32+
33+
@Override
34+
public boolean hasNext() {
35+
while (!partitionRecordIterator.hasNext() && partitionKeyIterator.hasNext()) {
36+
loadNextPartition();
37+
}
38+
return partitionRecordIterator.hasNext();
39+
}
40+
41+
@Override
42+
public ObjectStorageRecord next() {
43+
if (!hasNext()) {
44+
throw new java.util.NoSuchElementException();
45+
}
46+
return partitionRecordIterator.next();
47+
}
48+
49+
private void loadNextPartition() {
50+
try {
51+
String partitionKey = partitionKeyIterator.next();
52+
ObjectStoragePartition partition = getPartition(partitionKey);
53+
partitionRecordIterator = partition.getRecords().values().iterator();
54+
} catch (ExecutionException e) {
55+
throw new RuntimeException(
56+
CoreError.OBJECT_STORAGE_ERROR_OCCURRED_IN_SELECTION.buildMessage(e.getMessage()), e);
57+
}
58+
}
59+
60+
private ObjectStoragePartition getPartition(String partitionKey) throws ExecutionException {
61+
try {
62+
Optional<ObjectStorageWrapperResponse> response =
63+
wrapper.get(ObjectStorageUtils.getObjectKey(namespaceName, tableName, partitionKey));
64+
if (!response.isPresent()) {
65+
return new ObjectStoragePartition(Collections.emptyMap());
66+
}
67+
return ObjectStoragePartition.deserialize(response.get().getPayload());
68+
} catch (ObjectStorageWrapperException e) {
69+
throw new ExecutionException(
70+
CoreError.OBJECT_STORAGE_ERROR_OCCURRED_IN_SELECTION.buildMessage(e.getMessage()), e);
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)