31
31
32
32
class ClientSessionImpl extends BaseClientSessionImpl implements ClientSession {
33
33
34
+ private enum TransactionState {
35
+ NONE , IN , DONE , ABORTED
36
+ }
37
+
34
38
private final OperationExecutor executor ;
35
- private boolean inTransaction ;
39
+ private TransactionState transactionState = TransactionState . NONE ;
36
40
private boolean messageSent ;
41
+ private boolean commitInProgress ;
42
+
37
43
private TransactionOptions transactionOptions ;
38
44
39
45
ClientSessionImpl (final ServerSessionPool serverSessionPool , final MongoClient mongoClient , final ClientSessionOptions options ,
@@ -44,7 +50,7 @@ class ClientSessionImpl extends BaseClientSessionImpl implements ClientSession {
44
50
45
51
@ Override
46
52
public boolean hasActiveTransaction () {
47
- return inTransaction ;
53
+ return transactionState == TransactionState . IN || ( transactionState == TransactionState . DONE && commitInProgress ) ;
48
54
}
49
55
50
56
@ Override
@@ -56,7 +62,7 @@ public boolean notifyMessageSent() {
56
62
57
63
@ Override
58
64
public TransactionOptions getTransactionOptions () {
59
- isTrue ("in transaction" , inTransaction );
65
+ isTrue ("in transaction" , transactionState == TransactionState . IN || transactionState == TransactionState . DONE );
60
66
return transactionOptions ;
61
67
}
62
68
@@ -68,32 +74,42 @@ public void startTransaction() {
68
74
@ Override
69
75
public void startTransaction (final TransactionOptions transactionOptions ) {
70
76
notNull ("transactionOptions" , transactionOptions );
71
- if (inTransaction ) {
77
+ if (transactionState == TransactionState . IN ) {
72
78
throw new IllegalStateException ("Transaction already in progress" );
73
79
}
74
- inTransaction = true ;
80
+ if (transactionState == TransactionState .DONE ) {
81
+ cleanupTransaction (TransactionState .IN );
82
+ } else {
83
+ transactionState = TransactionState .IN ;
84
+ }
85
+ getServerSession ().advanceTransactionNumber ();
75
86
this .transactionOptions = TransactionOptions .merge (transactionOptions , getOptions ().getDefaultTransactionOptions ());
76
87
}
77
88
78
89
@ Override
79
90
public void commitTransaction (final SingleResultCallback <Void > callback ) {
80
- if (!canCommitOrAbort ()) {
91
+ if (transactionState == TransactionState .ABORTED ) {
92
+ throw new IllegalStateException ("Cannot call commitTransaction after calling abortTransaction" );
93
+ }
94
+ if (transactionState == TransactionState .NONE ) {
81
95
throw new IllegalStateException ("There is no transaction started" );
82
96
}
83
97
if (!messageSent ) {
84
- cleanupTransaction ();
98
+ cleanupTransaction (TransactionState . DONE );
85
99
callback .onResult (null , null );
86
100
} else {
87
101
ReadConcern readConcern = transactionOptions .getReadConcern ();
88
102
if (readConcern == null ) {
89
103
throw new MongoInternalException ("Invariant violated. Transaction options read concern can not be null" );
90
104
}
105
+ commitInProgress = true ;
91
106
executor .execute (new CommitTransactionOperation (transactionOptions .getWriteConcern ()),
92
107
readConcern , this ,
93
108
new SingleResultCallback <Void >() {
94
109
@ Override
95
110
public void onResult (final Void result , final Throwable t ) {
96
- cleanupTransaction ();
111
+ commitInProgress = false ;
112
+ transactionState = TransactionState .DONE ;
97
113
callback .onResult (result , t );
98
114
}
99
115
});
@@ -102,11 +118,17 @@ public void onResult(final Void result, final Throwable t) {
102
118
103
119
@ Override
104
120
public void abortTransaction (final SingleResultCallback <Void > callback ) {
105
- if (!canCommitOrAbort ()) {
121
+ if (transactionState == TransactionState .ABORTED ) {
122
+ throw new IllegalStateException ("Cannot call abortTransaction twice" );
123
+ }
124
+ if (transactionState == TransactionState .DONE ) {
125
+ throw new IllegalStateException ("Cannot call abortTransaction after calling commitTransaction" );
126
+ }
127
+ if (transactionState == TransactionState .NONE ) {
106
128
throw new IllegalStateException ("There is no transaction started" );
107
129
}
108
130
if (!messageSent ) {
109
- cleanupTransaction ();
131
+ cleanupTransaction (TransactionState . ABORTED );
110
132
callback .onResult (null , null );
111
133
} else {
112
134
ReadConcern readConcern = transactionOptions .getReadConcern ();
@@ -118,21 +140,17 @@ public void abortTransaction(final SingleResultCallback<Void> callback) {
118
140
new SingleResultCallback <Void >() {
119
141
@ Override
120
142
public void onResult (final Void result , final Throwable t ) {
121
- cleanupTransaction ();
143
+ cleanupTransaction (TransactionState . ABORTED );
122
144
callback .onResult (null , null );
123
145
}
124
146
});
125
147
}
126
148
}
127
149
128
- private boolean canCommitOrAbort () {
129
- return inTransaction ;
130
- }
131
-
132
150
// TODO: should there be a version of this that takes a callback?
133
151
@ Override
134
152
public void close () {
135
- if (inTransaction ) {
153
+ if (transactionState == TransactionState . IN ) {
136
154
abortTransaction (new SingleResultCallback <Void >() {
137
155
@ Override
138
156
public void onResult (final Void result , final Throwable t ) {
@@ -144,10 +162,9 @@ public void onResult(final Void result, final Throwable t) {
144
162
}
145
163
}
146
164
147
- private void cleanupTransaction () {
148
- inTransaction = false ;
165
+ private void cleanupTransaction (final TransactionState nextState ) {
149
166
messageSent = false ;
150
167
transactionOptions = null ;
151
- getServerSession (). advanceTransactionNumber () ;
168
+ transactionState = nextState ;
152
169
}
153
170
}
0 commit comments