Skip to content

Commit ca1cfb6

Browse files
authored
Add configuration option to enable/disable active transaction management (#3233)
1 parent 1935d4e commit ca1cfb6

15 files changed

+400
-227
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.scalar.db.api;
2+
3+
import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager;
4+
import com.scalar.db.common.ActiveTransactionManagedTwoPhaseCommitTransactionManager;
5+
import com.scalar.db.common.StateManagedDistributedTransactionManager;
6+
import com.scalar.db.common.StateManagedTwoPhaseCommitTransactionManager;
7+
import com.scalar.db.config.DatabaseConfig;
8+
import javax.annotation.Nullable;
9+
10+
public abstract class AbstractDistributedTransactionProvider
11+
implements DistributedTransactionProvider {
12+
13+
@Override
14+
public DistributedTransactionManager createDistributedTransactionManager(DatabaseConfig config) {
15+
DistributedTransactionManager transactionManager =
16+
createRawDistributedTransactionManager(config);
17+
18+
// Wrap the transaction manager for state management
19+
transactionManager = new StateManagedDistributedTransactionManager(transactionManager);
20+
21+
if (config.isActiveTransactionManagementEnabled()) {
22+
// Wrap the transaction manager for active transaction management
23+
transactionManager =
24+
new ActiveTransactionManagedDistributedTransactionManager(
25+
transactionManager, config.getActiveTransactionManagementExpirationTimeMillis());
26+
}
27+
28+
return transactionManager;
29+
}
30+
31+
protected abstract DistributedTransactionManager createRawDistributedTransactionManager(
32+
DatabaseConfig config);
33+
34+
@Nullable
35+
@Override
36+
public TwoPhaseCommitTransactionManager createTwoPhaseCommitTransactionManager(
37+
DatabaseConfig config) {
38+
TwoPhaseCommitTransactionManager transactionManager =
39+
createRawTwoPhaseCommitTransactionManager(config);
40+
41+
if (transactionManager == null) {
42+
return null;
43+
}
44+
45+
// Wrap the transaction manager for state management
46+
transactionManager = new StateManagedTwoPhaseCommitTransactionManager(transactionManager);
47+
48+
if (config.isActiveTransactionManagementEnabled()) {
49+
// Wrap the transaction manager for active transaction management
50+
transactionManager =
51+
new ActiveTransactionManagedTwoPhaseCommitTransactionManager(
52+
transactionManager, config.getActiveTransactionManagementExpirationTimeMillis());
53+
}
54+
55+
return transactionManager;
56+
}
57+
58+
protected abstract TwoPhaseCommitTransactionManager createRawTwoPhaseCommitTransactionManager(
59+
DatabaseConfig config);
60+
}

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java

Lines changed: 27 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,9 @@
2020
import com.scalar.db.exception.transaction.TransactionException;
2121
import com.scalar.db.exception.transaction.TransactionNotFoundException;
2222
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
23-
import com.scalar.db.util.ActiveExpiringMap;
2423
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
25-
import java.util.Iterator;
2624
import java.util.List;
2725
import java.util.Optional;
28-
import java.util.concurrent.atomic.AtomicReference;
29-
import java.util.function.BiConsumer;
30-
import javax.annotation.Nonnull;
3126
import javax.annotation.concurrent.ThreadSafe;
3227
import org.slf4j.Logger;
3328
import org.slf4j.LoggerFactory;
@@ -36,53 +31,24 @@
3631
public class ActiveTransactionManagedDistributedTransactionManager
3732
extends DecoratedDistributedTransactionManager {
3833

39-
private static final long TRANSACTION_EXPIRATION_INTERVAL_MILLIS = 1000;
40-
4134
private static final Logger logger =
4235
LoggerFactory.getLogger(ActiveTransactionManagedDistributedTransactionManager.class);
4336

44-
private final ActiveExpiringMap<String, ActiveTransaction> activeTransactions;
45-
46-
private final AtomicReference<BiConsumer<String, DistributedTransaction>>
47-
transactionExpirationHandler =
48-
new AtomicReference<>(
49-
(id, t) -> {
50-
try {
51-
t.rollback();
52-
} catch (Exception e) {
53-
logger.warn("Rollback failed. Transaction ID: {}", id, e);
54-
}
55-
});
37+
private final ActiveTransactionRegistry<DistributedTransaction> registry;
5638

5739
public ActiveTransactionManagedDistributedTransactionManager(
58-
DistributedTransactionManager transactionManager,
59-
long activeTransactionManagementExpirationTimeMillis) {
40+
DistributedTransactionManager transactionManager, long expirationTimeMillis) {
6041
super(transactionManager);
61-
activeTransactions =
62-
new ActiveExpiringMap<>(
63-
activeTransactionManagementExpirationTimeMillis,
64-
TRANSACTION_EXPIRATION_INTERVAL_MILLIS,
65-
(id, t) -> {
66-
logger.warn("The transaction is expired. Transaction ID: {}", id);
67-
transactionExpirationHandler.get().accept(id, t);
68-
});
69-
}
70-
71-
@Override
72-
public void setTransactionExpirationHandler(BiConsumer<String, DistributedTransaction> handler) {
73-
transactionExpirationHandler.set(handler);
42+
registry =
43+
new ActiveTransactionRegistry<>(expirationTimeMillis, DistributedTransaction::rollback);
7444
}
7545

76-
private void add(ActiveTransaction transaction) throws TransactionException {
77-
if (activeTransactions.putIfAbsent(transaction.getId(), transaction).isPresent()) {
78-
transaction.rollback();
79-
throw new TransactionException(
80-
CoreError.TRANSACTION_ALREADY_EXISTS.buildMessage(), transaction.getId());
81-
}
82-
}
83-
84-
private void remove(String transactionId) {
85-
activeTransactions.remove(transactionId);
46+
public ActiveTransactionManagedDistributedTransactionManager(
47+
DistributedTransactionManager transactionManager,
48+
long expirationTimeMillis,
49+
ActiveTransactionRegistry.TransactionRollback<DistributedTransaction> rollbackFunction) {
50+
super(transactionManager);
51+
registry = new ActiveTransactionRegistry<>(expirationTimeMillis, rollbackFunction);
8652
}
8753

8854
@Override
@@ -98,7 +64,7 @@ public DistributedTransaction join(String txId) throws TransactionNotFoundExcept
9864

9965
@Override
10066
public DistributedTransaction resume(String txId) throws TransactionNotFoundException {
101-
return activeTransactions
67+
return registry
10268
.get(txId)
10369
.orElseThrow(
10470
() ->
@@ -117,7 +83,18 @@ class ActiveTransaction extends DecoratedDistributedTransaction {
11783
@SuppressFBWarnings("EI_EXPOSE_REP2")
11884
private ActiveTransaction(DistributedTransaction transaction) throws TransactionException {
11985
super(transaction);
120-
add(this);
86+
if (!registry.add(getId(), this)) {
87+
try {
88+
transaction.rollback();
89+
} catch (RollbackException e) {
90+
logger.warn(
91+
"Rollback failed during duplicate transaction handling. Transaction ID: {}",
92+
getId(),
93+
e);
94+
}
95+
throw new TransactionException(
96+
CoreError.TRANSACTION_ALREADY_EXISTS.buildMessage(), getId());
97+
}
12198
}
12299

123100
@Override
@@ -132,37 +109,7 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {
132109

133110
@Override
134111
public synchronized Scanner getScanner(Scan scan) throws CrudException {
135-
Scanner scanner = super.getScanner(scan);
136-
return new Scanner() {
137-
@Override
138-
public Optional<Result> one() throws CrudException {
139-
synchronized (ActiveTransaction.this) {
140-
return scanner.one();
141-
}
142-
}
143-
144-
@Override
145-
public List<Result> all() throws CrudException {
146-
synchronized (ActiveTransaction.this) {
147-
return scanner.all();
148-
}
149-
}
150-
151-
@Override
152-
public void close() throws CrudException {
153-
synchronized (ActiveTransaction.this) {
154-
scanner.close();
155-
}
156-
}
157-
158-
@Nonnull
159-
@Override
160-
public Iterator<Result> iterator() {
161-
synchronized (ActiveTransaction.this) {
162-
return scanner.iterator();
163-
}
164-
}
165-
};
112+
return new SynchronizedScanner(this, super.getScanner(scan));
166113
}
167114

168115
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -220,15 +167,15 @@ public synchronized List<BatchResult> batch(List<? extends Operation> operations
220167
@Override
221168
public synchronized void commit() throws CommitException, UnknownTransactionStatusException {
222169
super.commit();
223-
remove(getId());
170+
registry.remove(getId());
224171
}
225172

226173
@Override
227174
public synchronized void rollback() throws RollbackException {
228175
try {
229176
super.rollback();
230177
} finally {
231-
remove(getId());
178+
registry.remove(getId());
232179
}
233180
}
234181

@@ -237,7 +184,7 @@ public synchronized void abort() throws AbortException {
237184
try {
238185
super.abort();
239186
} finally {
240-
remove(getId());
187+
registry.remove(getId());
241188
}
242189
}
243190
}

0 commit comments

Comments
 (0)