Skip to content

Commit f8b825d

Browse files
committed
Added option to use streamable result sets
1 parent ed8a3b9 commit f8b825d

File tree

6 files changed

+168
-7
lines changed

6 files changed

+168
-7
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
public class QueryServiceExecutor extends BaseYdbExecutor {
4444
private final Duration sessionTimeout;
4545
private final QueryClient queryClient;
46+
private final boolean useStreamResultSet;
4647

4748
private int transactionLevel;
4849
private boolean isReadOnly;
@@ -56,6 +57,8 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo
5657
super(ctx);
5758
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
5859
this.queryClient = ctx.getQueryClient();
60+
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();
61+
5962
this.transactionLevel = transactionLevel;
6063
this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE;
6164
this.isAutoCommit = autoCommit;
@@ -227,6 +230,21 @@ public YdbQueryResult executeDataQuery(
227230
tx = createNewQuerySession(validator).createNewTransaction(txMode);
228231
}
229232

233+
if (useStreamResultSet) {
234+
String msg = "STREAM_QUERY >>\n" + yql;
235+
StreamQueryResult lazy = validator.call(msg, () -> {
236+
QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings);
237+
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
238+
return result.execute(stream, () -> {
239+
if (!tx.isActive()) {
240+
cleanTx();
241+
}
242+
});
243+
});
244+
245+
return updateCurrentResult(lazy);
246+
}
247+
230248
try {
231249
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql,
232250
() -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings))

jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class YdbConfig {
3838
+ "{@code 0} disables the cache.", 256
3939
);
4040
static final YdbProperty<Boolean> USE_QUERY_SERVICE = YdbProperty.bool("useQueryService",
41-
"Use QueryService intead of TableService", false
41+
"Use QueryService instead of TableService", false
4242
);
4343
static final YdbProperty<Boolean> FULLSCAN_DETECTOR_ENABLED = YdbProperty.bool(
4444
"jdbcFullScanDetector", "Enable analizator for collecting query stats", false
@@ -167,6 +167,7 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException {
167167
YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties),
168168
YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties),
169169

170+
YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties),
170171
YdbOperationProperties.JOIN_DURATION.toInfo(properties),
171172
YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties),
172173
YdbOperationProperties.SCAN_QUERY_TIMEOUT.toInfo(properties),

jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public class YdbOperationProperties {
5353
FakeTxMode.ERROR
5454
);
5555

56+
static final YdbProperty<Boolean> USE_STREAM_RESULT_SETS = YdbProperty.bool("useStreamResultSets",
57+
"Use stream implementation of ResultSet", false
58+
);
59+
5660
private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection?
5761

