11package com .scalar .db .common ;
22
3+ import com .google .common .util .concurrent .Uninterruptibles ;
34import com .scalar .db .api .CrudOperable ;
45import com .scalar .db .api .DistributedTransaction ;
56import com .scalar .db .api .DistributedTransactionManager ;
7+ import com .scalar .db .exception .transaction .CommitConflictException ;
8+ import com .scalar .db .exception .transaction .CrudConflictException ;
69import com .scalar .db .exception .transaction .RollbackException ;
710import com .scalar .db .exception .transaction .TransactionException ;
11+ import com .scalar .db .exception .transaction .UnknownTransactionStatusException ;
812import com .scalar .db .transaction .singlecrudoperation .SingleCrudOperationTransactionUtils ;
913import com .scalar .db .util .ThrowableConsumer ;
1014import com .scalar .db .util .ThrowableFunction ;
15+ import java .util .concurrent .TimeUnit ;
1116import org .slf4j .Logger ;
1217import org .slf4j .LoggerFactory ;
1318
1419public final class TransactionExecutor {
1520
1621 private static final Logger logger = LoggerFactory .getLogger (TransactionExecutor .class );
1722
23+ private static final int DEFAULT_RETRY_INITIAL_INTERVAL_MILLIS = 100 ;
24+ private static final int DEFAULT_RETRY_MAX_INTERVAL_MILLIS = 1000 ;
25+ private static final int DEFAULT_RETRY_MULTIPLIER = 2 ;
26+ private static final int DEFAULT_RETRY_MAX_RETRIES = 5 ;
27+
1828 private TransactionExecutor () {}
1929
2030 public static <T > T execute (
@@ -32,6 +42,9 @@ public static <T> T execute(
3242 T result = throwableFunction .apply (transaction );
3343 transaction .commit ();
3444 return result ;
45+ } catch (UnknownTransactionStatusException e ) {
46+ // We don't need to rollback the transaction for UnknownTransactionStatusException
47+ throw e ;
3548 } catch (Exception e ) {
3649 if (transaction != null ) {
3750 rollback (transaction );
@@ -43,26 +56,14 @@ public static <T> T execute(
4356
4457 public static void execute (
4558 DistributedTransactionManager transactionManager ,
46- ThrowableConsumer <CrudOperable <?>, TransactionException > throwableFunction )
59+ ThrowableConsumer <CrudOperable <?>, TransactionException > throwableConsumer )
4760 throws TransactionException {
48- if (SingleCrudOperationTransactionUtils .isSingleCrudOperationTransactionManager (
49- transactionManager )) {
50- throwableFunction .accept (transactionManager );
51- return ;
52- }
53-
54- DistributedTransaction transaction = null ;
55- try {
56- transaction = transactionManager .begin ();
57- throwableFunction .accept (transaction );
58- transaction .commit ();
59- } catch (Exception e ) {
60- if (transaction != null ) {
61- rollback (transaction );
62- }
63-
64- throw e ;
65- }
61+ execute (
62+ transactionManager ,
63+ t -> {
64+ throwableConsumer .accept (t );
65+ return null ;
66+ });
6667 }
6768
6869 private static void rollback (DistributedTransaction transaction ) {
@@ -72,4 +73,91 @@ private static void rollback(DistributedTransaction transaction) {
7273 logger .warn ("Failed to rollback a transaction" , e );
7374 }
7475 }
76+
77+ public static <T > T executeWithRetries (
78+ DistributedTransactionManager transactionManager ,
79+ ThrowableFunction <CrudOperable <?>, T , TransactionException > throwableFunction )
80+ throws TransactionException {
81+ return executeWithRetries (
82+ transactionManager ,
83+ throwableFunction ,
84+ DEFAULT_RETRY_INITIAL_INTERVAL_MILLIS ,
85+ DEFAULT_RETRY_MAX_INTERVAL_MILLIS ,
86+ DEFAULT_RETRY_MULTIPLIER ,
87+ DEFAULT_RETRY_MAX_RETRIES );
88+ }
89+
90+ public static <T > T executeWithRetries (
91+ DistributedTransactionManager transactionManager ,
92+ ThrowableFunction <CrudOperable <?>, T , TransactionException > throwableFunction ,
93+ int retryInitialIntervalMillis ,
94+ int retryMaxIntervalMillis ,
95+ int retryMultiplier ,
96+ int retryMaxRetries )
97+ throws TransactionException {
98+ TransactionException lastException ;
99+ int interval = retryInitialIntervalMillis ;
100+ int attempt = 0 ;
101+ while (true ) {
102+ try {
103+ return execute (transactionManager , throwableFunction );
104+ } catch (CrudConflictException | CommitConflictException e ) {
105+ // Retry the transaction for the conflict exceptions
106+ lastException = e ;
107+ }
108+
109+ if (attempt ++ >= retryMaxRetries ) {
110+ break ;
111+ }
112+
113+ logger .warn (
114+ "The transaction failed. Retrying after {} milliseconds... The current attempt count: {}." ,
115+ interval ,
116+ attempt ,
117+ lastException );
118+
119+ Uninterruptibles .sleepUninterruptibly (interval , TimeUnit .MILLISECONDS );
120+
121+ interval *= retryMultiplier ;
122+ if (interval > retryMaxIntervalMillis ) {
123+ interval = retryMaxIntervalMillis ;
124+ }
125+ }
126+
127+ logger .error ("The transaction failed after {} retries." , retryMaxRetries , lastException );
128+ throw lastException ;
129+ }
130+
131+ public static void executeWithRetries (
132+ DistributedTransactionManager transactionManager ,
133+ ThrowableConsumer <CrudOperable <?>, TransactionException > throwableConsumer )
134+ throws TransactionException {
135+ executeWithRetries (
136+ transactionManager ,
137+ throwableConsumer ,
138+ DEFAULT_RETRY_INITIAL_INTERVAL_MILLIS ,
139+ DEFAULT_RETRY_MAX_INTERVAL_MILLIS ,
140+ DEFAULT_RETRY_MULTIPLIER ,
141+ DEFAULT_RETRY_MAX_RETRIES );
142+ }
143+
144+ public static void executeWithRetries (
145+ DistributedTransactionManager transactionManager ,
146+ ThrowableConsumer <CrudOperable <?>, TransactionException > throwableConsumer ,
147+ int retryInitialIntervalMillis ,
148+ int retryMaxIntervalMillis ,
149+ int retryMultiplier ,
150+ int retryMaxRetries )
151+ throws TransactionException {
152+ executeWithRetries (
153+ transactionManager ,
154+ t -> {
155+ throwableConsumer .accept (t );
156+ return null ;
157+ },
158+ retryInitialIntervalMillis ,
159+ retryMaxIntervalMillis ,
160+ retryMultiplier ,
161+ retryMaxRetries );
162+ }
75163}
0 commit comments