88import java .util .Arrays ;
99import java .util .Collections ;
1010import java .util .List ;
11+ import java .util .concurrent .CompletableFuture ;
1112import java .util .concurrent .TimeUnit ;
13+ import java .util .concurrent .atomic .AtomicReference ;
1214
1315import tech .ydb .common .transaction .TxMode ;
1416import tech .ydb .core .Issue ;
4345public class QueryServiceExecutor extends BaseYdbExecutor {
4446 private final Duration sessionTimeout ;
4547 private final QueryClient queryClient ;
48+ private final boolean useStreamResultSet ;
4649
4750 private int transactionLevel ;
4851 private boolean isReadOnly ;
4952 private boolean isAutoCommit ;
5053 private TxMode txMode ;
5154
52- private QueryTransaction tx ;
53- private boolean isClosed ;
55+ private final AtomicReference < QueryTransaction > tx = new AtomicReference <>() ;
56+ private volatile boolean isClosed ;
5457
5558 public QueryServiceExecutor (YdbContext ctx , int transactionLevel , boolean autoCommit ) throws SQLException {
5659 super (ctx );
5760 this .sessionTimeout = ctx .getOperationProperties ().getSessionTimeout ();
5861 this .queryClient = ctx .getQueryClient ();
62+ this .useStreamResultSet = ctx .getOperationProperties ().getUseStreamResultSets ();
63+
5964 this .transactionLevel = transactionLevel ;
6065 this .isReadOnly = transactionLevel != Connection .TRANSACTION_SERIALIZABLE ;
6166 this .isAutoCommit = autoCommit ;
6267 this .txMode = txMode (transactionLevel , isReadOnly );
63- this .tx = null ;
6468 this .isClosed = false ;
6569 }
6670
@@ -78,14 +82,10 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE
7882 @ Override
7983 public void close () throws SQLException {
8084 closeCurrentResult ();
81- cleanTx ();
8285 isClosed = true ;
83- }
84-
85- private void cleanTx () {
86- if (tx != null ) {
87- tx .getSession ().close ();
88- tx = null ;
86+ QueryTransaction old = tx .getAndSet (null );
87+ if (old != null ) {
88+ old .getSession ().close ();
8989 }
9090 }
9191
@@ -97,7 +97,8 @@ public void setTransactionLevel(int level) throws SQLException {
9797 return ;
9898 }
9999
100- if (tx != null && tx .isActive ()) {
100+ QueryTransaction localTx = tx .get ();
101+ if (localTx != null && localTx .isActive ()) {
101102 throw new SQLFeatureNotSupportedException (YdbConst .CHANGE_ISOLATION_INSIDE_TX );
102103 }
103104
@@ -114,7 +115,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {
114115 return ;
115116 }
116117
117- if (tx != null && tx .isActive ()) {
118+ QueryTransaction localTx = tx .get ();
119+ if (localTx != null && localTx .isActive ()) {
118120 throw new SQLFeatureNotSupportedException (YdbConst .READONLY_INSIDE_TRANSACTION );
119121 }
120122
@@ -130,7 +132,8 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
130132 return ;
131133 }
132134
133- if (tx != null && tx .isActive ()) {
135+ QueryTransaction localTx = tx .get ();
136+ if (localTx != null && localTx .isActive ()) {
134137 throw new SQLFeatureNotSupportedException (YdbConst .CHANGE_ISOLATION_INSIDE_TX );
135138 }
136139
@@ -146,13 +149,15 @@ public boolean isClosed() throws SQLException {
146149 @ Override
147150 public String txID () throws SQLException {
148151 closeCurrentResult ();
149- return tx != null ? tx .getId () : null ;
152+ QueryTransaction localTx = tx .get ();
153+ return localTx != null ? localTx .getId () : null ;
150154 }
151155
152156 @ Override
153157 public boolean isInsideTransaction () throws SQLException {
154158 ensureOpened ();
155- return tx != null && tx .isActive ();
159+ QueryTransaction localTx = tx .get ();
160+ return localTx != null && localTx .isActive ();
156161 }
157162
158163 @ Override
@@ -177,24 +182,28 @@ public int transactionLevel() throws SQLException {
177182 public void commit (YdbContext ctx , YdbValidator validator ) throws SQLException {
178183 ensureOpened ();
179184
180- if (tx == null || !tx .isActive ()) {
185+ QueryTransaction localTx = tx .get ();
186+ if (localTx == null || !localTx .isActive ()) {
181187 return ;
182188 }
183189
184190 CommitTransactionSettings settings = ctx .withRequestTimeout (CommitTransactionSettings .newBuilder ()).build ();
185191 try {
186192 validator .clearWarnings ();
187- validator .call ("Commit TxId: " + tx .getId (), () -> tx .commit (settings ));
193+ validator .call ("Commit TxId: " + localTx .getId (), () -> localTx .commit (settings ));
188194 } finally {
189- cleanTx ();
195+ if (tx .compareAndSet (localTx , null )) {
196+ localTx .getSession ().close ();
197+ }
190198 }
191199 }
192200
193201 @ Override
194202 public void rollback (YdbContext ctx , YdbValidator validator ) throws SQLException {
195203 ensureOpened ();
196204
197- if (tx == null || !tx .isActive ()) {
205+ QueryTransaction localTx = tx .get ();
206+ if (localTx == null || !localTx .isActive ()) {
198207 return ;
199208 }
200209
@@ -203,9 +212,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
203212
204213 try {
205214 validator .clearWarnings ();
206- validator .execute ("Rollback TxId: " + tx .getId (), () -> tx .rollback (settings ));
215+ validator .execute ("Rollback TxId: " + localTx .getId (), () -> localTx .rollback (settings ));
207216 } finally {
208- cleanTx ();
217+ if (tx .compareAndSet (localTx , null )) {
218+ localTx .getSession ().close ();
219+ }
209220 }
210221 }
211222
@@ -223,13 +234,63 @@ public YdbQueryResult executeDataQuery(
223234 }
224235 final ExecuteQuerySettings settings = builder .build ();
225236
226- if (tx == null ) {
227- tx = createNewQuerySession (validator ).createNewTransaction (txMode );
237+ QueryTransaction nextTx = tx .get ();
238+ while (nextTx == null ) {
239+ nextTx = createNewQuerySession (validator ).createNewTransaction (txMode );
240+ if (!tx .compareAndSet (null , nextTx )) {
241+ nextTx .getSession ().close ();
242+ nextTx = tx .get ();
243+ }
244+ }
245+
246+ final QueryTransaction localTx = nextTx ;
247+
248+ if (useStreamResultSet ) {
249+ String msg = "STREAM_QUERY >>\n " + yql ;
250+ StreamQueryResult lazy = validator .call (msg , () -> {
251+ final CompletableFuture <Result <StreamQueryResult >> future = new CompletableFuture <>();
252+ final QueryStream stream = localTx .createQuery (yql , isAutoCommit , params , settings );
253+ final StreamQueryResult result = new StreamQueryResult (msg , statement , query , stream ::cancel );
254+
255+
256+ stream .execute (new QueryStream .PartsHandler () {
257+ @ Override
258+ public void onIssues (Issue [] issues ) {
259+ validator .addStatusIssues (Arrays .asList (issues ));
260+ }
261+
262+ @ Override
263+ public void onNextPart (QueryResultPart part ) {
264+ result .onStreamResultSet ((int ) part .getResultSetIndex (), part .getResultSetReader ());
265+ future .complete (Result .success (result ));
266+ }
267+ }).whenComplete ((res , th ) -> {
268+ if (!localTx .isActive ()) {
269+ if (tx .compareAndSet (localTx , null )) {
270+ localTx .getSession ().close ();
271+ }
272+ }
273+
274+ if (th != null ) {
275+ future .completeExceptionally (th );
276+ result .onStreamFinished (th );
277+ }
278+ if (res != null ) {
279+ validator .addStatusIssues (res .getStatus ());
280+ future .complete (res .isSuccess () ? Result .success (result ) : Result .fail (res .getStatus ()));
281+ result .onStreamFinished (res .getStatus ());
282+ }
283+ });
284+
285+ return future ;
286+ });
287+
288+ return updateCurrentResult (lazy );
228289 }
229290
230291 try {
231292 QueryReader result = validator .call (QueryType .DATA_QUERY + " >>\n " + yql ,
232- () -> QueryReader .readFrom (tx .createQuery (yql , isAutoCommit , params , settings ))
293+ () -> QueryReader .readFrom (localTx .createQuery (yql , isAutoCommit , params , settings ))
233294 );
234295 validator .addStatusIssues (result .getIssueList ());
235296
@@ -239,36 +300,14 @@ public YdbQueryResult executeDataQuery(
239300 }
240301 return updateCurrentResult (new StaticQueryResult (query , readers ));
241302 } finally {
242- if (!tx .isActive ()) {
243- cleanTx ();
303+ if (!localTx .isActive ()) {
304+ if (tx .compareAndSet (localTx , null )) {
305+ localTx .getSession ().close ();
306+ }
244307 }
245308 }
246309 }
247310
248- @ Override
249- public YdbQueryResult executeScanQuery (YdbStatement statement , YdbQuery query , String yql , Params params )
250- throws SQLException {
251- ensureOpened ();
252-
253- YdbContext ctx = statement .getConnection ().getCtx ();
254- YdbValidator validator = statement .getValidator ();
255-
256- Duration scanQueryTimeout = ctx .getOperationProperties ().getScanQueryTimeout ();
257- ExecuteQuerySettings settings = ExecuteQuerySettings .newBuilder ()
258- .withRequestTimeout (scanQueryTimeout )
259- .build ();
260-
261- final QuerySession session = createNewQuerySession (validator );
262- String msg = "STREAM_QUERY >>\n " + yql ;
263- StreamQueryResult lazy = validator .call (msg , () -> {
264- QueryStream stream = session .createQuery (yql , TxMode .SNAPSHOT_RO , params , settings );
265- StreamQueryResult result = new StreamQueryResult (msg , statement , query , stream ::cancel );
266- return result .execute (stream , session ::close );
267- });
268-
269- return updateCurrentResult (lazy );
270- }
271-
272311 @ Override
273312 public YdbQueryResult executeSchemeQuery (YdbStatement statement , YdbQuery query ) throws SQLException {
274313 ensureOpened ();
0 commit comments