Skip to content

Commit f0d05e9

Browse files
committed
Streaming query result implementation
1 parent c57e1d8 commit f0d05e9

File tree

5 files changed

+407
-25
lines changed

5 files changed

+407
-25
lines changed

jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public interface YdbStatement extends Statement {
3333
*/
3434
YdbResultSet executeExplainQuery(String sql) throws SQLException;
3535

36-
YdbValidator getValidator() throws SQLException;
36+
YdbValidator getValidator();
3737

3838
@Override
3939
YdbResultSet executeQuery(String sql) throws SQLException;

jdbc/src/main/java/tech/ydb/jdbc/common/ColumnInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package tech.ydb.jdbc.common;
22

3+
import tech.ydb.table.result.ResultSetReader;
34
import tech.ydb.table.values.PrimitiveType;
45
import tech.ydb.table.values.Type;
56

@@ -66,4 +67,13 @@ public MappingGetters.SqlType getSqlType() {
6667
public MappingGetters.Getters getGetters() {
6768
return this.getters;
6869
}
70+
71+
public static ColumnInfo[] fromResultSetReader(ResultSetReader rsr) {
72+
ColumnInfo[] columns = new ColumnInfo[rsr.getColumnCount()];
73+
for (int idx = 0; idx < rsr.getColumnCount(); idx += 1) {
74+
columns[idx] = new ColumnInfo(rsr.getColumnName(idx), rsr.getColumnType(idx));
75+
}
76+
return columns;
77+
}
78+
6979
}

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,18 @@
44
import java.sql.SQLException;
55
import java.sql.SQLFeatureNotSupportedException;
66
import java.time.Duration;
7-
import java.util.ArrayList;
87
import java.util.Arrays;
98
import java.util.Collections;
10-
import java.util.List;
119
import java.util.concurrent.TimeUnit;
1210

1311
import tech.ydb.common.transaction.TxMode;
1412
import tech.ydb.core.Issue;
1513
import tech.ydb.core.Result;
1614
import tech.ydb.core.UnexpectedResultException;
1715
import tech.ydb.jdbc.YdbConst;
18-
import tech.ydb.jdbc.YdbResultSet;
1916
import tech.ydb.jdbc.YdbStatement;
2017
import tech.ydb.jdbc.exception.ExceptionFactory;
2118
import tech.ydb.jdbc.impl.YdbQueryResult;
22-
import tech.ydb.jdbc.impl.YdbStaticResultSet;
2319
import tech.ydb.jdbc.query.QueryType;
2420
import tech.ydb.jdbc.query.YdbQuery;
2521
import tech.ydb.query.QueryClient;
@@ -32,7 +28,6 @@
3228
import tech.ydb.query.settings.ExecuteQuerySettings;
3329
import tech.ydb.query.settings.QueryExecMode;
3430
import tech.ydb.query.settings.RollbackTransactionSettings;
35-
import tech.ydb.query.tools.QueryReader;
3631
import tech.ydb.table.query.Params;
3732

3833
/**
@@ -216,20 +211,17 @@ public YdbQueryResult executeDataQuery(
216211
tx = createNewQuerySession(validator).createNewTransaction(txMode);
217212
}
218213

219-
try {
220-
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql,
221-
() -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings))
222-
);
223-
validator.addStatusIssues(result.getIssueList());
214+
String msg = "STREAM_QUERY >>\n" + yql;
215+
QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings);
216+
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
224217

225-
List<YdbResultSet> rsList = new ArrayList<>();
226-
result.forEach(rsr -> rsList.add(new YdbStaticResultSet(statement, rsr)));
227-
return new StaticQueryResult(query, rsList);
228-
} finally {
218+
result.start(stream).thenRun(() -> {
229219
if (!tx.isActive()) {
230220
cleanTx();
231221
}
232-
}
222+
});
223+
224+
return result;
233225
}
234226

235227
@Override

0 commit comments

Comments
 (0)