Skip to content
This repository was archived by the owner on Oct 5, 2021. It is now read-only.

Commit 2b13e0c

Browse files
Alexander PatrikalakisAlexander Patrikalakis
authored andcommitted
Allow DynamoDbStoreTransaction to span multiple stores
1 parent 385fe05 commit 2b13e0c

File tree

6 files changed

+44
-70
lines changed

6 files changed

+44
-70
lines changed

src/main/java/com/amazon/janusgraph/diskstorage/dynamodb/AbstractDynamoDbStore.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import java.util.Collections;
1818
import java.util.List;
19-
import java.util.concurrent.Callable;
2019
import java.util.concurrent.ExecutionException;
2120
import java.util.concurrent.TimeUnit;
2221

@@ -48,7 +47,6 @@
4847
import com.google.common.util.concurrent.ExecutionError;
4948
import com.google.common.util.concurrent.UncheckedExecutionException;
5049

51-
import lombok.RequiredArgsConstructor;
5250
import lombok.extern.slf4j.Slf4j;
5351

5452
/**
@@ -67,6 +65,10 @@ public abstract class AbstractDynamoDbStore implements AwsStore {
6765
@Getter
6866
private final String name;
6967
private final boolean forceConsistentRead;
68+
/**
69+
* The key column local lock cache maps key-column pairs to the DynamoDbStoreTransaction that first
70+
* acquired a lock on those key-column pairs.
71+
*/
7072
private final Cache<Pair<StaticBuffer, StaticBuffer>, DynamoDbStoreTransaction> keyColumnLocalLocks;
7173

