22
33import java .sql .SQLException ;
44import java .time .Duration ;
5+ import java .util .Collection ;
56import java .util .Collections ;
67import java .util .concurrent .CompletableFuture ;
8+ import java .util .concurrent .LinkedBlockingQueue ;
79import java .util .concurrent .atomic .AtomicReference ;
810
911import tech .ydb .core .Result ;
1012import tech .ydb .core .grpc .GrpcReadStream ;
1113import tech .ydb .jdbc .YdbConst ;
14+ import tech .ydb .jdbc .YdbResultSet ;
1215import tech .ydb .jdbc .YdbStatement ;
1316import tech .ydb .jdbc .YdbTracer ;
1417import tech .ydb .jdbc .impl .YdbQueryResult ;
18+ import tech .ydb .jdbc .impl .YdbStaticResultSet ;
1519import tech .ydb .jdbc .query .QueryType ;
1620import tech .ydb .jdbc .query .YdbQuery ;
1721import tech .ydb .table .Session ;
1822import tech .ydb .table .SessionRetryContext ;
1923import tech .ydb .table .TableClient ;
2024import tech .ydb .table .query .Params ;
2125import tech .ydb .table .result .ResultSetReader ;
26+ import tech .ydb .table .result .impl .ProtoValueReaders ;
2227import tech .ydb .table .settings .ExecuteScanQuerySettings ;
2328import tech .ydb .table .settings .ExecuteSchemeQuerySettings ;
2429import tech .ydb .table .values .ListValue ;
@@ -31,6 +36,7 @@ public abstract class BaseYdbExecutor implements YdbExecutor {
3136 private final SessionRetryContext retryCtx ;
3237 private final Duration sessionTimeout ;
3338 private final TableClient tableClient ;
39+ private final boolean useStreamResultSet ;
3440
3541 private final AtomicReference <YdbQueryResult > currResult ;
3642 protected final boolean traceEnabled ;
@@ -40,6 +46,7 @@ public BaseYdbExecutor(YdbContext ctx) {
4046 this .retryCtx = ctx .getRetryCtx ();
4147 this .traceEnabled = ctx .isTxTracerEnabled ();
4248 this .sessionTimeout = ctx .getOperationProperties ().getSessionTimeout ();
49+ this .useStreamResultSet = ctx .getOperationProperties ().getUseStreamResultSets ();
4350 this .tableClient = ctx .getTableClient ();
4451 this .prefixPragma = ctx .getPrefixPragma ();
4552 this .currResult = new AtomicReference <>();
@@ -138,6 +145,24 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
138145 tracer .query (yql );
139146
140147 final Session session = createNewTableSession (validator );
148+
149+ if (!useStreamResultSet ) {
150+ try {
151+ Collection <ResultSetReader > resultSets = new LinkedBlockingQueue <>();
152+
153+ ctx .traceQuery (query , yql );
154+ validator .execute (QueryType .SCAN_QUERY + " >>\n " + yql , tracer ,
155+ () -> session .executeScanQuery (yql , params , settings ).start (resultSets ::add )
156+ );
157+
158+ YdbResultSet rs = new YdbStaticResultSet (statement , ProtoValueReaders .forResultSets (resultSets ));
159+ return updateCurrentResult (new StaticQueryResult (query , Collections .singletonList (rs )));
160+ } finally {
161+ session .close ();
162+ tracer .close ();
163+ }
164+ }
165+
141166 StreamQueryResult lazy = validator .call (msg , () -> {
142167 final CompletableFuture <Result <StreamQueryResult >> future = new CompletableFuture <>();
143168 final GrpcReadStream <ResultSetReader > stream = session .executeScanQuery (yql , params , settings );
0 commit comments