Skip to content

Commit cdcd273

Browse files
committed
Disable useStreamResultSets for scan queries
1 parent e3e0dbe commit cdcd273

File tree

4 files changed

+42
-7
lines changed

4 files changed

+42
-7
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,28 @@
22

33
import java.sql.SQLException;
44
import java.time.Duration;
5+
import java.util.Collection;
56
import java.util.Collections;
67
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.LinkedBlockingQueue;
79
import java.util.concurrent.atomic.AtomicReference;
810

911
import tech.ydb.core.Result;
1012
import tech.ydb.core.grpc.GrpcReadStream;
1113
import tech.ydb.jdbc.YdbConst;
14+
import tech.ydb.jdbc.YdbResultSet;
1215
import tech.ydb.jdbc.YdbStatement;
1316
import tech.ydb.jdbc.YdbTracer;
1417
import tech.ydb.jdbc.impl.YdbQueryResult;
18+
import tech.ydb.jdbc.impl.YdbStaticResultSet;
1519
import tech.ydb.jdbc.query.QueryType;
1620
import tech.ydb.jdbc.query.YdbQuery;
1721
import tech.ydb.table.Session;
1822
import tech.ydb.table.SessionRetryContext;
1923
import tech.ydb.table.TableClient;
2024
import tech.ydb.table.query.Params;
2125
import tech.ydb.table.result.ResultSetReader;
26+
import tech.ydb.table.result.impl.ProtoValueReaders;
2227
import tech.ydb.table.settings.ExecuteScanQuerySettings;
2328
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
2429
import tech.ydb.table.values.ListValue;
@@ -31,6 +36,7 @@ public abstract class BaseYdbExecutor implements YdbExecutor {
3136
private final SessionRetryContext retryCtx;
3237
private final Duration sessionTimeout;
3338
private final TableClient tableClient;
39+
private final boolean useStreamResultSet;
3440

3541
private final AtomicReference<YdbQueryResult> currResult;
3642
protected final boolean traceEnabled;
@@ -40,6 +46,7 @@ public BaseYdbExecutor(YdbContext ctx) {
4046
this.retryCtx = ctx.getRetryCtx();
4147
this.traceEnabled = ctx.isTxTracerEnabled();
4248
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
49+
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();
4350
this.tableClient = ctx.getTableClient();
4451
this.prefixPragma = ctx.getPrefixPragma();
4552
this.currResult = new AtomicReference<>();
@@ -138,6 +145,24 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
138145
tracer.query(yql);
139146

140147
final Session session = createNewTableSession(validator);
148+
149+
if (!useStreamResultSet) {
150+
try {
151+
Collection<ResultSetReader> resultSets = new LinkedBlockingQueue<>();
152+
153+
ctx.traceQuery(query, yql);
154+
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql, tracer,
155+
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add)
156+
);
157+
158+
YdbResultSet rs = new YdbStaticResultSet(statement, ProtoValueReaders.forResultSets(resultSets));
159+
return updateCurrentResult(new StaticQueryResult(query, Collections.singletonList(rs)));
160+
} finally {
161+
session.close();
162+
tracer.close();
163+
}
164+
}
165+
141166
StreamQueryResult lazy = validator.call(msg, () -> {
142167
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
143168
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);

jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,15 @@ public void close() throws SQLException {
167167
return;
168168
}
169169

170+
for (CompletableFuture<Result<LazyResultSet>> future: resultFutures) {
171+
if (future.isDone()) {
172+
Result<LazyResultSet> res = future.join();
173+
if (res.isSuccess()) {
174+
res.getValue().close();
175+
}
176+
}
177+
}
178+
170179
LOGGER.log(Level.FINE, "Stream closed with status {0}", status);
171180
if (!status.isSuccess()) {
172181
throw ExceptionFactory.createException("Cannot execute '" + msg + "' with " + status,

jdbc/src/test/java/tech/ydb/jdbc/impl/YdbLazyResultSetImplTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public class YdbLazyResultSetImplTest {
6464
private static final YdbHelperExtension ydb = new YdbHelperExtension();
6565

6666
@RegisterExtension
67-
private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb);
67+
private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb)
68+
.withArg("useStreamResultSets", "true");
6869

6970
private static final SqlQueries TEST_TABLE = new SqlQueries("ydb_result_set_test");
7071

@@ -96,7 +97,7 @@ public static void dropTable() throws SQLException {
9697
@BeforeEach
9798
public void beforeEach() throws SQLException {
9899
statement = jdbc.connection().createStatement();
99-
resultSet = statement.executeQuery(TEST_TABLE.scanSelectSQL());
100+
resultSet = statement.executeQuery(TEST_TABLE.selectSQL());
100101
}
101102

102103
@AfterEach

jdbc/src/test/java/tech/ydb/jdbc/impl/YdbTableConnectionImplTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -883,9 +883,9 @@ public void testBigBulkAndScan() throws SQLException {
883883
Random rnd = new Random(0x234567);
884884
int payloadLength = 1000;
885885

886-
try {
886+
try (Connection conn = jdbc.createCustomConnection("useStreamResultSets", "true")) {
887887
// BULK UPSERT
888-
try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) {
888+
try (PreparedStatement ps = conn.prepareStatement(bulkUpsert)) {
889889
for (int idx = 1; idx <= 10000; idx++) {
890890
ps.setInt(1, idx);
891891
String payload = createPayload(rnd, payloadLength);
@@ -899,7 +899,7 @@ public void testBigBulkAndScan() throws SQLException {
899899
}
900900

901901
// SCAN all table
902-
try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) {
902+
try (PreparedStatement ps = conn.prepareStatement(scanSelectAll)) {
903903
int readed = 0;
904904
Assertions.assertTrue(ps.execute());
905905
try (ResultSet rs = ps.getResultSet()) {
@@ -913,7 +913,7 @@ public void testBigBulkAndScan() throws SQLException {
913913
}
914914

915915
// Canceled scan
916-
try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) {
916+
try (PreparedStatement ps = conn.prepareStatement(scanSelectAll)) {
917917
Assertions.assertTrue(ps.execute());
918918
ps.getResultSet().next();
919919
ps.getResultSet().close();
@@ -931,7 +931,7 @@ public void testBigBulkAndScan() throws SQLException {
931931
}
932932

933933
// Scan was cancelled, but connection still work
934-
try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) {
934+
try (PreparedStatement ps = conn.prepareStatement(selectOne)) {
935935
ps.setInt(1, 1234);
936936

937937
Assertions.assertTrue(ps.execute());

0 commit comments

Comments
 (0)