7274
private enum ReportingRemovalListener implements RemovalListener<Pair<StaticBuffer, StaticBuffer>, DynamoDbStoreTransaction> {
@@ -152,28 +154,14 @@ public final void deleteStore() throws BackendException {
152154
client.getDelegate().ensureTableDeleted(getTableSchema().getTableName());
153155
}
154156

155-
@RequiredArgsConstructor
156-
private class SetStoreIfTxMappingDoesntExist implements Callable<DynamoDbStoreTransaction> {
157-
private final DynamoDbStoreTransaction tx;
158-
private final Pair<StaticBuffer, StaticBuffer> keyColumn;
159-
@Override
160-
public DynamoDbStoreTransaction call() throws Exception {
161-
tx.setStore(AbstractDynamoDbStore.this);
162-
log.trace(String.format("acquiring lock on %s at " + System.nanoTime(), keyColumn.toString()));
163-
// do not extend the expiry of an existing lock by the tx passed in this method
164-
return tx;
165-
}
166-
}
167-
168157
@Override
169158
public void acquireLock(final StaticBuffer key, final StaticBuffer column, final StaticBuffer expectedValue, final StoreTransaction txh) throws BackendException {
170159
final DynamoDbStoreTransaction tx = DynamoDbStoreTransaction.getTx(txh);
171160
final Pair<StaticBuffer, StaticBuffer> keyColumn = Pair.of(key, column);
172161

173162
final DynamoDbStoreTransaction existing;
174163
try {
175-
existing = keyColumnLocalLocks.get(keyColumn,
176-
new SetStoreIfTxMappingDoesntExist(tx, keyColumn));
164+
existing = keyColumnLocalLocks.get(keyColumn, () -> tx);
177165
} catch (ExecutionException | UncheckedExecutionException | ExecutionError e) {
178166
throw new TemporaryLockingException("Unable to acquire lock", e);
179167
}
@@ -182,17 +170,15 @@ public void acquireLock(final StaticBuffer key, final StaticBuffer column, final
182170
}
183171

184172
// Titan's locking expects that only the first expectedValue for a given key/column should be used
185-
if (!tx.contains(key, column)) {
186-
tx.put(key, column, expectedValue);
187-
}
173+
tx.putKeyColumnOnlyIfItIsNotYetChangedInTx(this, key, column, expectedValue);
188174
}
189175

190176
@Override
191177
public void close() throws BackendException {
192178
log.debug("Closing table:{}", tableName);
193179
}
194180

195-
protected String encodeKeyForLog(final StaticBuffer key) {
181+
String encodeKeyForLog(final StaticBuffer key) {
196182
if (null == key) {
197183
return "";
198184
}

src/main/java/com/amazon/janusgraph/diskstorage/dynamodb/DynamoDbSingleRowStore.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,8 @@ public Collection<MutateWorker> createMutationWorkers(final Map<StaticBuffer, KC
204204

205205
// Using ExpectedAttributeValue map to handle large mutations in a single request
206206
// Large mutations would require multiple requests using expressions
207-
final Map<String, ExpectedAttributeValue> expected = new SingleExpectedAttributeValueBuilder().key(hashKey)
208-
.transaction(txh)
209-
.build(mutation);
207+
final Map<String, ExpectedAttributeValue> expected =
208+
new SingleExpectedAttributeValueBuilder(this, txh, hashKey).build(mutation);
210209

211210
final Map<String, AttributeValueUpdate> attributeValueUpdates =
212211
new SingleUpdateBuilder().deletions(mutation.getDeletions())

src/main/java/com/amazon/janusgraph/diskstorage/dynamodb/DynamoDbStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ private Collection<MutateWorker> createWorkersForAdditions(final StaticBuffer ha
268268
.rangeKey(rangeKey)
269269
.build();
270270

271-
final Expression updateExpression = new MultiUpdateExpressionBuilder(txh).hashKey(hashKey)
271+
final Expression updateExpression = new MultiUpdateExpressionBuilder(this, txh).hashKey(hashKey)
272272
.rangeKey(rangeKey)
273273
.value(addition.getValue())
274274
.build();
@@ -290,7 +290,7 @@ private Collection<MutateWorker> createWorkersForDeletions(final StaticBuffer ha
290290
.rangeKey(rangeKey)
291291
.build();
292292

293-
final Expression updateExpression = new MultiUpdateExpressionBuilder(txh).hashKey(hashKey)
293+
final Expression updateExpression = new MultiUpdateExpressionBuilder(this, txh).hashKey(hashKey)
294294
.rangeKey(rangeKey)
295295
.build();
296296

src/main/java/com/amazon/janusgraph/diskstorage/dynamodb/DynamoDbStoreTransaction.java

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515
package com.amazon.janusgraph.diskstorage.dynamodb;
1616

17+
import java.util.HashMap;
1718
import java.util.Map;
1819

1920
import org.apache.commons.lang.builder.EqualsBuilder;
@@ -39,7 +40,7 @@
3940
*
4041
*/
4142
@Slf4j
42-
public class DynamoDbStoreTransaction extends AbstractStoreTransaction { //CHECKSTYLE SUPPRESS - for the DB in DynamoDB
43+
public class DynamoDbStoreTransaction extends AbstractStoreTransaction {
4344

4445
public static DynamoDbStoreTransaction getTx(@NonNull final StoreTransaction txh) {
4546
Preconditions
@@ -51,8 +52,7 @@ public static DynamoDbStoreTransaction getTx(@NonNull final StoreTransaction txh
5152
* This is only used for toString for debugging purposes.
5253
*/
5354
private final String id;
54-
private final Map<StaticBuffer, Map<StaticBuffer, StaticBuffer>> expectedValues = Maps.newHashMap();
55-
private AbstractDynamoDbStore store;
55+
private final Map<AbstractDynamoDbStore, Map<StaticBuffer, Map<StaticBuffer, StaticBuffer>>> expectedValues = Maps.newHashMap();
5656

5757
/**
5858
* Creates a DynamoDB Store transaction.
@@ -73,12 +73,9 @@ public void commit() throws BackendException {
7373
}
7474

7575
private void releaseLocks() {
76-
for (final Map.Entry<StaticBuffer, Map<StaticBuffer, StaticBuffer>> entry : expectedValues.entrySet()) {
77-
final StaticBuffer key = entry.getKey();
78-
for (final StaticBuffer column : entry.getValue().keySet()) {
79-
store.releaseLock(key, column);
80-
}
81-
}
76+
expectedValues.forEach((store, kcMap) -> kcMap.forEach((key, columnValueMap) -> {
77+
columnValueMap.forEach((column, valueIgnored) -> store.releaseLock(key, column));
78+
}));
8279
}
8380

8481
/**
@@ -87,8 +84,10 @@ private void releaseLocks() {
8784
* @param column column to check for existence
8885
* @return true if both the key and column combination are in this transaction and false otherwise.
8986
*/
90-
public boolean contains(final StaticBuffer key, final StaticBuffer column) {
91-
return expectedValues.containsKey(key) && expectedValues.get(key).containsKey(column);
87+
public boolean contains(final AbstractDynamoDbStore store, final StaticBuffer key, final StaticBuffer column) {
88+
return expectedValues.containsKey(store)
89+
&& expectedValues.get(store).containsKey(key)
90+
&& expectedValues.get(store).get(key).containsKey(column);
9291
}
9392

9493
@Override
@@ -112,35 +111,29 @@ public int hashCode() {
112111

113112
/**
114113
* Gets the expected value for a particular key and column, if any
114+
* @param store the store to put the expected key column value
115115
* @param key the key to get the expected value for
116116
* @param column the column to get the expected value for
117117
* @return the expected value of the given key-column pair, if any.
118118
*/
119-
public StaticBuffer get(final StaticBuffer key, final StaticBuffer column) {
119+
public StaticBuffer get(final AbstractDynamoDbStore store, final StaticBuffer key, final StaticBuffer column) {
120120
// This method assumes the caller has called contains(..) and received a positive response
121-
return expectedValues.get(key)
122-
.get(column);
121+
return expectedValues.get(store).get(key).get(column);
123122
}
124123

125124
/**
126125
* Puts the expected value for a particular key and column
126+
* @param store the store to put the expected key column value
127127
* @param key the key to put the expected value for
128128
* @param column the column to put the expected value for
129129
* @param expectedValue the expected value to put
130130
*/
131-
public void put(final StaticBuffer key, final StaticBuffer column, final StaticBuffer expectedValue) {
132-
final Map<StaticBuffer, StaticBuffer> valueMap;
133-
if (expectedValues.containsKey(key)) {
134-
valueMap = expectedValues.get(key);
135-
} else {
136-
valueMap = Maps.newHashMap();
137-
expectedValues.put(key, valueMap);
138-
}
139-
140-
// Ignore any calls to put if we already have an expected value
141-
if (!valueMap.containsKey(column)) {
142-
valueMap.put(column, expectedValue);
143-
}
131+
public void putKeyColumnOnlyIfItIsNotYetChangedInTx(final AbstractDynamoDbStore store, final StaticBuffer key, final StaticBuffer column,
132+
final StaticBuffer expectedValue) {
133+
expectedValues.computeIfAbsent(store, s -> new HashMap<>());
134+
expectedValues.get(store).computeIfAbsent(key, k -> new HashMap<>());
135+
// Ignore any calls to putKeyColumnOnlyIfItIsNotYetChangedInTx if we already have an expected value
136+
expectedValues.get(store).get(key).putIfAbsent(column, expectedValue);
144137
}
145138

146139
@Override
@@ -155,8 +148,4 @@ public void rollback() throws BackendException {
155148
public String toString() {
156149
return new ToStringBuilder(this).append(id).append(expectedValues).toString();
157150
}
158-
159-
public void setStore(final AbstractDynamoDbStore abstractDynamoDbStore) {
160-
this.store = abstractDynamoDbStore;
161-
}
162151
}

src/main/java/com/amazon/janusgraph/diskstorage/dynamodb/builder/MultiUpdateExpressionBuilder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
*/
1515
package com.amazon.janusgraph.diskstorage.dynamodb.builder;
1616

17-
import java.util.Collections;
1817
import java.util.Map;
1918

2019
import org.janusgraph.diskstorage.StaticBuffer;
2120

2221
import com.amazon.janusgraph.diskstorage.dynamodb.Constants;
22+
import com.amazon.janusgraph.diskstorage.dynamodb.DynamoDbStore;
2323
import com.amazon.janusgraph.diskstorage.dynamodb.DynamoDbStoreTransaction;
2424
import com.amazon.janusgraph.diskstorage.dynamodb.Expression;
2525
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
@@ -47,8 +47,8 @@ public class MultiUpdateExpressionBuilder extends AbstractBuilder {
4747
private static final String EXPECTED_VALUE_EXPR = String.format("%s = %s", Constants.JANUSGRAPH_VALUE, EXPECTED_VALUE_LABEL);
4848
private static final String SET_VALUE_EXPR = String.format("SET %s = %s", Constants.JANUSGRAPH_VALUE, VALUE_LABEL);
4949

50-
private static final Map<String, String> EMPTY_ARGUMENT_NAMES = Collections.emptyMap();
51-
50+
@NonNull
51+
private final DynamoDbStore store;
5252
@NonNull
5353
private final DynamoDbStoreTransaction transaction;
5454
@Setter
@@ -75,8 +75,8 @@ public Expression build() {
7575

7676
// Condition expression and attribute value
7777
String conditionExpression = null;
78-
if (transaction.contains(hashKey, rangeKey)) {
79-
final StaticBuffer expectedValue = transaction.get(hashKey, rangeKey);
78+
if (transaction.contains(store, hashKey, rangeKey)) {
79+
final StaticBuffer expectedValue = transaction.get(store, hashKey, rangeKey);
8080
if (expectedValue == null) {
8181
conditionExpression = MISSING_VALUE_EXPR;
8282
} else {

src/main/java/com/amazon/janusgraph/diskstorage/dynamodb/builder/SingleExpectedAttributeValueBuilder.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
import org.janusgraph.diskstorage.StaticBuffer;
2121
import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
2222

23+
import com.amazon.janusgraph.diskstorage.dynamodb.DynamoDbSingleRowStore;
2324
import com.amazon.janusgraph.diskstorage.dynamodb.DynamoDbStoreTransaction;
2425
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
2526
import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
2627
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
2728
import com.google.common.base.Preconditions;
2829
import com.google.common.collect.Maps;
2930

30-
import lombok.Setter;
31-
import lombok.experimental.Accessors;
31+
import lombok.RequiredArgsConstructor;
3232

3333

3434
/**
@@ -38,12 +38,12 @@
3838
* @author Michael Rodaitis
3939
* @author Alexander Patrikalakis
4040
*/
41-
@Setter
42-
@Accessors(fluent = true, chain = true)
41+
@RequiredArgsConstructor
4342
public class SingleExpectedAttributeValueBuilder extends AbstractBuilder {
4443

45-
private DynamoDbStoreTransaction transaction = null;
46-
private StaticBuffer key = null;
44+
private final DynamoDbSingleRowStore store;
45+
private final DynamoDbStoreTransaction transaction;
46+
private final StaticBuffer key;
4747

4848
public Map<String, ExpectedAttributeValue> build(final KCVMutation mutation) {
4949
Preconditions.checkState(transaction != null, "Transaction must not be null");
@@ -70,8 +70,8 @@ private void addExpectedValueIfPresent(final StaticBuffer column, final Map<Stri
7070
return;
7171
}
7272

73-
if (transaction.contains(key, column)) {
74-
final StaticBuffer expectedValue = transaction.get(key, column);
73+
if (transaction.contains(store, key, column)) {
74+
final StaticBuffer expectedValue = transaction.get(store, key, column);
7575
final ExpectedAttributeValue expectedAttributeValue;
7676
if (expectedValue == null) {
7777
expectedAttributeValue = new ExpectedAttributeValue().withExists(false);

0 commit comments

Comments
 (0)