Skip to content

Commit c01a3db

Browse files
authored
Merge branch 'master' into handle-get-and-scan-with-conjunctions-correcctly-in-consensus-commit
2 parents 4ec25d6 + 044443f commit c01a3db

File tree

15 files changed

+811
-266
lines changed

15 files changed

+811
-266
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
name: CI for Data Loader CLI
2+
3+
on:
4+
pull_request:
5+
workflow_dispatch:
6+
7+
jobs:
8+
data_loader_ci:
9+
name: Build Data Loader CLI
10+
runs-on: ubuntu-latest
11+
12+
steps:
13+
- uses: actions/checkout@v4
14+
15+
- name: Set up JDK 8
16+
uses: actions/setup-java@v4
17+
with:
18+
distribution: 'temurin'
19+
java-version: 8
20+
21+
- name: Setup Gradle
22+
uses: gradle/actions/setup-gradle@v4
23+
24+
- name: Gradle Data Loader CLI build
25+
run: ./gradlew :data-loader:cli:build --no-daemon --stacktrace
26+

core/src/main/java/com/scalar/db/storage/dynamo/DynamoAdmin.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public class DynamoAdmin implements DistributedStorageAdmin {
9797
public static final String DEFAULT_NO_BACKUP = "false";
9898
public static final String DEFAULT_REQUEST_UNIT = "10";
9999
private static final int DEFAULT_WAITING_DURATION_SECS = 3;
100+
@VisibleForTesting static final int MAX_RETRY_COUNT = 10;
100101

101102
@VisibleForTesting static final String PARTITION_KEY = "concatenatedPartitionKey";
102103
@VisibleForTesting static final String CLUSTERING_KEY = "concatenatedClusteringKey";
@@ -238,15 +239,26 @@ public void createNamespace(String nonPrefixedNamespace, Map<String, String> opt
238239
private void upsertIntoNamespacesTable(Namespace namespace) throws ExecutionException {
239240
Map<String, AttributeValue> itemValues = new HashMap<>();
240241
itemValues.put(NAMESPACES_ATTR_NAME, AttributeValue.builder().s(namespace.prefixed()).build());
241-
try {
242-
client.putItem(
243-
PutItemRequest.builder()
244-
.tableName(ScalarDbUtils.getFullTableName(metadataNamespace, NAMESPACES_TABLE))
245-
.item(itemValues)
246-
.build());
247-
} catch (Exception e) {
248-
throw new ExecutionException(
249-
"Inserting the " + namespace + " namespace into the namespaces table failed", e);
242+
int retryCount = 0;
243+
while (true) {
244+
try {
245+
client.putItem(
246+
PutItemRequest.builder()
247+
.tableName(ScalarDbUtils.getFullTableName(metadataNamespace, NAMESPACES_TABLE))
248+
.item(itemValues)
249+
.build());
250+
return;
251+
} catch (ResourceNotFoundException e) {
252+
if (retryCount >= MAX_RETRY_COUNT) {
253+
throw new ExecutionException(
254+
"Inserting the " + namespace + " namespace into the namespaces table failed", e);
255+
}
256+
Uninterruptibles.sleepUninterruptibly(waitingDurationSecs, TimeUnit.SECONDS);
257+
retryCount++;
258+
} catch (Exception e) {
259+
throw new ExecutionException(
260+
"Inserting the " + namespace + " namespace into the namespaces table failed", e);
261+
}
250262
}
251263
}
252264

@@ -470,15 +482,28 @@ private void upsertTableMetadata(Namespace namespace, String table, TableMetadat
470482
METADATA_ATTR_SECONDARY_INDEX,
471483
AttributeValue.builder().ss(metadata.getSecondaryIndexNames()).build());
472484
}
473-
try {
474-
client.putItem(
475-
PutItemRequest.builder()
476-
.tableName(ScalarDbUtils.getFullTableName(metadataNamespace, METADATA_TABLE))
477-
.item(itemValues)
478-
.build());
479-
} catch (Exception e) {
480-
throw new ExecutionException(
481-
"Adding the metadata for the " + getFullTableName(namespace, table) + " table failed", e);
485+
int retryCount = 0;
486+
while (true) {
487+
try {
488+
client.putItem(
489+
PutItemRequest.builder()
490+
.tableName(ScalarDbUtils.getFullTableName(metadataNamespace, METADATA_TABLE))
491+
.item(itemValues)
492+
.build());
493+
return;
494+
} catch (ResourceNotFoundException e) {
495+
if (retryCount >= MAX_RETRY_COUNT) {
496+
throw new ExecutionException(
497+
"Adding the metadata for the " + getFullTableName(namespace, table) + " table failed",
498+
e);
499+
}
500+
Uninterruptibles.sleepUninterruptibly(waitingDurationSecs, TimeUnit.SECONDS);
501+
retryCount++;
502+
} catch (Exception e) {
503+
throw new ExecutionException(
504+
"Adding the metadata for the " + getFullTableName(namespace, table) + " table failed",
505+
e);
506+
}
482507
}
483508
}
484509

core/src/main/java/com/scalar/db/transaction/consensuscommit/AbstractMutationComposer.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
import com.google.common.annotations.VisibleForTesting;
44
import com.google.common.collect.ImmutableList;
55
import com.scalar.db.api.Mutation;
6+
import com.scalar.db.api.PutBuilder;
7+
import com.scalar.db.api.TableMetadata;
8+
import com.scalar.db.io.DataType;
69
import java.util.ArrayList;
710
import java.util.List;
11+
import java.util.Set;
812
import javax.annotation.concurrent.NotThreadSafe;
913

1014
@NotThreadSafe
@@ -34,4 +38,50 @@ public AbstractMutationComposer(String id, TransactionTableMetadataManager table
3438
public List<Mutation> get() {
3539
return ImmutableList.copyOf(mutations);
3640
}
41+
42+
static void setBeforeImageColumnsToNull(
43+
PutBuilder.Buildable putBuilder,
44+
Set<String> beforeImageColumnNames,
45+
TableMetadata tableMetadata) {
46+
for (String beforeImageColumnName : beforeImageColumnNames) {
47+
DataType columnDataType = tableMetadata.getColumnDataType(beforeImageColumnName);
48+
switch (columnDataType) {
49+
case BOOLEAN:
50+
putBuilder.booleanValue(beforeImageColumnName, null);
51+
break;
52+
case INT:
53+
putBuilder.intValue(beforeImageColumnName, null);
54+
break;
55+
case BIGINT:
56+
putBuilder.bigIntValue(beforeImageColumnName, null);
57+
break;
58+
case FLOAT:
59+
putBuilder.floatValue(beforeImageColumnName, null);
60+
break;
61+
case DOUBLE:
62+
putBuilder.doubleValue(beforeImageColumnName, null);
63+
break;
64+
case TEXT:
65+
putBuilder.textValue(beforeImageColumnName, null);
66+
break;
67+
case BLOB:
68+
putBuilder.blobValue(beforeImageColumnName, (byte[]) null);
69+
break;
70+
case DATE:
71+
putBuilder.dateValue(beforeImageColumnName, null);
72+
break;
73+
case TIME:
74+
putBuilder.timeValue(beforeImageColumnName, null);
75+
break;
76+
case TIMESTAMP:
77+
putBuilder.timestampValue(beforeImageColumnName, null);
78+
break;
79+
case TIMESTAMPTZ:
80+
putBuilder.timestampTZValue(beforeImageColumnName, null);
81+
break;
82+
default:
83+
throw new AssertionError("Unknown data type: " + columnDataType);
84+
}
85+
}
86+
}
3787
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void commit(Snapshot snapshot, boolean readOnly)
117117

118118
if (hasWritesOrDeletesInSnapshot) {
119119
try {
120-
prepare(snapshot);
120+
prepareRecords(snapshot);
121121
} catch (PreparationException e) {
122122
safelyCallOnFailureBeforeCommit(snapshot);
123123
abortState(snapshot.getId());
@@ -134,7 +134,7 @@ public void commit(Snapshot snapshot, boolean readOnly)
134134

135135
if (snapshot.hasReads()) {
136136
try {
137-
validate(snapshot);
137+
validateRecords(snapshot);
138138
} catch (ValidationException e) {
139139
safelyCallOnFailureBeforeCommit(snapshot);
140140

@@ -194,9 +194,19 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause)
194194
}
195195
}
196196

197-
public void prepare(Snapshot snapshot) throws PreparationException {
197+
public void prepareRecords(Snapshot snapshot) throws PreparationException {
198198
try {
199-
prepareRecords(snapshot);
199+
PrepareMutationComposer composer =
200+
new PrepareMutationComposer(snapshot.getId(), tableMetadataManager);
201+
snapshot.to(composer);
202+
PartitionedMutations mutations = new PartitionedMutations(composer.get());
203+
204+
ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
205+
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
206+
for (PartitionedMutations.Key key : orderedKeys) {
207+
tasks.add(() -> storage.mutate(mutations.get(key)));
208+
}
209+
parallelExecutor.prepareRecords(tasks, snapshot.getId());
200210
} catch (NoMutationException e) {
201211
throw new PreparationConflictException(
202212
CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, snapshot.getId());
@@ -211,22 +221,7 @@ public void prepare(Snapshot snapshot) throws PreparationException {
211221
}
212222
}
213223

214-
private void prepareRecords(Snapshot snapshot)
215-
throws ExecutionException, PreparationConflictException {
216-
PrepareMutationComposer composer =
217-
new PrepareMutationComposer(snapshot.getId(), tableMetadataManager);
218-
snapshot.to(composer);
219-
PartitionedMutations mutations = new PartitionedMutations(composer.get());
220-
221-
ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
222-
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
223-
for (PartitionedMutations.Key key : orderedKeys) {
224-
tasks.add(() -> storage.mutate(mutations.get(key)));
225-
}
226-
parallelExecutor.prepare(tasks, snapshot.getId());
227-
}
228-
229-
public void validate(Snapshot snapshot) throws ValidationException {
224+
public void validateRecords(Snapshot snapshot) throws ValidationException {
230225
try {
231226
// validation is executed when SERIALIZABLE is chosen.
232227
snapshot.toSerializable(storage);

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
package com.scalar.db.transaction.consensuscommit;
22

33
import static com.scalar.db.api.ConditionalExpression.Operator;
4+
import static com.scalar.db.transaction.consensuscommit.Attribute.COMMITTED_AT;
45
import static com.scalar.db.transaction.consensuscommit.Attribute.ID;
56
import static com.scalar.db.transaction.consensuscommit.Attribute.STATE;
67
import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue;
78
import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue;
89

910
import com.google.common.annotations.VisibleForTesting;
11+
import com.scalar.db.api.ConditionBuilder;
1012
import com.scalar.db.api.ConditionalExpression;
1113
import com.scalar.db.api.Consistency;
1214
import com.scalar.db.api.Delete;
1315
import com.scalar.db.api.DeleteIf;
1416
import com.scalar.db.api.Mutation;
1517
import com.scalar.db.api.Operation;
1618
import com.scalar.db.api.Put;
17-
import com.scalar.db.api.PutIf;
19+
import com.scalar.db.api.PutBuilder;
1820
import com.scalar.db.api.Selection;
21+
import com.scalar.db.api.TableMetadata;
1922
import com.scalar.db.api.TransactionState;
2023
import com.scalar.db.exception.storage.ExecutionException;
2124
import com.scalar.db.io.Key;
2225
import com.scalar.db.util.ScalarDbUtils;
26+
import java.util.LinkedHashSet;
2327
import java.util.Optional;
2428
import javax.annotation.Nullable;
2529
import javax.annotation.concurrent.NotThreadSafe;
@@ -82,17 +86,33 @@ private void add(Selection base, @Nullable TransactionResult result) throws Exec
8286

8387
private Put composePut(Operation base, @Nullable TransactionResult result)
8488
throws ExecutionException {
85-
return new Put(getPartitionKey(base, result), getClusteringKey(base, result).orElse(null))
86-
.forNamespace(base.forNamespace().get())
87-
.forTable(base.forTable().get())
88-
.withConsistency(Consistency.LINEARIZABLE)
89-
.withCondition(
90-
new PutIf(
91-
new ConditionalExpression(ID, toIdValue(id), Operator.EQ),
92-
new ConditionalExpression(
93-
STATE, toStateValue(TransactionState.PREPARED), Operator.EQ)))
94-
.withValue(Attribute.toCommittedAtValue(current))
95-
.withValue(Attribute.toStateValue(TransactionState.COMMITTED));
89+
PutBuilder.Buildable putBuilder =
90+
Put.newBuilder()
91+
.namespace(base.forNamespace().get())
92+
.table(base.forTable().get())
93+
.partitionKey(getPartitionKey(base, result))
94+
.condition(
95+
ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(id))
96+
.and(
97+
ConditionBuilder.column(STATE)
98+
.isEqualToInt(TransactionState.PREPARED.get()))
99+
.build())
100+
.bigIntValue(COMMITTED_AT, current)
101+
.intValue(STATE, TransactionState.COMMITTED.get())
102+
.consistency(Consistency.LINEARIZABLE);
103+
getClusteringKey(base, result).ifPresent(putBuilder::clusteringKey);
104+
105+
// Set before image columns to null
106+
if (result != null) {
107+
TransactionTableMetadata transactionTableMetadata =
108+
tableMetadataManager.getTransactionTableMetadata(base);
109+
LinkedHashSet<String> beforeImageColumnNames =
110+
transactionTableMetadata.getBeforeImageColumnNames();
111+
TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata();
112+
setBeforeImageColumnsToNull(putBuilder, beforeImageColumnNames, tableMetadata);
113+
}
114+
115+
return putBuilder.build();
96116
}
97117

98118
private Delete composeDelete(Operation base, @Nullable TransactionResult result)

core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public ParallelExecutor(ConsensusCommitConfig config) {
6060
this.parallelExecutorService = parallelExecutorService;
6161
}
6262

63-
public void prepare(List<ParallelExecutorTask> tasks, String transactionId)
63+
public void prepareRecords(List<ParallelExecutorTask> tasks, String transactionId)
6464
throws ExecutionException {
6565
try {
6666
// When parallel preparation is disabled, we stop running the tasks when one of them fails
@@ -85,7 +85,7 @@ public void prepare(List<ParallelExecutorTask> tasks, String transactionId)
8585
}
8686
}
8787

88-
public void validate(List<ParallelExecutorTask> tasks, String transactionId)
88+
public void validateRecords(List<ParallelExecutorTask> tasks, String transactionId)
8989
throws ExecutionException, ValidationConflictException {
9090
try {
9191
executeTasks(

core/src/main/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposer.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.scalar.db.api.Put;
1919
import com.scalar.db.api.PutBuilder;
2020
import com.scalar.db.api.Selection;
21+
import com.scalar.db.api.TableMetadata;
2122
import com.scalar.db.api.TransactionState;
2223
import com.scalar.db.exception.storage.ExecutionException;
2324
import com.scalar.db.io.Column;
@@ -77,8 +78,11 @@ private Put composePut(Operation base, TransactionResult result) throws Executio
7778
&& (result.getState().equals(TransactionState.PREPARED)
7879
|| result.getState().equals(TransactionState.DELETED));
7980

80-
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base);
81-
LinkedHashSet<String> beforeImageColumnNames = metadata.getBeforeImageColumnNames();
81+
TransactionTableMetadata transactionTableMetadata =
82+
tableMetadataManager.getTransactionTableMetadata(base);
83+
LinkedHashSet<String> beforeImageColumnNames =
84+
transactionTableMetadata.getBeforeImageColumnNames();
85+
TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata();
8286

8387
List<Column<?>> columns = new ArrayList<>();
8488
result
@@ -98,9 +102,8 @@ private Put composePut(Operation base, TransactionResult result) throws Executio
98102
}
99103
});
100104

101-
Key partitionKey = ScalarDbUtils.getPartitionKey(result, metadata.getTableMetadata());
102-
Optional<Key> clusteringKey =
103-
ScalarDbUtils.getClusteringKey(result, metadata.getTableMetadata());
105+
Key partitionKey = ScalarDbUtils.getPartitionKey(result, tableMetadata);
106+
Optional<Key> clusteringKey = ScalarDbUtils.getClusteringKey(result, tableMetadata);
104107

105108
PutBuilder.Buildable putBuilder =
106109
Put.newBuilder()
@@ -115,6 +118,9 @@ private Put composePut(Operation base, TransactionResult result) throws Executio
115118
clusteringKey.ifPresent(putBuilder::clusteringKey);
116119
columns.forEach(putBuilder::value);
117120

121+
// Set before image columns to null
122+
setBeforeImageColumnsToNull(putBuilder, beforeImageColumnNames, tableMetadata);
123+
118124
return putBuilder.build();
119125
}
120126

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ void toSerializable(DistributedStorage storage)
548548
}
549549
}
550550

551-
parallelExecutor.validate(tasks, getId());
551+
parallelExecutor.validateRecords(tasks, getId());
552552
}
553553

554554
/**

0 commit comments

Comments
 (0)