5862
private final YdbValue<Duration> joinDuration;
@@ -68,6 +72,8 @@ public class YdbOperationProperties {
6872
private final YdbValue<FakeTxMode> schemeQueryTxMode;
6973
private final YdbValue<FakeTxMode> bulkQueryTxMode;
7074

75+
private final YdbValue<Boolean> useStreamResultSets;
76+
7177
public YdbOperationProperties(YdbConfig config) throws SQLException {
7278
Properties props = config.getProperties();
7379

@@ -83,6 +89,8 @@ public YdbOperationProperties(YdbConfig config) throws SQLException {
8389
this.scanQueryTxMode = SCAN_QUERY_TX_MODE.readValue(props);
8490
this.schemeQueryTxMode = SCHEME_QUERY_TX_MODE.readValue(props);
8591
this.bulkQueryTxMode = BULK_QUERY_TX_MODE.readValue(props);
92+
93+
this.useStreamResultSets = USE_STREAM_RESULT_SETS.readValue(props);
8694
}
8795

8896
public Duration getJoinDuration() {
@@ -129,6 +137,10 @@ public int getTransactionLevel() {
129137
return transactionLevel.getValue();
130138
}
131139

140+
public boolean getUseStreamResultSets() {
141+
return useStreamResultSets.getValue();
142+
}
143+
132144
public int getMaxRows() {
133145
return MAX_ROWS;
134146
}

jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,132 @@ public void forceScanAndBulkTest() throws SQLException {
313313
}
314314
}
315315
}
316+
317+
@Test
318+
public void streamResultsTest() throws SQLException {
319+
try (Connection conn = DriverManager.getConnection(jdbcURL
320+
.withArg("useQueryService", "true")
321+
.withArg("useStreamResultSets", "true")
322+
.build()
323+
)) {
324+
try {
325+
conn.createStatement().execute(DROP_TABLE);
326+
} catch (SQLException e) {
327+
// ignore
328+
}
329+
330+
conn.createStatement().execute(CREATE_TABLE);
331+
332+
LocalDate ld = LocalDate.of(2017, 12, 3);
333+
String prefix = "text-value-";
334+
int idx = 0;
335+
336+
// single batch upsert
337+
try (PreparedStatement ps = conn.prepareStatement(UPSERT_ROW)) {
338+
ps.setInt(1, ++idx);
339+
ps.setString(2, prefix + idx);
340+
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
341+
ps.executeUpdate();
342+
}
343+
344+
// single batch insert
345+
try (PreparedStatement ps = conn.prepareStatement(INSERT_ROW)) {
346+
ps.setInt(1, ++idx);
347+
ps.setString(2, prefix + idx);
348+
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
349+
ps.executeUpdate();
350+
}
351+
352+
// stream read
353+
try (Statement st = conn.createStatement()) {
354+
int readed = 0;
355+
try (ResultSet rs = st.executeQuery(SELECT_ALL)) {
356+
while (rs.next()) {
357+
readed++;
358+
Assertions.assertEquals(readed, rs.getInt("id"));
359+
Assertions.assertEquals(prefix + readed, rs.getString("value"));
360+
Assertions.assertEquals(Date.valueOf(ld.plusDays(readed)), rs.getDate("date"));
361+
}
362+
}
363+
Assertions.assertEquals(2, readed);
364+
}
365+
366+
// batch upsert
367+
try (PreparedStatement ps = conn.prepareStatement(UPSERT_ROW)) {
368+
for (int j = 0; j < 2000; j++) {
369+
ps.setInt(1, ++idx);
370+
ps.setString(2, prefix + idx);
371+
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
372+
ps.addBatch();
373+
}
374+
ps.executeBatch();
375+
376+
// single row upsert
377+
ps.setInt(1, ++idx);
378+
ps.setString(2, prefix + idx);
379+
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
380+
ps.execute();
381+
382+
for (int j = 0; j < 2000; j++) {
383+
ps.setInt(1, ++idx);
384+
ps.setString(2, prefix + idx);
385+
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
386+
ps.addBatch();
387+
}
388+
ps.executeBatch();
389+
}
390+
391+
// batch inserts
392+
try (PreparedStatement ps = conn.prepareStatement(INSERT_ROW)) {
393+
for (int j = 0; j < 2000; j++) {
394+
ps.setInt(1, ++idx);
395+
ps.setString(2, prefix + idx);
396+
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
397+
ps.addBatch();
398+
}
399+
ps.executeBatch();
400+
401+
// single row insert
402+
ps.setInt(1, ++idx);
403+
ps.setString(2, prefix + idx);
404+
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
405+
ps.execute();
406+
407+
for (int j = 0; j < 2000; j++) {
408+
ps.setInt(1, ++idx);
409+
ps.setString(2, prefix + idx);
410+
ps.setDate(3, Date.valueOf(ld.plusDays(idx)));
411+
ps.addBatch();
412+
}
413+
ps.executeBatch();
414+
}
415+
416+
// read all
417+
try (Statement st = conn.createStatement()) {
418+
int readed = 0;
419+
try (ResultSet rs = st.executeQuery(SELECT_ALL)) {
420+
while (rs.next()) {
421+
readed++;
422+
Assertions.assertEquals(readed, rs.getInt("id"));
423+
Assertions.assertEquals(prefix + readed, rs.getString("value"));
424+
Assertions.assertEquals(Date.valueOf(ld.plusDays(readed)), rs.getDate("date"));
425+
}
426+
}
427+
Assertions.assertEquals(8004, readed);
428+
}
429+
430+
// single update
431+
try (PreparedStatement ps = conn.prepareStatement(UPDATE_ROW)) {
432+
ps.setString(1, "updated-value");
433+
ps.setInt(2, 1);
434+
ps.executeUpdate();
435+
}
436+
437+
// single delete
438+
try (PreparedStatement ps = conn.prepareStatement(DELETE_ROW)) {
439+
ps.setInt(1, 2);
440+
ps.executeUpdate();
441+
}
442+
}
443+
}
316444
}

jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -879,15 +879,15 @@ private String createPayload(Random rnd, int length) {
879879
@Timeout(value = 30, unit = TimeUnit.SECONDS, threadMode = Timeout.ThreadMode.SAME_THREAD)
880880
public void testBigBulkAndScan() throws SQLException {
881881
String bulkUpsert = QUERIES.upsertOne(SqlQueries.JdbcQuery.BULK, "c_Text", "Text?");
882-
String scanSelectAll = QUERIES.scanSelectSQL();
882+
String selectAll = QUERIES.selectSQL();
883883
String selectOne = QUERIES.selectAllByKey("?");
884884

885885
Random rnd = new Random(0x234567);
886886
int payloadLength = 1000;
887887

888-
try {
888+
try (Connection conn = jdbc.createCustomConnection("useStreamResultSets", "true")) {
889889
// BULK UPSERT
890-
try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) {
890+
try (PreparedStatement ps = conn.prepareStatement(bulkUpsert)) {
891891
for (int idx = 1; idx <= 10000; idx++) {
892892
ps.setInt(1, idx);
893893
String payload = createPayload(rnd, payloadLength);
@@ -901,7 +901,7 @@ public void testBigBulkAndScan() throws SQLException {
901901
}
902902

903903
// SCAN all table
904-
try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) {
904+
try (PreparedStatement ps = conn.prepareStatement(selectAll)) {
905905
int readed = 0;
906906
Assertions.assertTrue(ps.execute());
907907
try (ResultSet rs = ps.getResultSet()) {
@@ -915,7 +915,7 @@ public void testBigBulkAndScan() throws SQLException {
915915
}
916916

917917
// Canceled scan
918-
try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) {
918+
try (PreparedStatement ps = conn.prepareStatement(selectAll)) {
919919
Assertions.assertTrue(ps.execute());
920920
ps.getResultSet().next();
921921
ps.getResultSet().close();
@@ -933,7 +933,7 @@ public void testBigBulkAndScan() throws SQLException {
933933
}
934934

935935
// Scan was cancelled, but connection still work
936-
try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) {
936+
try (PreparedStatement ps = conn.prepareStatement(selectOne)) {
937937
ps.setInt(1, 1234);
938938

939939
Assertions.assertTrue(ps.execute());

jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ static DriverPropertyInfo[] defaultPropertyInfo(@Nullable String localDatacenter
318318
new DriverPropertyInfo("sessionMaxIdleTime", ""),
319319
new DriverPropertyInfo("sessionPoolSizeMin", ""),
320320
new DriverPropertyInfo("sessionPoolSizeMax", ""),
321+
new DriverPropertyInfo("useStreamResultSets", "false"),
321322
new DriverPropertyInfo("joinDuration", "5m"),
322323
new DriverPropertyInfo("queryTimeout", "0s"),
323324
new DriverPropertyInfo("scanQueryTimeout", "5m"),
@@ -358,6 +359,7 @@ static DriverPropertyInfo[] customizedPropertyInfo() {
358359
new DriverPropertyInfo("sessionMaxIdleTime", "5m"),
359360
new DriverPropertyInfo("sessionPoolSizeMin", "3"),
360361
new DriverPropertyInfo("sessionPoolSizeMax", "4"),
362+
new DriverPropertyInfo("useStreamResultSets", "true"),
361363
new DriverPropertyInfo("joinDuration", "6m"),
362364
new DriverPropertyInfo("queryTimeout", "2m"),
363365
new DriverPropertyInfo("scanQueryTimeout", "3m"),

0 commit comments

Comments
 (0)