Skip to content

Commit 2d6a5ee

Browse files
committed
Refactor executor's results to process lazy requests
1 parent 880aeba commit 2d6a5ee

13 files changed

+347
-413
lines changed

jdbc/src/main/java/tech/ydb/jdbc/YdbConnection.java

Lines changed: 2 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,11 @@
22

33
import java.sql.Connection;
44
import java.sql.SQLException;
5-
import java.util.List;
65

76
import javax.annotation.Nullable;
87

98
import tech.ydb.jdbc.context.YdbContext;
10-
import tech.ydb.jdbc.context.YdbValidator;
11-
import tech.ydb.jdbc.query.ExplainedQuery;
12-
import tech.ydb.jdbc.query.YdbQuery;
13-
import tech.ydb.table.query.Params;
14-
import tech.ydb.table.result.ResultSetReader;
15-
import tech.ydb.table.values.ListValue;
9+
import tech.ydb.jdbc.context.YdbExecutor;
1610

1711
public interface YdbConnection extends Connection {
1812
/**
@@ -23,68 +17,9 @@ public interface YdbConnection extends Connection {
2317
@Nullable
2418
String getYdbTxId();
2519

26-
2720
YdbContext getCtx();
2821

29-
/**
30-
* Explicitly execute query as a schema query
31-
*
32-
* @param yql query (DDL) to execute
33-
* @param validator handler for logging and warnings
34-
* @throws SQLException if query cannot be executed
35-
*/
36-
void executeSchemeQuery(String yql, YdbValidator validator) throws SQLException;
37-
38-
/**
39-
* Explicitly execute bulk upsert to the table
40-
*
41-
* @param yql description of request
42-
* @param tablePath path to table
43-
* @param validator handler for logging and warnings
44-
* @param rows bulk rows
45-
* @throws SQLException if query cannot be executed
46-
*/
47-
void executeBulkUpsertQuery(String yql, String tablePath, YdbValidator validator, ListValue rows)
48-
throws SQLException;
49-
50-
/**
51-
* Explicitly execute query as a data query
52-
*
53-
* @param query query to execute
54-
* @param yql YQL text to execute
55-
* @param params parameters for query
56-
* @param timeout timeout of operation
57-
* @param keepInCache flag to store query in server-side cache
58-
* @param validator handler for logging and warnings
59-
* @return list of result set
60-
* @throws SQLException if query cannot be executed
61-
*/
62-
List<ResultSetReader> executeDataQuery(YdbQuery query, String yql, YdbValidator validator,
63-
int timeout, boolean keepInCache, Params params) throws SQLException;
64-
65-
/**
66-
* Explicitly execute query as a scan query
67-
*
68-
* @param query query to execute
69-
* @param yql YQL text to execute
70-
* @param params parameters for query
71-
* @param validator handler for logging and warnings
72-
* @return single result set with rows
73-
* @throws SQLException if query cannot be executed
74-
*/
75-
ResultSetReader executeScanQuery(YdbQuery query, String yql, YdbValidator validator, Params params)
76-
throws SQLException;
77-
78-
/**
79-
* Explicitly explain this query
80-
*
81-
* @param yql query to explain
82-
* @param validator handler for logging and warnings
83-
* @return list of result set of two string columns: {@link YdbConst#EXPLAIN_COLUMN_AST}
84-
* and {@link YdbConst#EXPLAIN_COLUMN_PLAN}
85-
* @throws SQLException if query cannot be explained
86-
*/
87-
ExplainedQuery executeExplainQuery(String yql, YdbValidator validator) throws SQLException;
22+
YdbExecutor getExecutor();
8823

8924
@Override
9025
YdbDatabaseMetaData getMetaData() throws SQLException;

jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import java.sql.SQLException;
44
import java.sql.Statement;
55

6+
import tech.ydb.jdbc.context.YdbValidator;
7+
68
public interface YdbStatement extends Statement {
79
/**
810
* Explicitly execute query as a schema query
@@ -31,6 +33,8 @@ public interface YdbStatement extends Statement {
3133
*/
3234
YdbResultSet executeExplainQuery(String sql) throws SQLException;
3335

36+
YdbValidator getValidator() throws SQLException;
37+
3438
/**
3539
* Cant return previous results sets after {@link #getMoreResults()} traverse,
3640
* in case previous rows were not removed by providing {@link Statement#CLOSE_ALL_RESULTS}
@@ -50,5 +54,4 @@ public interface YdbStatement extends Statement {
5054

5155
@Override
5256
YdbConnection getConnection() throws SQLException;
53-
5457
}

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
import java.sql.SQLException;
44
import java.time.Duration;
55
import java.util.Collection;
6+
import java.util.Collections;
67
import java.util.concurrent.LinkedBlockingQueue;
78

9+
import tech.ydb.jdbc.YdbResultSet;
10+
import tech.ydb.jdbc.YdbStatement;
11+
import tech.ydb.jdbc.impl.FixedResultSetImpl;
812
import tech.ydb.jdbc.query.QueryType;
913
import tech.ydb.jdbc.query.YdbQuery;
1014
import tech.ydb.table.SessionRetryContext;
@@ -27,31 +31,44 @@ public BaseYdbExecutor(YdbContext ctx) {
2731
}
2832

2933
@Override
30-
public void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException {
34+
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
3135
ensureOpened();
3236

37+
String yql = query.getPreparedYql();
38+
YdbContext ctx = statement.getConnection().getCtx();
39+
YdbValidator validator = statement.getValidator();
40+
3341
// Scheme query does not affect transactions or result sets
3442
ExecuteSchemeQuerySettings settings = ctx.withDefaultTimeout(new ExecuteSchemeQuerySettings());
3543
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql,
3644
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
3745
);
46+
47+
return YdbQueryResult.fromResults(query, Collections.emptyList());
3848
}
3949

4050
@Override
41-
public void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String yql, String tablePath, ListValue rows)
51+
public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query, String tablePath, ListValue rows)
4252
throws SQLException {
4353
ensureOpened();
54+
String yql = query.getPreparedYql();
55+
YdbValidator validator = statement.getValidator();
56+
4457
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
4558
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
4659
);
60+
61+
return YdbQueryResult.fromResults(query, Collections.emptyList());
4762
}
4863

4964
@Override
50-
public ResultSetReader executeScanQuery(
51-
YdbContext ctx, YdbValidator validator, YdbQuery query, String yql, Params params
52-
) throws SQLException {
65+
public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, String yql, Params params)
66+
throws SQLException {
5367
ensureOpened();
5468

69+
YdbContext ctx = statement.getConnection().getCtx();
70+
YdbValidator validator = statement.getValidator();
71+
5572
Collection<ResultSetReader> resultSets = new LinkedBlockingQueue<>();
5673
Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
5774
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
@@ -66,8 +83,7 @@ public ResultSetReader executeScanQuery(
6683
})
6784
);
6885

69-
return ProtoValueReaders.forResultSets(resultSets);
86+
YdbResultSet rs = new FixedResultSetImpl(statement, ProtoValueReaders.forResultSets(resultSets));
87+
return YdbQueryResult.fromResults(query, Collections.singletonList(rs));
7088
}
71-
72-
7389
}

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.time.Duration;
77
import java.util.ArrayList;
88
import java.util.Arrays;
9+
import java.util.Collections;
910
import java.util.List;
1011
import java.util.concurrent.TimeUnit;
1112

@@ -14,8 +15,10 @@
1415
import tech.ydb.core.Result;
1516
import tech.ydb.core.UnexpectedResultException;
1617
import tech.ydb.jdbc.YdbConst;
18+
import tech.ydb.jdbc.YdbResultSet;
19+
import tech.ydb.jdbc.YdbStatement;
1720
import tech.ydb.jdbc.exception.ExceptionFactory;
18-
import tech.ydb.jdbc.query.ExplainedQuery;
21+
import tech.ydb.jdbc.impl.FixedResultSetImpl;
1922
import tech.ydb.jdbc.query.QueryType;
2023
import tech.ydb.jdbc.query.YdbQuery;
2124
import tech.ydb.query.QueryClient;
@@ -30,7 +33,6 @@
3033
import tech.ydb.query.settings.RollbackTransactionSettings;
3134
import tech.ydb.query.tools.QueryReader;
3235
import tech.ydb.table.query.Params;
33-
import tech.ydb.table.result.ResultSetReader;
3436

3537
/**
3638
*
@@ -196,12 +198,13 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
196198
}
197199

198200
@Override
199-
public List<ResultSetReader> executeDataQuery(
200-
YdbContext ctx, YdbValidator validator, YdbQuery query,
201-
String yql, long timeout, boolean keepInCache, Params params
201+
public YdbQueryResult executeDataQuery(
202+
YdbStatement statement, YdbQuery query, String yql, Params params, long timeout, boolean keepInCache
202203
) throws SQLException {
203204
ensureOpened();
204205

206+
YdbValidator validator = statement.getValidator();
207+
205208
ExecuteQuerySettings.Builder builder = ExecuteQuerySettings.newBuilder();
206209
if (timeout > 0) {
207210
builder = builder.withRequestTimeout(timeout, TimeUnit.SECONDS);
@@ -218,9 +221,9 @@ public List<ResultSetReader> executeDataQuery(
218221
);
219222
validator.addStatusIssues(result.getIssueList());
220223

221-
List<ResultSetReader> readers = new ArrayList<>();
222-
result.forEach(readers::add);
223-
return readers;
224+
List<YdbResultSet> rsList = new ArrayList<>();
225+
result.forEach(rsr -> rsList.add(new FixedResultSetImpl(statement, rsr)));
226+
return YdbQueryResult.fromResults(query, rsList);
224227
} finally {
225228
if (!tx.isActive()) {
226229
cleanTx();
@@ -229,9 +232,13 @@ public List<ResultSetReader> executeDataQuery(
229232
}
230233

231234
@Override
232-
public void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException {
235+
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
233236
ensureOpened();
234237

238+
String yql = query.getPreparedYql();
239+
YdbContext ctx = statement.getConnection().getCtx();
240+
YdbValidator validator = statement.getValidator();
241+
235242
// Scheme query does not affect transactions or result sets
236243
ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder()).build();
237244
try (QuerySession session = createNewQuerySession(validator)) {
@@ -240,13 +247,18 @@ public void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yq
240247
.execute(new IssueHandler(validator))
241248
);
242249
}
250+
251+
return YdbQueryResult.fromResults(query, Collections.emptyList());
243252
}
244253

245254
@Override
246-
public ExplainedQuery executeExplainQuery(YdbContext ctx, YdbValidator validator, String yql)
247-
throws SQLException {
255+
public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query) throws SQLException {
248256
ensureOpened();
249257

258+
String yql = query.getPreparedYql();
259+
YdbContext ctx = statement.getConnection().getCtx();
260+
YdbValidator validator = statement.getValidator();
261+
250262
// Scheme query does not affect transactions or result sets
251263
ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder())
252264
.withExecMode(QueryExecMode.EXPLAIN)
@@ -261,7 +273,8 @@ public ExplainedQuery executeExplainQuery(YdbContext ctx, YdbValidator validator
261273
if (!res.hasStats()) {
262274
throw new SQLException("No explain data");
263275
}
264-
return new ExplainedQuery(res.getStats().getQueryAst(), res.getStats().getQueryPlan());
276+
277+
return YdbQueryResult.fromExplain(statement, res.getStats().getQueryAst(), res.getStats().getQueryPlan());
265278
}
266279
}
267280

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import tech.ydb.core.Result;
1111
import tech.ydb.core.UnexpectedResultException;
1212
import tech.ydb.jdbc.YdbConst;
13+
import tech.ydb.jdbc.YdbResultSet;
14+
import tech.ydb.jdbc.YdbStatement;
1315
import tech.ydb.jdbc.exception.ExceptionFactory;
14-
import tech.ydb.jdbc.query.ExplainedQuery;
16+
import tech.ydb.jdbc.impl.FixedResultSetImpl;
1517
import tech.ydb.jdbc.query.QueryType;
1618
import tech.ydb.jdbc.query.YdbQuery;
1719
import tech.ydb.table.Session;
@@ -34,13 +36,15 @@
3436
public class TableServiceExecutor extends BaseYdbExecutor {
3537
private final Duration sessionTimeout;
3638
private final TableClient tableClient;
39+
private final boolean failOnTruncatedResult;
3740
private volatile TxState tx;
3841

3942
public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
4043
super(ctx);
4144
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
4245
this.tableClient = ctx.getTableClient();
4346
this.tx = createTx(transactionLevel, autoCommit);
47+
this.failOnTruncatedResult = ctx.getOperationProperties().isFailOnTruncatedResult();
4448
}
4549

4650
@Override
@@ -173,25 +177,27 @@ private ExecuteDataQuerySettings dataQuerySettings(long timeout, boolean keepInC
173177
}
174178

175179
@Override
176-
public ExplainedQuery executeExplainQuery(YdbContext ctx, YdbValidator validator, String yql)
177-
throws SQLException {
180+
public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query) throws SQLException {
178181
ensureOpened();
179182

183+
YdbContext ctx = statement.getConnection().getCtx();
184+
YdbValidator validator = statement.getValidator();
185+
String yql = query.getPreparedYql();
186+
180187
ExplainDataQuerySettings settings = ctx.withDefaultTimeout(new ExplainDataQuerySettings());
181188
try (Session session = createNewTableSession(validator)) {
182189
String msg = QueryType.EXPLAIN_QUERY + " >>\n" + yql;
183190
ExplainDataQueryResult res = validator.call(msg, () -> session.explainDataQuery(yql, settings));
184-
return new ExplainedQuery(res.getQueryAst(), res.getQueryPlan());
191+
return YdbQueryResult.fromExplain(statement, res.getQueryAst(), res.getQueryPlan());
185192
}
186193
}
187194

188195
@Override
189-
public List<ResultSetReader> executeDataQuery(
190-
YdbContext ctx, YdbValidator validator, YdbQuery query,
191-
String yql, long timeout, boolean keepInCache, Params params
192-
) throws SQLException {
196+
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String yql, Params params,
197+
long timeout, boolean keepInCache) throws SQLException {
193198
ensureOpened();
194199

200+
YdbValidator validator = statement.getValidator();
195201
final Session session = tx.getSession(validator);
196202
try {
197203
DataQueryResult result = validator.call(
@@ -200,12 +206,18 @@ public List<ResultSetReader> executeDataQuery(
200206
);
201207
updateState(tx.withDataQuery(session, result.getTxId()));
202208

203-
List<ResultSetReader> readers = new ArrayList<>();
209+
List<YdbResultSet> readers = new ArrayList<>();
204210
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
205-
readers.add(result.getResultSet(idx));
211+
ResultSetReader rs = result.getResultSet(idx);
212+
if (failOnTruncatedResult && rs.isTruncated()) {
213+
String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount());
214+
throw new SQLException(msg);
215+
}
216+
217+
readers.add(new FixedResultSetImpl(statement, rs));
206218
}
207219

208-
return readers;
220+
return YdbQueryResult.fromResults(query, readers);
209221
} catch (SQLException | RuntimeException ex) {
210222
updateState(tx.withRollback(session));
211223
throw ex;

0 commit comments

Comments
 (0)