88import java .util .Arrays ;
99import java .util .Collections ;
1010import java .util .List ;
11+ import java .util .concurrent .CompletableFuture ;
1112import java .util .concurrent .TimeUnit ;
13+ import java .util .concurrent .atomic .AtomicReference ;
1214
1315import tech .ydb .common .transaction .TxMode ;
1416import tech .ydb .core .Issue ;
@@ -50,8 +52,8 @@ public class QueryServiceExecutor extends BaseYdbExecutor {
5052 private boolean isAutoCommit ;
5153 private TxMode txMode ;
5254
53- private QueryTransaction tx ;
54- private boolean isClosed ;
55+ private final AtomicReference < QueryTransaction > tx = new AtomicReference <>() ;
56+ private volatile boolean isClosed ;
5557
5658 public QueryServiceExecutor (YdbContext ctx , int transactionLevel , boolean autoCommit ) throws SQLException {
5759 super (ctx );
@@ -63,7 +65,6 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo
6365 this .isReadOnly = transactionLevel != Connection .TRANSACTION_SERIALIZABLE ;
6466 this .isAutoCommit = autoCommit ;
6567 this .txMode = txMode (transactionLevel , isReadOnly );
66- this .tx = null ;
6768 this .isClosed = false ;
6869 }
6970
@@ -81,14 +82,10 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE
8182 @ Override
8283 public void close () throws SQLException {
8384 closeCurrentResult ();
84- cleanTx ();
8585 isClosed = true ;
86- }
87-
88- private void cleanTx () {
89- if (tx != null ) {
90- tx .getSession ().close ();
91- tx = null ;
86+ QueryTransaction old = tx .getAndSet (null );
87+ if (old != null ) {
88+ old .getSession ().close ();
9289 }
9390 }
9491
@@ -100,7 +97,8 @@ public void setTransactionLevel(int level) throws SQLException {
10097 return ;
10198 }
10299
103- if (tx != null && tx .isActive ()) {
100+ QueryTransaction localTx = tx .get ();
101+ if (localTx != null && localTx .isActive ()) {
104102 throw new SQLFeatureNotSupportedException (YdbConst .CHANGE_ISOLATION_INSIDE_TX );
105103 }
106104
@@ -117,7 +115,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {
117115 return ;
118116 }
119117
120- if (tx != null && tx .isActive ()) {
118+ QueryTransaction localTx = tx .get ();
119+ if (localTx != null && localTx .isActive ()) {
121120 throw new SQLFeatureNotSupportedException (YdbConst .READONLY_INSIDE_TRANSACTION );
122121 }
123122
@@ -133,7 +132,8 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
133132 return ;
134133 }
135134
136- if (tx != null && tx .isActive ()) {
135+ QueryTransaction localTx = tx .get ();
136+ if (localTx != null && localTx .isActive ()) {
137137 throw new SQLFeatureNotSupportedException (YdbConst .CHANGE_ISOLATION_INSIDE_TX );
138138 }
139139
@@ -149,13 +149,15 @@ public boolean isClosed() throws SQLException {
149149 @ Override
150150 public String txID () throws SQLException {
151151 closeCurrentResult ();
152- return tx != null ? tx .getId () : null ;
152+ QueryTransaction localTx = tx .get ();
153+ return localTx != null ? localTx .getId () : null ;
153154 }
154155
155156 @ Override
156157 public boolean isInsideTransaction () throws SQLException {
157158 ensureOpened ();
158- return tx != null && tx .isActive ();
159+ QueryTransaction localTx = tx .get ();
160+ return localTx != null && localTx .isActive ();
159161 }
160162
161163 @ Override
@@ -180,24 +182,28 @@ public int transactionLevel() throws SQLException {
180182 public void commit (YdbContext ctx , YdbValidator validator ) throws SQLException {
181183 ensureOpened ();
182184
183- if (tx == null || !tx .isActive ()) {
185+ QueryTransaction localTx = tx .get ();
186+ if (localTx == null || !localTx .isActive ()) {
184187 return ;
185188 }
186189
187190 CommitTransactionSettings settings = ctx .withRequestTimeout (CommitTransactionSettings .newBuilder ()).build ();
188191 try {
189192 validator .clearWarnings ();
190- validator .call ("Commit TxId: " + tx .getId (), () -> tx .commit (settings ));
193+ validator .call ("Commit TxId: " + localTx .getId (), () -> localTx .commit (settings ));
191194 } finally {
192- cleanTx ();
195+ if (tx .compareAndSet (localTx , null )) {
196+ localTx .getSession ().close ();
197+ }
193198 }
194199 }
195200
196201 @ Override
197202 public void rollback (YdbContext ctx , YdbValidator validator ) throws SQLException {
198203 ensureOpened ();
199204
200- if (tx == null || !tx .isActive ()) {
205+ QueryTransaction localTx = tx .get ();
206+ if (localTx == null || !localTx .isActive ()) {
201207 return ;
202208 }
203209
@@ -206,9 +212,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
206212
207213 try {
208214 validator .clearWarnings ();
209- validator .execute ("Rollback TxId: " + tx .getId (), () -> tx .rollback (settings ));
215+ validator .execute ("Rollback TxId: " + localTx .getId (), () -> localTx .rollback (settings ));
210216 } finally {
211- cleanTx ();
217+ if (tx .compareAndSet (localTx , null )) {
218+ localTx .getSession ().close ();
219+ }
212220 }
213221 }
214222
@@ -226,28 +234,63 @@ public YdbQueryResult executeDataQuery(
226234 }
227235 final ExecuteQuerySettings settings = builder .build ();
228236
229- if (tx == null ) {
230- tx = createNewQuerySession (validator ).createNewTransaction (txMode );
237+ QueryTransaction nextTx = tx .get ();
238+ while (nextTx == null ) {
239+ nextTx = createNewQuerySession (validator ).createNewTransaction (txMode );
240+ if (!tx .compareAndSet (null , nextTx )) {
241+ nextTx .getSession ().close ();
242+ nextTx = tx .get ();
243+ }
231244 }
232245
246+ final QueryTransaction localTx = nextTx ;
247+
233248 if (useStreamResultSet ) {
234249 String msg = "STREAM_QUERY >>\n " + yql ;
235250 StreamQueryResult lazy = validator .call (msg , () -> {
236- QueryStream stream = tx .createQuery (yql , isAutoCommit , params , settings );
237- StreamQueryResult result = new StreamQueryResult (msg , statement , query , stream ::cancel );
238- return result .execute (stream , () -> {
239- if (!tx .isActive ()) {
240- cleanTx ();
251+ final CompletableFuture <Result <StreamQueryResult >> future = new CompletableFuture <>();
252+ final QueryStream stream = localTx .createQuery (yql , isAutoCommit , params , settings );
253+ final StreamQueryResult result = new StreamQueryResult (msg , statement , query , stream ::cancel );
254+
255+
256+ stream .execute (new QueryStream .PartsHandler () {
257+ @ Override
258+ public void onIssues (Issue [] issues ) {
259+ validator .addStatusIssues (Arrays .asList (issues ));
260+ }
261+
262+ @ Override
263+ public void onNextPart (QueryResultPart part ) {
264+ result .onStreamResultSet ((int ) part .getResultSetIndex (), part .getResultSetReader ());
265+ future .complete (Result .success (result ));
266+ }
267+ }).whenComplete ((res , th ) -> {
268+ if (!localTx .isActive ()) {
269+ if (tx .compareAndSet (localTx , null )) {
270+ localTx .getSession ().close ();
271+ }
272+ }
273+
274+ if (th != null ) {
275+ future .completeExceptionally (th );
276+ result .onStreamFinished (th );
277+ }
278+ if (res != null ) {
279+ validator .addStatusIssues (res .getStatus ());
280+ future .complete (res .isSuccess () ? Result .success (result ) : Result .fail (res .getStatus ()));
281+ result .onStreamFinished (res .getStatus ());
241282 }
242283 });
284+
285+ return future ;
243286 });
244287
245288 return updateCurrentResult (lazy );
246289 }
247290
248291 try {
249292 QueryReader result = validator .call (QueryType .DATA_QUERY + " >>\n " + yql ,
250- () -> QueryReader .readFrom (tx .createQuery (yql , isAutoCommit , params , settings ))
293+ () -> QueryReader .readFrom (localTx .createQuery (yql , isAutoCommit , params , settings ))
251294 );
252295 validator .addStatusIssues (result .getIssueList ());
253296
@@ -257,8 +300,10 @@ public YdbQueryResult executeDataQuery(
257300 }
258301 return updateCurrentResult (new StaticQueryResult (query , readers ));
259302 } finally {
260- if (!tx .isActive ()) {
261- cleanTx ();
303+ if (!localTx .isActive ()) {
304+ if (tx .compareAndSet (localTx , null )) {
305+ localTx .getSession ().close ();
306+ }
262307 }
263308 }
264309 }
0 commit comments