Skip to content

Commit beac701

Browse files
authored
Refactor transaction decoration mechanism (#2585)
1 parent 0452a1a commit beac701

27 files changed

+1061
-502
lines changed

core/src/main/java/com/scalar/db/common/AbstractDistributedTransactionManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.common;
22

33
import com.scalar.db.api.Delete;
4+
import com.scalar.db.api.DistributedTransaction;
45
import com.scalar.db.api.DistributedTransactionManager;
56
import com.scalar.db.api.Get;
67
import com.scalar.db.api.Insert;
@@ -10,6 +11,7 @@
1011
import com.scalar.db.api.Update;
1112
import com.scalar.db.api.Upsert;
1213
import com.scalar.db.config.DatabaseConfig;
14+
import com.scalar.db.exception.transaction.TransactionNotFoundException;
1315
import com.scalar.db.util.ScalarDbUtils;
1416
import java.util.List;
1517
import java.util.Optional;
@@ -61,6 +63,16 @@ public Optional<String> getTable() {
6163
return tableName;
6264
}
6365

66+
@Override
67+
public DistributedTransaction join(String txId) throws TransactionNotFoundException {
68+
throw new UnsupportedOperationException("join is not supported in this implementation");
69+
}
70+
71+
@Override
72+
public DistributedTransaction resume(String txId) throws TransactionNotFoundException {
73+
throw new UnsupportedOperationException("resume is not supported in this implementation");
74+
}
75+
6476
protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
6577
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
6678
}

core/src/main/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
import com.scalar.db.api.Mutation;
77
import com.scalar.db.api.Put;
88
import com.scalar.db.api.Scan;
9+
import com.scalar.db.api.TwoPhaseCommitTransaction;
910
import com.scalar.db.api.TwoPhaseCommitTransactionManager;
1011
import com.scalar.db.api.Update;
1112
import com.scalar.db.api.Upsert;
1213
import com.scalar.db.config.DatabaseConfig;
14+
import com.scalar.db.exception.transaction.TransactionNotFoundException;
1315
import com.scalar.db.util.ScalarDbUtils;
1416
import java.util.List;
1517
import java.util.Optional;
@@ -61,6 +63,11 @@ public Optional<String> getTable() {
6163
return tableName;
6264
}
6365

66+
@Override
67+
public TwoPhaseCommitTransaction resume(String txId) throws TransactionNotFoundException {
68+
throw new UnsupportedOperationException("resume is not supported in this implementation");
69+
}
70+
6471
protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
6572
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
6673
}

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
package com.scalar.db.common;
22

3+
import com.google.common.annotations.VisibleForTesting;
34
import com.scalar.db.api.Delete;
45
import com.scalar.db.api.DistributedTransaction;
6+
import com.scalar.db.api.DistributedTransactionManager;
57
import com.scalar.db.api.Get;
68
import com.scalar.db.api.Insert;
9+
import com.scalar.db.api.Isolation;
710
import com.scalar.db.api.Mutation;
811
import com.scalar.db.api.Put;
912
import com.scalar.db.api.Result;
1013
import com.scalar.db.api.Scan;
14+
import com.scalar.db.api.SerializableStrategy;
1115
import com.scalar.db.api.Update;
1216
import com.scalar.db.api.Upsert;
1317
import com.scalar.db.common.error.CoreError;
14-
import com.scalar.db.config.DatabaseConfig;
1518
import com.scalar.db.exception.transaction.AbortException;
1619
import com.scalar.db.exception.transaction.CommitException;
1720
import com.scalar.db.exception.transaction.CrudException;
@@ -28,9 +31,8 @@
2831
import org.slf4j.Logger;
2932
import org.slf4j.LoggerFactory;
3033

31-
public abstract class ActiveTransactionManagedDistributedTransactionManager
32-
extends TransactionDecorationDistributedTransactionManager
33-
implements DistributedTransactionExpirationHandlerSettable {
34+
public class ActiveTransactionManagedDistributedTransactionManager
35+
extends DecoratedDistributedTransactionManager {
3436

3537
private static final long TRANSACTION_EXPIRATION_INTERVAL_MILLIS = 1000;
3638

@@ -50,11 +52,13 @@ public abstract class ActiveTransactionManagedDistributedTransactionManager
5052
}
5153
});
5254

