From ed8a3b92410a67b9f47d4c9c7f1ceccf1e27d00a Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Fri, 11 Oct 2024 16:40:15 +0100 Subject: [PATCH 1/3] Removed QueryService impelemntation of scanQuery --- .../ydb/jdbc/context/BaseYdbExecutor.java | 49 +++++++++++++++++++ .../jdbc/context/QueryServiceExecutor.java | 24 --------- .../jdbc/context/TableServiceExecutor.java | 44 ----------------- .../jdbc/impl/YdbQueryConnectionImplTest.java | 12 ----- .../YdbQueryPreparedStatementImplTest.java | 2 +- 5 files changed, 50 insertions(+), 81 deletions(-) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java index 9f5c3bd3..8553c8d0 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java @@ -1,15 +1,25 @@ package tech.ydb.jdbc.context; import java.sql.SQLException; +import java.time.Duration; import java.util.Collections; import java.util.concurrent.atomic.AtomicReference; +import tech.ydb.core.Result; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.exception.ExceptionFactory; import tech.ydb.jdbc.impl.YdbQueryResult; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.table.Session; import tech.ydb.table.SessionRetryContext; +import tech.ydb.table.TableClient; +import tech.ydb.table.query.Params; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.settings.ExecuteScanQuerySettings; import tech.ydb.table.settings.ExecuteSchemeQuerySettings; import tech.ydb.table.values.ListValue; @@ -19,13 +29,27 @@ */ public abstract class BaseYdbExecutor implements YdbExecutor { private final SessionRetryContext retryCtx; + private final Duration sessionTimeout; + private final TableClient tableClient; private final AtomicReference currResult; public BaseYdbExecutor(YdbContext ctx) { this.retryCtx = ctx.getRetryCtx(); + this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout(); + this.tableClient = ctx.getTableClient(); this.currResult = new AtomicReference<>(); } + protected Session createNewTableSession(YdbValidator validator) throws SQLException { + try { + Result session = tableClient.createSession(sessionTimeout).join(); + validator.addStatusIssues(session.getStatus()); + return session.getValue(); + } catch (UnexpectedResultException ex) { + throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex); + } + } + protected void closeCurrentResult() throws SQLException { YdbQueryResult rs = currResult.get(); if (rs != null) { @@ -79,4 +103,29 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query, return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList())); } + + @Override + public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, String yql, Params params) + throws SQLException { + ensureOpened(); + + YdbContext ctx = statement.getConnection().getCtx(); + YdbValidator validator = statement.getValidator(); + Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout(); + ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder() + .withRequestTimeout(scanQueryTimeout) + .build(); + + final Session session = createNewTableSession(validator); + + String msg = QueryType.SCAN_QUERY + " >>\n" + yql; + StreamQueryResult lazy = validator.call(msg, () -> { + GrpcReadStream stream = session.executeScanQuery(yql, params, settings); + StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); + return result.execute(stream, session::close); + }); + + return updateCurrentResult(lazy); + } + } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index 1a93cb6f..5955f889 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -245,30 +245,6 @@ public YdbQueryResult executeDataQuery( } } - @Override - public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, String yql, Params params) - throws SQLException { - ensureOpened(); - - YdbContext ctx = statement.getConnection().getCtx(); - YdbValidator validator = statement.getValidator(); - - Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout(); - ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder() - .withRequestTimeout(scanQueryTimeout) - .build(); - - final QuerySession session = createNewQuerySession(validator); - String msg = "STREAM_QUERY >>\n" + yql; - StreamQueryResult lazy = validator.call(msg, () -> { - QueryStream stream = session.createQuery(yql, TxMode.SNAPSHOT_RO, params, settings); - StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); - return result.execute(stream, session::close); - }); - - return updateCurrentResult(lazy); - } - @Override public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException { ensureOpened(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java index 30f6e877..d5d39a5b 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java @@ -7,26 +7,20 @@ import java.util.ArrayList; import java.util.List; -import tech.ydb.core.Result; -import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; -import tech.ydb.jdbc.exception.ExceptionFactory; import tech.ydb.jdbc.impl.YdbQueryResult; import tech.ydb.jdbc.impl.YdbStaticResultSet; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.table.Session; -import tech.ydb.table.TableClient; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.ExplainDataQueryResult; import tech.ydb.table.query.Params; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.settings.CommitTxSettings; import tech.ydb.table.settings.ExecuteDataQuerySettings; -import tech.ydb.table.settings.ExecuteScanQuerySettings; import tech.ydb.table.settings.ExplainDataQuerySettings; import tech.ydb.table.settings.KeepAliveSessionSettings; import tech.ydb.table.settings.RollbackTxSettings; @@ -37,15 +31,11 @@ * @author Aleksandr Gorshenin */ public class TableServiceExecutor extends BaseYdbExecutor { - private final Duration sessionTimeout; - private final TableClient tableClient; private final boolean failOnTruncatedResult; private volatile TxState tx; public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { super(ctx); - this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout(); - this.tableClient = ctx.getTableClient(); this.tx = createTx(transactionLevel, autoCommit); this.failOnTruncatedResult = ctx.getOperationProperties().isFailOnTruncatedResult(); } @@ -63,16 +53,6 @@ private void updateState(TxState newTx) { this.tx = newTx; } - protected Session createNewTableSession(YdbValidator validator) throws SQLException { - try { - Result session = tableClient.createSession(sessionTimeout).join(); - validator.addStatusIssues(session.getStatus()); - return session.getValue(); - } catch (UnexpectedResultException ex) { - throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex); - } - } - @Override public void setTransactionLevel(int level) throws SQLException { ensureOpened(); @@ -233,30 +213,6 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S } } - @Override - public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, String yql, Params params) - throws SQLException { - ensureOpened(); - - YdbContext ctx = statement.getConnection().getCtx(); - YdbValidator validator = statement.getValidator(); - Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout(); - ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder() - .withRequestTimeout(scanQueryTimeout) - .build(); - - final Session session = createNewTableSession(validator); - - String msg = QueryType.SCAN_QUERY + " >>\n" + yql; - StreamQueryResult lazy = validator.call(msg, () -> { - GrpcReadStream stream = session.executeScanQuery(yql, params, settings); - StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); - return result.execute(stream, session::close); - }); - - return updateCurrentResult(lazy); - } - @Override public boolean isValid(YdbValidator validator, int timeout) throws SQLException { ensureOpened(); diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java index 24a1a6c7..995d62a9 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java @@ -828,18 +828,6 @@ public void testWarningsInQuery() throws SQLException { warnings.getMessage()); Assertions.assertNull(warnings.getNextWarning()); } - - try (ResultSet rs = statement.executeQuery("SCAN " + query)) { - Assertions.assertFalse(rs.next()); - - SQLWarning warnings = statement.getWarnings(); - Assertions.assertNotNull(warnings); - - Assertions.assertEquals("#1060 Execution (S_WARNING)\n " - + "2:1 - 2:1: #2503 Given predicate is not suitable for used index: idx_value (S_WARNING)", - warnings.getMessage()); - Assertions.assertNull(warnings.getNextWarning()); - } } finally { statement.execute(dropTempTable); } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryPreparedStatementImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryPreparedStatementImplTest.java index 2d155461..cd3f9f15 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryPreparedStatementImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryPreparedStatementImplTest.java @@ -350,7 +350,7 @@ public void executeScanQueryAsUpdate() throws SQLException { statement.setInt(1, 1); statement.setString(2, "value-1"); - ExceptionAssert.ydbException("Operation 'Upsert' can't be performed in read only transaction", + ExceptionAssert.ydbException("Scan query should have a single result set", statement::executeQuery); } } From f8b825dbd6a00a482a1cf0feb283a488ecb5a3c4 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Fri, 11 Oct 2024 17:12:52 +0100 Subject: [PATCH 2/3] Added option to use streamable result sets --- .../jdbc/context/QueryServiceExecutor.java | 18 +++ .../tech/ydb/jdbc/settings/YdbConfig.java | 3 +- .../jdbc/settings/YdbOperationProperties.java | 12 ++ .../tech/ydb/jdbc/YdbDriverTablesTest.java | 128 ++++++++++++++++++ .../jdbc/impl/YdbQueryConnectionImplTest.java | 12 +- .../settings/YdbDriverProperitesTest.java | 2 + 6 files changed, 168 insertions(+), 7 deletions(-) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index 5955f889..713a5a05 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -43,6 +43,7 @@ public class QueryServiceExecutor extends BaseYdbExecutor { private final Duration sessionTimeout; private final QueryClient queryClient; + private final boolean useStreamResultSet; private int transactionLevel; private boolean isReadOnly; @@ -56,6 +57,8 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo super(ctx); this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout(); this.queryClient = ctx.getQueryClient(); + this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets(); + this.transactionLevel = transactionLevel; this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE; this.isAutoCommit = autoCommit; @@ -227,6 +230,21 @@ public YdbQueryResult executeDataQuery( tx = createNewQuerySession(validator).createNewTransaction(txMode); } + if (useStreamResultSet) { + String msg = "STREAM_QUERY >>\n" + yql; + StreamQueryResult lazy = validator.call(msg, () -> { + QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings); + StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); + return result.execute(stream, () -> { + if (!tx.isActive()) { + cleanTx(); + } + }); + }); + + return updateCurrentResult(lazy); + } + try { QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql, () -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings)) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java index 4411fd22..c8778ed7 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java @@ -38,7 +38,7 @@ public class YdbConfig { + "{@code 0} disables the cache.", 256 ); static final YdbProperty USE_QUERY_SERVICE = YdbProperty.bool("useQueryService", - "Use QueryService intead of TableService", false + "Use QueryService instead of TableService", false ); static final YdbProperty FULLSCAN_DETECTOR_ENABLED = YdbProperty.bool( "jdbcFullScanDetector", "Enable analizator for collecting query stats", false @@ -167,6 +167,7 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException { YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties), YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties), + YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties), YdbOperationProperties.JOIN_DURATION.toInfo(properties), YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties), YdbOperationProperties.SCAN_QUERY_TIMEOUT.toInfo(properties), diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java index 41f939db..74c7bd6b 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java @@ -53,6 +53,10 @@ public class YdbOperationProperties { FakeTxMode.ERROR ); + static final YdbProperty USE_STREAM_RESULT_SETS = YdbProperty.bool("useStreamResultSets", + "Use stream implementation of ResultSet", false + ); + private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection? private final YdbValue joinDuration; @@ -68,6 +72,8 @@ public class YdbOperationProperties { private final YdbValue schemeQueryTxMode; private final YdbValue bulkQueryTxMode; + private final YdbValue useStreamResultSets; + public YdbOperationProperties(YdbConfig config) throws SQLException { Properties props = config.getProperties(); @@ -83,6 +89,8 @@ public YdbOperationProperties(YdbConfig config) throws SQLException { this.scanQueryTxMode = SCAN_QUERY_TX_MODE.readValue(props); this.schemeQueryTxMode = SCHEME_QUERY_TX_MODE.readValue(props); this.bulkQueryTxMode = BULK_QUERY_TX_MODE.readValue(props); + + this.useStreamResultSets = USE_STREAM_RESULT_SETS.readValue(props); } public Duration getJoinDuration() { @@ -129,6 +137,10 @@ public int getTransactionLevel() { return transactionLevel.getValue(); } + public boolean getUseStreamResultSets() { + return useStreamResultSets.getValue(); + } + public int getMaxRows() { return MAX_ROWS; } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java b/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java index 79828585..e1070f39 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTablesTest.java @@ -313,4 +313,132 @@ public void forceScanAndBulkTest() throws SQLException { } } } + + @Test + public void streamResultsTest() throws SQLException { + try (Connection conn = DriverManager.getConnection(jdbcURL + .withArg("useQueryService", "true") + .withArg("useStreamResultSets", "true") + .build() + )) { + try { + conn.createStatement().execute(DROP_TABLE); + } catch (SQLException e) { + // ignore + } + + conn.createStatement().execute(CREATE_TABLE); + + LocalDate ld = LocalDate.of(2017, 12, 3); + String prefix = "text-value-"; + int idx = 0; + + // single batch upsert + try (PreparedStatement ps = conn.prepareStatement(UPSERT_ROW)) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.executeUpdate(); + } + + // single batch insert + try (PreparedStatement ps = conn.prepareStatement(INSERT_ROW)) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.executeUpdate(); + } + + // stream read + try (Statement st = conn.createStatement()) { + int readed = 0; + try (ResultSet rs = st.executeQuery(SELECT_ALL)) { + while (rs.next()) { + readed++; + Assertions.assertEquals(readed, rs.getInt("id")); + Assertions.assertEquals(prefix + readed, rs.getString("value")); + Assertions.assertEquals(Date.valueOf(ld.plusDays(readed)), rs.getDate("date")); + } + } + Assertions.assertEquals(2, readed); + } + + // batch upsert + try (PreparedStatement ps = conn.prepareStatement(UPSERT_ROW)) { + for (int j = 0; j < 2000; j++) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.addBatch(); + } + ps.executeBatch(); + + // single row upsert + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.execute(); + + for (int j = 0; j < 2000; j++) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.addBatch(); + } + ps.executeBatch(); + } + + // batch inserts + try (PreparedStatement ps = conn.prepareStatement(INSERT_ROW)) { + for (int j = 0; j < 2000; j++) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.addBatch(); + } + ps.executeBatch(); + + // single row insert + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.execute(); + + for (int j = 0; j < 2000; j++) { + ps.setInt(1, ++idx); + ps.setString(2, prefix + idx); + ps.setDate(3, Date.valueOf(ld.plusDays(idx))); + ps.addBatch(); + } + ps.executeBatch(); + } + + // read all + try (Statement st = conn.createStatement()) { + int readed = 0; + try (ResultSet rs = st.executeQuery(SELECT_ALL)) { + while (rs.next()) { + readed++; + Assertions.assertEquals(readed, rs.getInt("id")); + Assertions.assertEquals(prefix + readed, rs.getString("value")); + Assertions.assertEquals(Date.valueOf(ld.plusDays(readed)), rs.getDate("date")); + } + } + Assertions.assertEquals(8004, readed); + } + + // single update + try (PreparedStatement ps = conn.prepareStatement(UPDATE_ROW)) { + ps.setString(1, "updated-value"); + ps.setInt(2, 1); + ps.executeUpdate(); + } + + // single delete + try (PreparedStatement ps = conn.prepareStatement(DELETE_ROW)) { + ps.setInt(1, 2); + ps.executeUpdate(); + } + } + } } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java index 995d62a9..bc1eb65c 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java @@ -879,15 +879,15 @@ private String createPayload(Random rnd, int length) { @Timeout(value = 30, unit = TimeUnit.SECONDS, threadMode = Timeout.ThreadMode.SAME_THREAD) public void testBigBulkAndScan() throws SQLException { String bulkUpsert = QUERIES.upsertOne(SqlQueries.JdbcQuery.BULK, "c_Text", "Text?"); - String scanSelectAll = QUERIES.scanSelectSQL(); + String selectAll = QUERIES.selectSQL(); String selectOne = QUERIES.selectAllByKey("?"); Random rnd = new Random(0x234567); int payloadLength = 1000; - try { + try (Connection conn = jdbc.createCustomConnection("useStreamResultSets", "true")) { // BULK UPSERT - try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) { + try (PreparedStatement ps = conn.prepareStatement(bulkUpsert)) { for (int idx = 1; idx <= 10000; idx++) { ps.setInt(1, idx); String payload = createPayload(rnd, payloadLength); @@ -901,7 +901,7 @@ public void testBigBulkAndScan() throws SQLException { } // SCAN all table - try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + try (PreparedStatement ps = conn.prepareStatement(selectAll)) { int readed = 0; Assertions.assertTrue(ps.execute()); try (ResultSet rs = ps.getResultSet()) { @@ -915,7 +915,7 @@ public void testBigBulkAndScan() throws SQLException { } // Canceled scan - try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + try (PreparedStatement ps = conn.prepareStatement(selectAll)) { Assertions.assertTrue(ps.execute()); ps.getResultSet().next(); ps.getResultSet().close(); @@ -933,7 +933,7 @@ public void testBigBulkAndScan() throws SQLException { } // Scan was cancelled, but connection still work - try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) { + try (PreparedStatement ps = conn.prepareStatement(selectOne)) { ps.setInt(1, 1234); Assertions.assertTrue(ps.execute()); diff --git a/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java b/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java index ae52b9ae..31f1388c 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java @@ -318,6 +318,7 @@ static DriverPropertyInfo[] defaultPropertyInfo(@Nullable String localDatacenter new DriverPropertyInfo("sessionMaxIdleTime", ""), new DriverPropertyInfo("sessionPoolSizeMin", ""), new DriverPropertyInfo("sessionPoolSizeMax", ""), + new DriverPropertyInfo("useStreamResultSets", "false"), new DriverPropertyInfo("joinDuration", "5m"), new DriverPropertyInfo("queryTimeout", "0s"), new DriverPropertyInfo("scanQueryTimeout", "5m"), @@ -358,6 +359,7 @@ static DriverPropertyInfo[] customizedPropertyInfo() { new DriverPropertyInfo("sessionMaxIdleTime", "5m"), new DriverPropertyInfo("sessionPoolSizeMin", "3"), new DriverPropertyInfo("sessionPoolSizeMax", "4"), + new DriverPropertyInfo("useStreamResultSets", "true"), new DriverPropertyInfo("joinDuration", "6m"), new DriverPropertyInfo("queryTimeout", "2m"), new DriverPropertyInfo("scanQueryTimeout", "3m"), From 483976d316d5556c6f0309cd240cfc38032384fb Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 14 Oct 2024 11:16:09 +0100 Subject: [PATCH 3/3] More simplify implementation of StreamQueryResult --- .../ydb/jdbc/context/BaseYdbExecutor.java | 26 +++- .../jdbc/context/QueryServiceExecutor.java | 107 +++++++++++----- .../ydb/jdbc/context/StreamQueryResult.java | 115 +++++------------- 3 files changed, 132 insertions(+), 116 deletions(-) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java index 8553c8d0..3adee199 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java @@ -3,6 +3,7 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import tech.ydb.core.Result; @@ -120,9 +121,28 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S String msg = QueryType.SCAN_QUERY + " >>\n" + yql; StreamQueryResult lazy = validator.call(msg, () -> { - GrpcReadStream stream = session.executeScanQuery(yql, params, settings); - StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); - return result.execute(stream, session::close); + final CompletableFuture> future = new CompletableFuture<>(); + final GrpcReadStream stream = session.executeScanQuery(yql, params, settings); + final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); + + stream.start((rsr) -> { + future.complete(Result.success(result)); + result.onStreamResultSet(0, rsr); + }).whenComplete((st, th) -> { + session.close(); + + if (th != null) { + result.onStreamFinished(th); + future.completeExceptionally(th); + } + if (st != null) { + validator.addStatusIssues(st); + result.onStreamFinished(st); + future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st)); + } + }); + + return future; }); return updateCurrentResult(lazy); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index 713a5a05..dea2023f 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -8,7 +8,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Issue; @@ -50,8 +52,8 @@ public class QueryServiceExecutor extends BaseYdbExecutor { private boolean isAutoCommit; private TxMode txMode; - private QueryTransaction tx; - private boolean isClosed; + private final AtomicReference tx = new AtomicReference<>(); + private volatile boolean isClosed; public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { super(ctx); @@ -63,7 +65,6 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE; this.isAutoCommit = autoCommit; this.txMode = txMode(transactionLevel, isReadOnly); - this.tx = null; this.isClosed = false; } @@ -81,14 +82,10 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE @Override public void close() throws SQLException { closeCurrentResult(); - cleanTx(); isClosed = true; - } - - private void cleanTx() { - if (tx != null) { - tx.getSession().close(); - tx = null; + QueryTransaction old = tx.getAndSet(null); + if (old != null) { + old.getSession().close(); } } @@ -100,7 +97,8 @@ public void setTransactionLevel(int level) throws SQLException { return; } - if (tx != null && tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx != null && localTx.isActive()) { throw new SQLFeatureNotSupportedException(YdbConst.CHANGE_ISOLATION_INSIDE_TX); } @@ -117,7 +115,8 @@ public void setReadOnly(boolean readOnly) throws SQLException { return; } - if (tx != null && tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx != null && localTx.isActive()) { throw new SQLFeatureNotSupportedException(YdbConst.READONLY_INSIDE_TRANSACTION); } @@ -133,7 +132,8 @@ public void setAutoCommit(boolean autoCommit) throws SQLException { return; } - if (tx != null && tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx != null && localTx.isActive()) { throw new SQLFeatureNotSupportedException(YdbConst.CHANGE_ISOLATION_INSIDE_TX); } @@ -149,13 +149,15 @@ public boolean isClosed() throws SQLException { @Override public String txID() throws SQLException { closeCurrentResult(); - return tx != null ? tx.getId() : null; + QueryTransaction localTx = tx.get(); + return localTx != null ? localTx.getId() : null; } @Override public boolean isInsideTransaction() throws SQLException { ensureOpened(); - return tx != null && tx.isActive(); + QueryTransaction localTx = tx.get(); + return localTx != null && localTx.isActive(); } @Override @@ -180,16 +182,19 @@ public int transactionLevel() throws SQLException { public void commit(YdbContext ctx, YdbValidator validator) throws SQLException { ensureOpened(); - if (tx == null || !tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx == null || !localTx.isActive()) { return; } CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build(); try { validator.clearWarnings(); - validator.call("Commit TxId: " + tx.getId(), () -> tx.commit(settings)); + validator.call("Commit TxId: " + localTx.getId(), () -> localTx.commit(settings)); } finally { - cleanTx(); + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } } } @@ -197,7 +202,8 @@ public void commit(YdbContext ctx, YdbValidator validator) throws SQLException { public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException { ensureOpened(); - if (tx == null || !tx.isActive()) { + QueryTransaction localTx = tx.get(); + if (localTx == null || !localTx.isActive()) { return; } @@ -206,9 +212,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException try { validator.clearWarnings(); - validator.execute("Rollback TxId: " + tx.getId(), () -> tx.rollback(settings)); + validator.execute("Rollback TxId: " + localTx.getId(), () -> localTx.rollback(settings)); } finally { - cleanTx(); + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } } } @@ -226,20 +234,55 @@ public YdbQueryResult executeDataQuery( } final ExecuteQuerySettings settings = builder.build(); - if (tx == null) { - tx = createNewQuerySession(validator).createNewTransaction(txMode); + QueryTransaction nextTx = tx.get(); + while (nextTx == null) { + nextTx = createNewQuerySession(validator).createNewTransaction(txMode); + if (!tx.compareAndSet(null, nextTx)) { + nextTx.getSession().close(); + nextTx = tx.get(); + } } + final QueryTransaction localTx = nextTx; + if (useStreamResultSet) { String msg = "STREAM_QUERY >>\n" + yql; StreamQueryResult lazy = validator.call(msg, () -> { - QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings); - StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); - return result.execute(stream, () -> { - if (!tx.isActive()) { - cleanTx(); + final CompletableFuture> future = new CompletableFuture<>(); + final QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings); + final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel); + + + stream.execute(new QueryStream.PartsHandler() { + @Override + public void onIssues(Issue[] issues) { + validator.addStatusIssues(Arrays.asList(issues)); + } + + @Override + public void onNextPart(QueryResultPart part) { + result.onStreamResultSet((int) part.getResultSetIndex(), part.getResultSetReader()); + future.complete(Result.success(result)); + } + }).whenComplete((res, th) -> { + if (!localTx.isActive()) { + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } + } + + if (th != null) { + future.completeExceptionally(th); + result.onStreamFinished(th); + } + if (res != null) { + validator.addStatusIssues(res.getStatus()); + future.complete(res.isSuccess() ? Result.success(result) : Result.fail(res.getStatus())); + result.onStreamFinished(res.getStatus()); } }); + + return future; }); return updateCurrentResult(lazy); @@ -247,7 +290,7 @@ public YdbQueryResult executeDataQuery( try { QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql, - () -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings)) + () -> QueryReader.readFrom(localTx.createQuery(yql, isAutoCommit, params, settings)) ); validator.addStatusIssues(result.getIssueList()); @@ -257,8 +300,10 @@ public YdbQueryResult executeDataQuery( } return updateCurrentResult(new StaticQueryResult(query, readers)); } finally { - if (!tx.isActive()) { - cleanTx(); + if (!localTx.isActive()) { + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } } } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java index c64a0983..7870d96a 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java @@ -5,7 +5,6 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -16,11 +15,9 @@ import java.util.logging.Level; import java.util.logging.Logger; -import tech.ydb.core.Issue; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; @@ -30,8 +27,6 @@ import tech.ydb.jdbc.impl.YdbQueryResult; import tech.ydb.jdbc.query.QueryStatement; import tech.ydb.jdbc.query.YdbQuery; -import tech.ydb.query.QueryStream; -import tech.ydb.query.result.QueryResultPart; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.result.ValueReader; @@ -47,22 +42,21 @@ public class StreamQueryResult implements YdbQueryResult { private final String msg; private final YdbStatement statement; - private final Runnable stopRunnable; + private final Runnable streamStopper; private final CompletableFuture streamFuture = new CompletableFuture<>(); - private final CompletableFuture> startFuture = new CompletableFuture<>(); + private final AtomicBoolean streamCancelled = new AtomicBoolean(false); private final int[] resultIndexes; private final List>> resultFutures = new ArrayList<>(); - private final AtomicBoolean streamCancelled = new AtomicBoolean(false); private int resultIndex = 0; private volatile boolean resultClosed = false; - public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Runnable stopRunnable) { + public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Runnable streamStopper) { this.msg = msg; this.statement = statement; - this.stopRunnable = stopRunnable; + this.streamStopper = streamStopper; this.resultIndexes = new int[query.getStatements().size()]; @@ -84,49 +78,42 @@ public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Run } } - public CompletableFuture> execute(QueryStream stream, Runnable finish) { - LOGGER.log(Level.FINE, "Stream executed by QueryStream"); - stream.execute(new QueryPartsHandler()) - .thenApply(Result::getStatus) - .whenComplete(this::onStreamFinished) - .thenRun(finish); - return startFuture; - } + public void onStreamResultSet(int index, ResultSetReader rsr) { + CompletableFuture> future = resultFutures.get(index); + if (!future.isDone()) { + ColumnInfo[] columns = ColumnInfo.fromResultSetReader(rsr); + future.complete(Result.success(new LazyResultSet(statement, columns))); + } - public CompletableFuture> execute( - GrpcReadStream stream, Runnable finish - ) { - LOGGER.log(Level.FINE, "Stream executed by ScanQuery"); - stream.start(new ScanQueryHandler()) - .whenComplete(this::onStreamFinished) - .thenRun(finish); - return startFuture; + Result res = future.join(); + if (res.isSuccess()) { + res.getValue().addResultSet(rsr); + } } - private void onStreamFinished(Status status, Throwable th) { - if (th != null) { - streamFuture.completeExceptionally(th); - for (CompletableFuture> future: resultFutures) { - future.completeExceptionally(th); - } - startFuture.completeExceptionally(th); + public void onStreamFinished(Throwable th) { + streamFuture.completeExceptionally(th); + for (CompletableFuture> future: resultFutures) { + future.completeExceptionally(th); } - if (status != null) { - streamFuture.complete(status); + completeAllSets(); + } + + public void onStreamFinished(Status status) { + for (CompletableFuture> future : resultFutures) { if (status.isSuccess()) { - for (CompletableFuture> future: resultFutures) { - future.complete(Result.success(new LazyResultSet(statement, new ColumnInfo[0]), status)); - } - startFuture.complete(Result.success(this)); + future.complete(Result.success(new LazyResultSet(statement, new ColumnInfo[0]), status)); } else { - for (CompletableFuture> future: resultFutures) { - future.complete(Result.fail(status)); - } - startFuture.complete(Result.fail(status)); + future.complete(Result.fail(status)); } } + streamFuture.complete(status); + + completeAllSets(); + } + private void completeAllSets() { for (CompletableFuture> future: resultFutures) { if (!future.isCompletedExceptionally()) { Result rs = future.join(); @@ -155,13 +142,13 @@ private void checkStream() { if (!streamFuture.isDone() && streamCancelled.compareAndSet(false, true)) { LOGGER.log(Level.FINE, "Stream cancel"); - stopRunnable.run(); + streamStopper.run(); } } @Override public void close() throws SQLException { - if (startFuture.isDone() && resultClosed) { + if (streamFuture.isDone() && resultClosed) { return; } @@ -170,8 +157,6 @@ public void close() throws SQLException { resultClosed = true; Status status = streamFuture.join(); - statement.getValidator().addStatusIssues(status); - if (streamCancelled.get()) { LOGGER.log(Level.FINE, "Stream canceled and finished with status {0}", status); return; @@ -252,40 +237,6 @@ public boolean getMoreResults(int current) throws SQLException { } - private void onResultSet(int index, ResultSetReader rsr) { - CompletableFuture> future = resultFutures.get(index); - if (!future.isDone()) { - ColumnInfo[] columns = ColumnInfo.fromResultSetReader(rsr); - future.complete(Result.success(new LazyResultSet(statement, columns))); - } - - Result res = future.join(); - if (res.isSuccess()) { - res.getValue().addResultSet(rsr); - } - } - - private class ScanQueryHandler implements GrpcReadStream.Observer { - @Override - public void onNext(ResultSetReader part) { - onResultSet(0, part); - startFuture.complete(Result.success(StreamQueryResult.this)); - } - } - - private class QueryPartsHandler implements QueryStream.PartsHandler { - @Override - public void onIssues(Issue[] issues) { - statement.getValidator().addStatusIssues(Arrays.asList(issues)); - } - - @Override - public void onNextPart(QueryResultPart part) { - onResultSet((int) part.getResultSetIndex(), part.getResultSetReader()); - startFuture.complete(Result.success(StreamQueryResult.this)); - } - } - private class LazyResultSet extends BaseYdbResultSet { private final BlockingQueue readers = new ArrayBlockingQueue<>(5); private final AtomicLong rowsCount = new AtomicLong(); @@ -314,7 +265,7 @@ public void addResultSet(ResultSetReader rsr) { } catch (InterruptedException ex) { if (streamFuture.completeExceptionally(ex)) { LOGGER.log(Level.WARNING, "LazyResultSet offer interrupted"); - stopRunnable.run(); + streamStopper.run(); } return; }