Skip to content

Commit 4cfae23

Browse files
authored
New implementation of stream result sets (#143)
2 parents f49a537 + 0710a20 commit 4cfae23

32 files changed

+1830
-1283
lines changed

jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,22 @@ public final class YdbConst {
5353
public static final String QUERY_EXPECT_RESULT_SET = "Query must return ResultSet";
5454
public static final String QUERY_EXPECT_UPDATE = "Query must not return ResultSet";
5555
public static final String UNABLE_TO_SET_NULL_OBJECT = "Unable to set null object, type is required";
56-
public static final String DIRECTION_UNSUPPORTED = "Direction is not supported: ";
56+
5757
public static final String RESULT_SET_MODE_UNSUPPORTED = "ResultSet mode is not supported: ";
5858
public static final String RESULT_SET_UNAVAILABLE = "ResultSet is not available at index: ";
59+
public static final String RESULT_SET_IS_CLOSED = "ResultSet is closed";
5960
public static final String RESULT_IS_TRUNCATED = "Result #%s was truncated to %s rows";
61+
public static final String RESULT_WAS_INTERRUPTED = "ResultSet reading was interrupted";
62+
public static final String RESULT_IS_NOT_SCROLLABLE =
63+
"Requested scrollable ResutlSet, but this ResultSet is FORWARD_ONLY.";
64+
public static final String RESULT_SET_TYPE_UNSUPPORTED =
65+
"resultSetType must be ResultSet.TYPE_FORWARD_ONLY or ResultSet.TYPE_SCROLL_INSENSITIVE";
66+
public static final String RESULT_SET_CONCURRENCY_UNSUPPORTED =
67+
"resultSetConcurrency must be ResultSet.CONCUR_READ_ONLY";
68+
public static final String RESULT_SET_HOLDABILITY_UNSUPPORTED =
69+
"resultSetHoldability must be ResultSet.HOLD_CURSORS_OVER_COMMIT";
70+
71+
6072
public static final String INVALID_FETCH_DIRECTION = "Fetch direction %s cannot be used when result set type is %s";
6173
public static final String COLUMN_NOT_FOUND = "Column not found: ";
6274
public static final String COLUMN_NUMBER_NOT_FOUND = "Column is out of range: ";
@@ -71,19 +83,12 @@ public final class YdbConst {
7183
public static final String METADATA_RS_UNSUPPORTED_IN_PS = "ResultSet metadata is not supported " +
7284
"in prepared statements";
7385
public static final String CANNOT_UNWRAP_TO = "Cannot unwrap to ";
74-
public static final String RESULT_SET_TYPE_UNSUPPORTED =
75-
"resultSetType must be ResultSet.TYPE_FORWARD_ONLY or ResultSet.TYPE_SCROLL_INSENSITIVE";
76-
public static final String RESULT_SET_CONCURRENCY_UNSUPPORTED =
77-
"resultSetConcurrency must be ResultSet.CONCUR_READ_ONLY";
78-
public static final String RESULT_SET_HOLDABILITY_UNSUPPORTED =
79-
"resultSetHoldability must be ResultSet.HOLD_CURSORS_OVER_COMMIT";
8086
public static final String READONLY_INSIDE_TRANSACTION = "Cannot change read-only attribute inside a transaction";
8187
public static final String CHANGE_ISOLATION_INSIDE_TX = "Cannot change transaction isolation inside a transaction";
8288
public static final String UNSUPPORTED_TRANSACTION_LEVEL = "Unsupported transaction level: ";
8389
public static final String CLOSED_CONNECTION = "Connection is closed";
8490
public static final String DB_QUERY_DEADLINE_EXCEEDED = "DB query deadline exceeded: ";
8591
public static final String DB_QUERY_CANCELLED = "DB query cancelled: ";
86-
public static final String DATABASE_QUERY_INTERRUPTED = "Database query interrupted";
8792
public static final String DATABASE_UNAVAILABLE = "Database is unavailable: ";
8893
public static final String CANNOT_LOAD_DATA_FROM_IS = "Unable to load data from input stream: ";
8994
public static final String CANNOT_LOAD_DATA_FROM_READER = "Unable to load data from reader: ";
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package tech.ydb.jdbc;
2+
3+
import java.sql.SQLException;
4+
5+
/**
6+
*
7+
* @author Aleksandr Gorshenin
8+
*/
9+
public interface YdbQueryResult {
10+
int getUpdateCount() throws SQLException;
11+
YdbResultSet getCurrentResultSet() throws SQLException;
12+
YdbResultSet getGeneratedKeys() throws SQLException;
13+
14+
boolean hasResultSets() throws SQLException;
15+
boolean getMoreResults(int current) throws SQLException;
16+
17+
void close() throws SQLException;
18+
}

jdbc/src/main/java/tech/ydb/jdbc/YdbResultSet.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ public interface YdbResultSet extends ResultSet {
2626
*/
2727
Value<?> getNativeColumn(String columnLabel) throws SQLException;
2828

29-
//
30-
3129
@Override
3230
YdbResultSetMetaData getMetaData() throws SQLException;
3331

jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,19 @@ public interface YdbStatement extends Statement {
4343

4444
@Override
4545
YdbConnection getConnection() throws SQLException;
46+
47+
@Override
48+
int getQueryTimeout();
49+
50+
@Override
51+
boolean isPoolable();
52+
53+
@Override
54+
int getFetchSize();
55+
56+
@Override
57+
int getFetchDirection();
58+
59+
@Override
60+
int getMaxRows();
4661
}

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,22 @@
33
import java.sql.SQLException;
44
import java.time.Duration;
55
import java.util.Collection;
6-
import java.util.Collections;
7-
import java.util.concurrent.CompletableFuture;
86
import java.util.concurrent.LinkedBlockingQueue;
97
import java.util.concurrent.atomic.AtomicReference;
108

11-
import tech.ydb.core.Result;
9+
import tech.ydb.core.Status;
1210
import tech.ydb.core.StatusCode;
1311
import tech.ydb.core.grpc.GrpcReadStream;
1412
import tech.ydb.jdbc.YdbConst;
13+
import tech.ydb.jdbc.YdbQueryResult;
1514
import tech.ydb.jdbc.YdbResultSet;
1615
import tech.ydb.jdbc.YdbStatement;
1716
import tech.ydb.jdbc.YdbTracer;
1817
import tech.ydb.jdbc.common.YdbTypes;
1918
import tech.ydb.jdbc.exception.YdbRetryableException;
20-
import tech.ydb.jdbc.impl.YdbQueryResult;
21-
import tech.ydb.jdbc.impl.YdbStaticResultSet;
19+
import tech.ydb.jdbc.impl.YdbQueryResultReader;
20+
import tech.ydb.jdbc.impl.YdbQueryResultStatic;
21+
import tech.ydb.jdbc.impl.YdbResultSetMemory;
2222
import tech.ydb.jdbc.query.QueryType;
2323
import tech.ydb.jdbc.query.YdbQuery;
2424
import tech.ydb.table.Session;
@@ -66,7 +66,8 @@ protected Session createNewTableSession(YdbValidator validator) throws SQLExcept
6666
return validator.call("Get session", null, () -> tableClient.createSession(sessionTimeout));
6767
}
6868

69-
protected void closeCurrentResult() throws SQLException {
69+
@Override
70+
public void clearState() throws SQLException {
7071
YdbQueryResult rs = currResult.get();
7172
if (rs != null) {
7273
rs.close();
@@ -83,20 +84,18 @@ protected YdbQueryResult updateCurrentResult(YdbQueryResult result) throws SQLEx
8384

8485
@Override
8586
public void ensureOpened() throws SQLException {
86-
closeCurrentResult();
8787
if (isClosed()) {
8888
throw new SQLException(YdbConst.CLOSED_CONNECTION);
8989
}
9090
}
9191

92-
9392
@Override
94-
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params,
95-
long timeout, boolean keepInCache) throws SQLException {
93+
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
94+
throws SQLException {
9695
boolean insideTx = isInsideTransaction();
9796
while (true) {
9897
try {
99-
return executeQueryImpl(statement, query, preparedYql, params, timeout, keepInCache);
98+
return executeQueryImpl(statement, query, preparedYql, params);
10099
} catch (YdbRetryableException ex) {
101100
if (insideTx || ex.getStatus().getCode() != StatusCode.BAD_SESSION) {
102101
throw ex;
@@ -106,7 +105,7 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S
106105
}
107106

108107
protected abstract YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql,
109-
Params params, long timeout, boolean keepInCache) throws SQLException;
108+
Params params) throws SQLException;
110109

111110
@Override
112111
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
@@ -130,7 +129,7 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
130129
tracer.close();
131130
}
132131

133-
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
132+
return updateCurrentResult(new YdbQueryResultStatic(query));
134133
}
135134

136135
@Override
@@ -152,7 +151,7 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,
152151
tracer.close();
153152
}
154153

155-
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
154+
return updateCurrentResult(new YdbQueryResultStatic(query));
156155
}
157156

158157
@Override
@@ -164,9 +163,6 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
164163
YdbContext ctx = statement.getConnection().getCtx();
165164
YdbValidator validator = statement.getValidator();
166165
Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
167-
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
168-
.withRequestTimeout(scanQueryTimeout)
169-
.build();
170166
String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
171167

172168
YdbTracer tracer = ctx.getTracer();
@@ -177,50 +173,49 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
177173

178174
if (!useStreamResultSet) {
179175
try {
176+
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
177+
.withRequestTimeout(scanQueryTimeout)
178+
.build();
179+
180180
Collection<ResultSetReader> resultSets = new LinkedBlockingQueue<>();
181181

182182
ctx.traceQueryByFullScanDetector(query, yql);
183183
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql, tracer,
184184
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add)
185185
);
186186

187-
YdbResultSet rs = new YdbStaticResultSet(types, statement, ProtoValueReaders.forResultSets(resultSets));
188-
return updateCurrentResult(new StaticQueryResult(query, Collections.singletonList(rs)));
187+
YdbResultSet rs = new YdbResultSetMemory(types, statement, ProtoValueReaders.forResultSets(resultSets));
188+
return updateCurrentResult(new YdbQueryResultStatic(query, rs));
189189
} finally {
190190
session.close();
191191
tracer.close();
192192
}
193193
}
194194

195-
StreamQueryResult lazy = validator.call(msg, null, () -> {
196-
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
197-
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
198-
final StreamQueryResult result = new StreamQueryResult(msg, types, statement, query, stream::cancel);
199-
200-
stream.start((rsr) -> {
201-
future.complete(Result.success(result));
202-
result.onStreamResultSet(0, rsr);
203-
}).whenComplete((st, th) -> {
195+
final YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
196+
@Override
197+
public void onClose(Status status, Throwable th) {
204198
session.close();
205-
206199
if (th != null) {
207-
result.onStreamFinished(th);
208-
future.completeExceptionally(th);
209200
tracer.trace("<-- " + th.getMessage());
210201
}
211-
if (st != null) {
212-
validator.addStatusIssues(st);
213-
result.onStreamFinished(st);
214-
future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st));
215-
tracer.trace("<-- " + st.toString());
202+
if (status != null) {
203+
validator.addStatusIssues(status);
204+
tracer.trace("<-- " + status.toString());
216205
}
217206
tracer.close();
218-
});
219207

220-
return future;
221-
});
208+
super.onClose(status, th);
209+
}
210+
};
222211

223-
return updateCurrentResult(lazy);
224-
}
212+
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
213+
.withRequestTimeout(scanQueryTimeout)
214+
.setGrpcFlowControl(reader)
215+
.build();
225216

217+
GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
218+
validator.execute(msg, tracer, () -> reader.load(stream));
219+
return updateCurrentResult(reader);
220+
}
226221
}

0 commit comments

Comments
 (0)