53-
public ActiveTransactionManagedDistributedTransactionManager(DatabaseConfig config) {
54-
super(config);
55+
public ActiveTransactionManagedDistributedTransactionManager(
56+
DistributedTransactionManager transactionManager,
57+
long activeTransactionManagementExpirationTimeMillis) {
58+
super(transactionManager);
5559
activeTransactions =
5660
new ActiveExpiringMap<>(
57-
config.getActiveTransactionManagementExpirationTimeMillis(),
61+
activeTransactionManagementExpirationTimeMillis,
5862
TRANSACTION_EXPIRATION_INTERVAL_MILLIS,
5963
(id, t) -> {
6064
logger.warn("The transaction is expired. Transaction ID: {}", id);
@@ -79,6 +83,77 @@ private void remove(String transactionId) {
7983
activeTransactions.remove(transactionId);
8084
}
8185

86+
@Override
87+
public DistributedTransaction begin() throws TransactionException {
88+
return new ActiveTransaction(super.begin());
89+
}
90+
91+
@Override
92+
public DistributedTransaction begin(String txId) throws TransactionException {
93+
return new ActiveTransaction(super.begin(txId));
94+
}
95+
96+
@Override
97+
public DistributedTransaction start() throws TransactionException {
98+
return new ActiveTransaction(super.start());
99+
}
100+
101+
@Override
102+
public DistributedTransaction start(String txId) throws TransactionException {
103+
return new ActiveTransaction(super.start(txId));
104+
}
105+
106+
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
107+
@Deprecated
108+
@Override
109+
public DistributedTransaction start(Isolation isolation) throws TransactionException {
110+
return new ActiveTransaction(super.start(isolation));
111+
}
112+
113+
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
114+
@Deprecated
115+
@Override
116+
public DistributedTransaction start(String txId, Isolation isolation)
117+
throws TransactionException {
118+
return new ActiveTransaction(super.start(txId, isolation));
119+
}
120+
121+
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
122+
@Deprecated
123+
@Override
124+
public DistributedTransaction start(Isolation isolation, SerializableStrategy strategy)
125+
throws TransactionException {
126+
return new ActiveTransaction(super.start(isolation, strategy));
127+
}
128+
129+
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
130+
@Deprecated
131+
@Override
132+
public DistributedTransaction start(SerializableStrategy strategy) throws TransactionException {
133+
return new ActiveTransaction(super.start(strategy));
134+
}
135+
136+
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
137+
@Deprecated
138+
@Override
139+
public DistributedTransaction start(String txId, SerializableStrategy strategy)
140+
throws TransactionException {
141+
return new ActiveTransaction(super.start(txId, strategy));
142+
}
143+
144+
/** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */
145+
@Deprecated
146+
@Override
147+
public DistributedTransaction start(
148+
String txId, Isolation isolation, SerializableStrategy strategy) throws TransactionException {
149+
return new ActiveTransaction(super.start(txId, isolation, strategy));
150+
}
151+
152+
@Override
153+
public DistributedTransaction join(String txId) throws TransactionNotFoundException {
154+
return resume(txId);
155+
}
156+
82157
@Override
83158
public DistributedTransaction resume(String txId) throws TransactionNotFoundException {
84159
return activeTransactions
@@ -89,13 +164,8 @@ public DistributedTransaction resume(String txId) throws TransactionNotFoundExce
89164
CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId));
90165
}
91166

92-
@Override
93-
protected DistributedTransaction decorate(DistributedTransaction transaction)
94-
throws TransactionException {
95-
return new ActiveTransaction(super.decorate(transaction));
96-
}
97-
98-
private class ActiveTransaction extends DecoratedDistributedTransaction {
167+
@VisibleForTesting
168+
class ActiveTransaction extends DecoratedDistributedTransaction {
99169

100170
@SuppressFBWarnings("EI_EXPOSE_REP2")
101171
private ActiveTransaction(DistributedTransaction transaction) throws TransactionException {
@@ -175,7 +245,7 @@ public synchronized void rollback() throws RollbackException {
175245
}
176246

177247
@Override
178-
public void abort() throws AbortException {
248+
public synchronized void abort() throws AbortException {
179249
try {
180250
super.abort();
181251
} finally {

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.scalar.db.common;
22

3+
import com.google.common.annotations.VisibleForTesting;
34
import com.scalar.db.api.Delete;
45
import com.scalar.db.api.Get;
56
import com.scalar.db.api.Insert;
@@ -8,10 +9,10 @@
89
import com.scalar.db.api.Result;
910
import com.scalar.db.api.Scan;
1011
import com.scalar.db.api.TwoPhaseCommitTransaction;
12+
import com.scalar.db.api.TwoPhaseCommitTransactionManager;
1113
import com.scalar.db.api.Update;
1214
import com.scalar.db.api.Upsert;
1315
import com.scalar.db.common.error.CoreError;
14-
import com.scalar.db.config.DatabaseConfig;
1516
import com.scalar.db.exception.transaction.AbortException;
1617
import com.scalar.db.exception.transaction.CommitException;
1718
import com.scalar.db.exception.transaction.CrudException;
@@ -30,14 +31,13 @@
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

33-
public abstract class ActiveTransactionManagedTwoPhaseCommitTransactionManager
34-
extends TransactionDecorationTwoPhaseCommitTransactionManager
35-
implements TwoPhaseCommitTransactionExpirationHandlerSettable {
34+
public class ActiveTransactionManagedTwoPhaseCommitTransactionManager
35+
extends DecoratedTwoPhaseCommitTransactionManager {
3636

3737
private static final long TRANSACTION_EXPIRATION_INTERVAL_MILLIS = 1000;
3838

3939
private static final Logger logger =
40-
LoggerFactory.getLogger(AbstractTwoPhaseCommitTransactionManager.class);
40+
LoggerFactory.getLogger(ActiveTransactionManagedTwoPhaseCommitTransactionManager.class);
4141

4242
private final ActiveExpiringMap<String, ActiveTransaction> activeTransactions;
4343

@@ -52,11 +52,13 @@ public abstract class ActiveTransactionManagedTwoPhaseCommitTransactionManager
5252
}
5353
});
5454

55-
public ActiveTransactionManagedTwoPhaseCommitTransactionManager(DatabaseConfig config) {
56-
super(config);
55+
public ActiveTransactionManagedTwoPhaseCommitTransactionManager(
56+
TwoPhaseCommitTransactionManager transactionManager,
57+
long activeTransactionManagementExpirationTimeMillis) {
58+
super(transactionManager);
5759
activeTransactions =
5860
new ActiveExpiringMap<>(
59-
config.getActiveTransactionManagementExpirationTimeMillis(),
61+
activeTransactionManagementExpirationTimeMillis,
6062
TRANSACTION_EXPIRATION_INTERVAL_MILLIS,
6163
(id, t) -> {
6264
logger.warn("The transaction is expired. Transaction ID: {}", id);
@@ -82,8 +84,32 @@ private void remove(String transactionId) {
8284
activeTransactions.remove(transactionId);
8385
}
8486

85-
protected boolean isTransactionActive(String transactionId) {
86-
return activeTransactions.containsKey(transactionId);
87+
@Override
88+
public TwoPhaseCommitTransaction begin() throws TransactionException {
89+
return new ActiveTransaction(super.begin());
90+
}
91+
92+
@Override
93+
public TwoPhaseCommitTransaction begin(String txId) throws TransactionException {
94+
return new ActiveTransaction(super.begin(txId));
95+
}
96+
97+
@Override
98+
public TwoPhaseCommitTransaction start() throws TransactionException {
99+
return new ActiveTransaction(super.start());
100+
}
101+
102+
@Override
103+
public TwoPhaseCommitTransaction start(String txId) throws TransactionException {
104+
return new ActiveTransaction(super.start(txId));
105+
}
106+
107+
@Override
108+
public TwoPhaseCommitTransaction join(String txId) throws TransactionException {
109+
if (activeTransactions.containsKey(txId)) {
110+
return resume(txId);
111+
}
112+
return new ActiveTransaction(super.join(txId));
87113
}
88114

89115
@Override
@@ -96,13 +122,8 @@ public TwoPhaseCommitTransaction resume(String txId) throws TransactionNotFoundE
96122
CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId));
97123
}
98124

99-
@Override
100-
protected TwoPhaseCommitTransaction decorate(TwoPhaseCommitTransaction transaction)
101-
throws TransactionException {
102-
return new ActiveTransaction(super.decorate(transaction));
103-
}
104-
105-
private class ActiveTransaction extends DecoratedTwoPhaseCommitTransaction {
125+
@VisibleForTesting
126+
class ActiveTransaction extends DecoratedTwoPhaseCommitTransaction {
106127

107128
@SuppressFBWarnings("EI_EXPOSE_REP2")
108129
private ActiveTransaction(TwoPhaseCommitTransaction transaction) throws TransactionException {
@@ -192,7 +213,7 @@ public synchronized void rollback() throws RollbackException {
192213
}
193214

194215
@Override
195-
public void abort() throws AbortException {
216+
public synchronized void abort() throws AbortException {
196217
try {
197218
super.abort();
198219
} finally {

core/src/main/java/com/scalar/db/common/DecoratedDistributedTransactionManager.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import java.util.function.BiConsumer;
2424

2525
public abstract class DecoratedDistributedTransactionManager
26-
implements DistributedTransactionManager,
27-
DistributedTransactionDecoratorAddable,
28-
DistributedTransactionExpirationHandlerSettable {
26+
implements DistributedTransactionManager, DistributedTransactionExpirationHandlerSettable {
2927

3028
private final DistributedTransactionManager transactionManager;
3129

@@ -229,14 +227,6 @@ public DistributedTransactionManager getOriginalTransactionManager() {
229227
return transactionManager;
230228
}
231229

232-
@Override
233-
public void addTransactionDecorator(DistributedTransactionDecorator transactionDecorator) {
234-
if (transactionManager instanceof DistributedTransactionDecoratorAddable) {
235-
((DistributedTransactionDecoratorAddable) transactionManager)
236-
.addTransactionDecorator(transactionDecorator);
237-
}
238-
}
239-
240230
@Override
241231
public void setTransactionExpirationHandler(BiConsumer<String, DistributedTransaction> handler) {
242232
if (transactionManager instanceof DistributedTransactionExpirationHandlerSettable) {

core/src/main/java/com/scalar/db/common/DecoratedTwoPhaseCommitTransactionManager.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
public abstract class DecoratedTwoPhaseCommitTransactionManager
2424
implements TwoPhaseCommitTransactionManager,
25-
TwoPhaseCommitTransactionDecoratorAddable,
2625
TwoPhaseCommitTransactionExpirationHandlerSettable {
2726

2827
private final TwoPhaseCommitTransactionManager transactionManager;
@@ -182,14 +181,6 @@ public TwoPhaseCommitTransactionManager getOriginalTransactionManager() {
182181
return transactionManager;
183182
}
184183

185-
@Override
186-
public void addTransactionDecorator(TwoPhaseCommitTransactionDecorator transactionDecorator) {
187-
if (transactionManager instanceof TwoPhaseCommitTransactionDecoratorAddable) {
188-
((TwoPhaseCommitTransactionDecoratorAddable) transactionManager)
189-
.addTransactionDecorator(transactionDecorator);
190-
}
191-
}
192-
193184
@Override
194185
public void setTransactionExpirationHandler(
195186
BiConsumer<String, TwoPhaseCommitTransaction> handler) {

core/src/main/java/com/scalar/db/common/DistributedTransactionDecorator.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

core/src/main/java/com/scalar/db/common/DistributedTransactionDecoratorAddable.java

Lines changed: 0 additions & 6 deletions
This file was deleted.

0 commit comments

Comments
 (0)